From a3a62b6dbb330019384b8ffa4dcc20e9a37acc1f Mon Sep 17 00:00:00 2001 From: Niels Andriesse Date: Fri, 21 May 2021 10:24:48 +1000 Subject: [PATCH] Implement ClosedGroupPollerV2 --- .../pollers/ClosedGroupPollerV2.kt | 50 +++++++++++++++++-- 1 file changed, 46 insertions(+), 4 deletions(-) diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPollerV2.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPollerV2.kt index 8dfcf78ffc..1466ed9dd1 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPollerV2.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPollerV2.kt @@ -1,10 +1,24 @@ package org.session.libsession.messaging.sending_receiving.pollers import nl.komponents.kovenant.Promise +import nl.komponents.kovenant.functional.bind +import nl.komponents.kovenant.functional.map import org.session.libsession.messaging.MessagingModuleConfiguration +import org.session.libsession.messaging.jobs.JobQueue +import org.session.libsession.messaging.jobs.MessageReceiveJob +import org.session.libsession.snode.SnodeAPI +import org.session.libsignal.crypto.getRandomElementOrNull +import org.session.libsignal.utilities.Log +import org.session.libsignal.utilities.successBackground +import java.util.concurrent.Executors +import java.util.concurrent.ScheduledFuture +import java.util.concurrent.TimeUnit +import kotlin.math.min class ClosedGroupPollerV2 { + private val executorService = Executors.newScheduledThreadPool(4) private var isPolling = mutableMapOf() + private var futures = mutableMapOf>() private fun isPolling(groupPublicKey: String): Boolean { return isPolling[groupPublicKey] ?: false @@ -39,7 +53,7 @@ class ClosedGroupPollerV2 { } fun stopPolling(groupPublicKey: String) { - // TODO: Invalidate future + futures[groupPublicKey]?.cancel(false) isPolling[groupPublicKey] = false } @@ -56,11 +70,39 @@ class ClosedGroupPollerV2 { if (!isPolling(groupPublicKey)) { return } // Get the received date of the last message in the thread. If we don't have any messages yet, pick some // reasonable fake time interval to use instead. - + val timeSinceLastMessage = 5 * 60 * 1000 // TODO: Implement + val minPollInterval = Companion.minPollInterval + val limit = 12 * 60 * 60 * 1000 + val a = (Companion.maxPollInterval - minPollInterval).toDouble() / limit.toDouble() + val nextPollInterval = a * min(timeSinceLastMessage, limit) + minPollInterval + Log.d("Loki", "Next poll interval for closed group with public key: $groupPublicKey is ${nextPollInterval / 1000} s.") + executorService?.schedule({ + poll(groupPublicKey).success { + pollRecursively(groupPublicKey) + }.fail { + // The error is logged in poll(_:) + pollRecursively(groupPublicKey) + } + }, nextPollInterval.toLong(), TimeUnit.MILLISECONDS) } private fun poll(groupPublicKey: String): Promise { - return Promise.ofFail(InsufficientSnodesException()) + if (!isPolling(groupPublicKey)) { return Promise.of(Unit) } + val promise = SnodeAPI.getSwarm(groupPublicKey).bind { swarm -> + val snode = swarm.getRandomElementOrNull() ?: throw InsufficientSnodesException() // Should be cryptographically secure + if (!isPolling(groupPublicKey)) { throw PollingCanceledException() } + SnodeAPI.getRawMessages(snode, groupPublicKey).map { SnodeAPI.parseRawMessagesResponse(it, snode, groupPublicKey) } + } + promise.success { envelopes -> + if (!isPolling(groupPublicKey)) { return@success } + envelopes.forEach { envelope -> + val job = MessageReceiveJob(envelope.toByteArray()) + JobQueue.shared.add(job) + } + } + promise.fail { + Log.d("Loki", "Polling failed for closed group with public key: $groupPublicKey due to error: $it.") + } + return promise.map { } } - }