diff --git a/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java b/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java index b39dd3c5b0..8de298955c 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java +++ b/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java @@ -191,16 +191,16 @@ public class ApplicationContext extends MultiDexApplication implements Dependenc // Set application UI mode (day/night theme) to the user selected one. UiModeUtilities.setupUiModeToUserSelected(this); // ======== - initializeJobManager(); initializeExpiringMessageManager(); initializeTypingStatusRepository(); initializeTypingStatusSender(); initializeReadReceiptManager(); initializeProfileManager(); initializePeriodicTasks(); + SSKEnvironment.Companion.configure(getTypingStatusRepository(), getReadReceiptManager(), getProfileManager(), messageNotifier, getExpiringMessageManager()); + initializeJobManager(); initializeWebRtc(); initializeBlobProvider(); - SSKEnvironment.Companion.configure(getTypingStatusRepository(), getReadReceiptManager(), getProfileManager(), messageNotifier, getExpiringMessageManager()); } @Override diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt index 367223411f..177006deb1 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt @@ -94,6 +94,7 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context, override fun persist(message: VisibleMessage, quotes: QuoteModel?, linkPreview: List, groupPublicKey: String?, openGroupID: String?, attachments: List): Long? { var messageID: Long? = null val senderAddress = Address.fromSerialized(message.sender!!) + val isUserSender = message.sender!! == getUserPublicKey() val group: Optional = when { openGroupID != null -> Optional.of(SignalServiceGroup(openGroupID.toByteArray(), SignalServiceGroup.GroupType.PUBLIC_CHAT)) groupPublicKey != null -> { @@ -105,7 +106,7 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context, val pointerAttachments = attachments.mapNotNull { it.toSignalAttachment() } - val targetAddress = if (senderAddress.serialize() == getUserPublicKey() && message.syncTarget != null) { + val targetAddress = if (isUserSender && !message.syncTarget.isNullOrEmpty()) { Address.fromSerialized(message.syncTarget!!) } else if (group.isPresent) { Address.fromSerialized(GroupUtil.getEncodedId(group.get())) @@ -130,7 +131,7 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context, } val mediaMessage = IncomingMediaMessage.from(message, senderAddress, targetRecipient.expireMessages * 1000L, group, signalServiceAttachments, quote, linkPreviews) mmsDatabase.beginTransaction() - mmsDatabase.insertSecureDecryptedMessageInbox(mediaMessage, message.threadID ?: -1, message.sentTimestamp ?: 0) + mmsDatabase.insertSecureDecryptedMessageInbox(mediaMessage, message.threadID ?: -1, message.receivedTimestamp ?: 0) } if (insertResult.isPresent) { mmsDatabase.setTransactionSuccessful() @@ -145,7 +146,7 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context, } else { val textMessage = IncomingTextMessage.from(message, senderAddress, group, targetRecipient.expireMessages * 1000L) val encrypted = IncomingEncryptedMessage(textMessage, textMessage.messageBody) - smsDatabase.insertMessageInbox(encrypted, message.sentTimestamp ?: 0) + smsDatabase.insertMessageInbox(encrypted, message.receivedTimestamp ?: 0) } insertResult.orNull()?.let { result -> messageID = result.messageId @@ -474,7 +475,7 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context, override fun getOrCreateThreadIdFor(publicKey: String, groupPublicKey: String?, openGroupID: String?): Long { val database = DatabaseFactory.getThreadDatabase(context) if (!openGroupID.isNullOrEmpty()) { - val recipient = Recipient.from(context, Address.fromSerialized(openGroupID), false) + val recipient = Recipient.from(context, Address.fromSerialized(GroupUtil.getEncodedOpenGroupID(openGroupID.toByteArray())), false) return database.getOrCreateThreadIdFor(recipient) } else if (!groupPublicKey.isNullOrEmpty()) { val recipient = Recipient.from(context, Address.fromSerialized(GroupUtil.doubleEncodeGroupID(groupPublicKey)), false) diff --git a/app/src/main/java/org/thoughtcrime/securesms/loki/api/PublicChatManager.kt b/app/src/main/java/org/thoughtcrime/securesms/loki/api/PublicChatManager.kt index aca0b020f7..c069cdfe26 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/loki/api/PublicChatManager.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/loki/api/PublicChatManager.kt @@ -17,12 +17,14 @@ import org.thoughtcrime.securesms.database.DatabaseContentProviders import org.thoughtcrime.securesms.database.DatabaseFactory import org.thoughtcrime.securesms.groups.GroupManager import org.thoughtcrime.securesms.util.BitmapUtil +import java.util.concurrent.Executors class PublicChatManager(private val context: Context) { private var chats = mutableMapOf() private val pollers = mutableMapOf() private val observers = mutableMapOf() private var isPolling = false + private val executorService = Executors.newScheduledThreadPool(16) public fun areAllCaughtUp(): Boolean { var areAllCaughtUp = true @@ -37,7 +39,7 @@ class PublicChatManager(private val context: Context) { public fun markAllAsNotCaughtUp() { refreshChatsAndPollers() for ((threadID, chat) in chats) { - val poller = pollers[threadID] ?: OpenGroupPoller(chat) + val poller = pollers[threadID] ?: OpenGroupPoller(chat, executorService) poller.isCaughtUp = false } } @@ -46,7 +48,7 @@ class PublicChatManager(private val context: Context) { refreshChatsAndPollers() for ((threadId, chat) in chats) { - val poller = pollers[threadId] ?: OpenGroupPoller(chat) + val poller = pollers[threadId] ?: OpenGroupPoller(chat, executorService) poller.startIfNeeded() listenToThreadDeletion(threadId) if (!pollers.containsKey(threadId)) { pollers[threadId] = poller } @@ -57,6 +59,7 @@ class PublicChatManager(private val context: Context) { public fun stopPollers() { pollers.values.forEach { it.stop() } isPolling = false + executorService.shutdown() } //TODO Declare a specific type of checked exception instead of "Exception". diff --git a/app/src/main/java/org/thoughtcrime/securesms/loki/protocol/MultiDeviceProtocol.kt b/app/src/main/java/org/thoughtcrime/securesms/loki/protocol/MultiDeviceProtocol.kt index 69dfc70126..41b3de72f8 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/loki/protocol/MultiDeviceProtocol.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/loki/protocol/MultiDeviceProtocol.kt @@ -126,6 +126,5 @@ object MultiDeviceProtocol { threadDatabase.notifyUpdatedFromConfig() } } - // TODO: handle new configuration message fields or handle in new pipeline } } \ No newline at end of file diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentDownloadJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentDownloadJob.kt index f500ea4841..9f99aff6bf 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentDownloadJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentDownloadJob.kt @@ -62,7 +62,7 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long) // DECRYPTION // Assume we're retrieving an attachment for an open group server if the digest is not set - val stream = if (attachment.digest == null || attachment.key == null) FileInputStream(tempFile) + val stream = if (attachment.digest?.size ?: 0 == 0 || attachment.key.isNullOrEmpty()) FileInputStream(tempFile) else AttachmentCipherInputStream.createForAttachment(tempFile, attachment.size, Base64.decode(attachment.key), attachment.digest) messageDataProvider.insertAttachment(databaseMessageID, attachment.attachmentId, stream) diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageReceiver.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageReceiver.kt index 89cc060ec1..e1b7e652ef 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageReceiver.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageReceiver.kt @@ -7,6 +7,7 @@ import org.session.libsession.messaging.messages.visible.VisibleMessage import org.session.libsession.utilities.GroupUtil import org.session.libsignal.service.internal.push.PushTransportDetails import org.session.libsignal.service.internal.push.SignalServiceProtos +import org.session.libsignal.utilities.logging.Log object MessageReceiver { @@ -121,7 +122,8 @@ object MessageReceiver { message.sender = sender message.recipient = userPublicKey message.sentTimestamp = envelope.timestamp - message.receivedTimestamp = System.currentTimeMillis() + message.receivedTimestamp = if (envelope.hasServerTimestamp()) envelope.serverTimestamp else System.currentTimeMillis() + Log.d("Loki", "time: ${envelope.timestamp}, sent: ${envelope.serverTimestamp}") message.groupPublicKey = groupPublicKey message.openGroupServerMessageID = openGroupServerID // Validate diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt index 608a225089..a6d233b482 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt @@ -1,8 +1,6 @@ package org.session.libsession.messaging.sending_receiving.pollers -import android.os.Handler import com.google.protobuf.ByteString - import nl.komponents.kovenant.Promise import nl.komponents.kovenant.deferred import org.session.libsession.messaging.MessagingConfiguration @@ -11,61 +9,37 @@ import org.session.libsession.messaging.jobs.MessageReceiveJob import org.session.libsession.messaging.opengroups.OpenGroup import org.session.libsession.messaging.opengroups.OpenGroupAPI import org.session.libsession.messaging.opengroups.OpenGroupMessage - -import org.session.libsignal.utilities.successBackground -import org.session.libsignal.utilities.logging.Log import org.session.libsignal.service.internal.push.SignalServiceProtos.* - +import org.session.libsignal.utilities.logging.Log +import org.session.libsignal.utilities.successBackground import java.util.* +import java.util.concurrent.ScheduledExecutorService +import java.util.concurrent.ScheduledFuture +import java.util.concurrent.TimeUnit + +class OpenGroupPoller(private val openGroup: OpenGroup, private val executorService: ScheduledExecutorService? = null) { -class OpenGroupPoller(private val openGroup: OpenGroup) { - private val handler by lazy { Handler() } private var hasStarted = false - private var isPollOngoing = false - public var isCaughtUp = false + @Volatile private var isPollOngoing = false + var isCaughtUp = false + + private val cancellableFutures = mutableListOf>() // region Convenience private val userHexEncodedPublicKey = MessagingConfiguration.shared.storage.getUserPublicKey() ?: "" - private var displayNameUpdatees = setOf() + private var displayNameUpdates = setOf() // endregion // region Tasks - private val pollForNewMessagesTask = object : Runnable { - - override fun run() { - pollForNewMessages() - handler.postDelayed(this, pollForNewMessagesInterval) - } - } - - private val pollForDeletedMessagesTask = object : Runnable { - - override fun run() { - pollForDeletedMessages() - handler.postDelayed(this, pollForDeletedMessagesInterval) - } - } - - private val pollForModeratorsTask = object : Runnable { - - override fun run() { - pollForModerators() - handler.postDelayed(this, pollForModeratorsInterval) - } - } - - private val pollForDisplayNamesTask = object : Runnable { - - override fun run() { - pollForDisplayNames() - handler.postDelayed(this, pollForDisplayNamesInterval) - } - } + private val pollForNewMessagesTask = Runnable { pollForNewMessages() } + private val pollForDeletedMessagesTask = Runnable { pollForDeletedMessages() } + private val pollForModeratorsTask = Runnable { pollForModerators() } + private val pollForDisplayNamesTask = Runnable { pollForDisplayNames() } // endregion // region Settings companion object { - private val pollForNewMessagesInterval: Long = 4 * 1000 + private val pollForNewMessagesInterval: Long = 10 * 1000 private val pollForDeletedMessagesInterval: Long = 60 * 1000 private val pollForModeratorsInterval: Long = 10 * 60 * 1000 private val pollForDisplayNamesInterval: Long = 60 * 1000 @@ -74,19 +48,21 @@ class OpenGroupPoller(private val openGroup: OpenGroup) { // region Lifecycle fun startIfNeeded() { - if (hasStarted) return - pollForNewMessagesTask.run() - pollForDeletedMessagesTask.run() - pollForModeratorsTask.run() - pollForDisplayNamesTask.run() + if (hasStarted || executorService == null) return + cancellableFutures += listOf( + executorService.scheduleAtFixedRate(pollForNewMessagesTask,0, pollForNewMessagesInterval, TimeUnit.MILLISECONDS), + executorService.scheduleAtFixedRate(pollForDeletedMessagesTask,0, pollForDeletedMessagesInterval, TimeUnit.MILLISECONDS), + executorService.scheduleAtFixedRate(pollForModeratorsTask,0, pollForModeratorsInterval, TimeUnit.MILLISECONDS), + executorService.scheduleAtFixedRate(pollForDisplayNamesTask,0, pollForDisplayNamesInterval, TimeUnit.MILLISECONDS) + ) hasStarted = true } fun stop() { - handler.removeCallbacks(pollForNewMessagesTask) - handler.removeCallbacks(pollForDeletedMessagesTask) - handler.removeCallbacks(pollForModeratorsTask) - handler.removeCallbacks(pollForDisplayNamesTask) + cancellableFutures.forEach { future -> + future.cancel(false) + } + cancellableFutures.clear() hasStarted = false } // endregion @@ -96,120 +72,127 @@ class OpenGroupPoller(private val openGroup: OpenGroup) { return pollForNewMessages(false) } - fun pollForNewMessages(isBackgroundPoll: Boolean): Promise { + private fun pollForNewMessages(isBackgroundPoll: Boolean): Promise { if (isPollOngoing) { return Promise.of(Unit) } isPollOngoing = true val deferred = deferred() // Kovenant propagates a context to chained promises, so OpenGroupAPI.sharedContext should be used for all of the below OpenGroupAPI.getMessages(openGroup.channel, openGroup.server).successBackground { messages -> // Process messages in the background + Log.d("Loki", "received ${messages.size} messages") messages.forEach { message -> - val senderPublicKey = message.senderPublicKey - val wasSentByCurrentUser = (senderPublicKey == userHexEncodedPublicKey) - fun generateDisplayName(rawDisplayName: String): String { - return "${rawDisplayName} (${senderPublicKey.takeLast(8)})" - } - val senderDisplayName = MessagingConfiguration.shared.storage.getOpenGroupDisplayName(senderPublicKey, openGroup.channel, openGroup.server) ?: generateDisplayName("Anonymous") - val id = openGroup.id.toByteArray() - // Main message - val dataMessageProto = DataMessage.newBuilder() - val body = if (message.body == message.timestamp.toString()) { "" } else { message.body } - dataMessageProto.setBody(body) - dataMessageProto.setTimestamp(message.timestamp) - // Attachments - val attachmentProtos = message.attachments.mapNotNull { attachment -> - if (attachment.kind != OpenGroupMessage.Attachment.Kind.Attachment) { return@mapNotNull null } - val attachmentProto = AttachmentPointer.newBuilder() - attachmentProto.setId(attachment.serverID) - attachmentProto.setContentType(attachment.contentType) - attachmentProto.setSize(attachment.size) - attachmentProto.setFileName(attachment.fileName) - attachmentProto.setFlags(attachment.flags) - attachmentProto.setWidth(attachment.width) - attachmentProto.setHeight(attachment.height) - attachment.caption.let { attachmentProto.setCaption(it) } - attachmentProto.setUrl(attachment.url) - attachmentProto.build() - } - dataMessageProto.addAllAttachments(attachmentProtos) - // Link preview - val linkPreview = message.attachments.firstOrNull { it.kind == OpenGroupMessage.Attachment.Kind.LinkPreview } - if (linkPreview != null) { - val linkPreviewProto = DataMessage.Preview.newBuilder() - linkPreviewProto.setUrl(linkPreview.linkPreviewURL!!) - linkPreviewProto.setTitle(linkPreview.linkPreviewTitle!!) - val attachmentProto = AttachmentPointer.newBuilder() - attachmentProto.setId(linkPreview.serverID) - attachmentProto.setContentType(linkPreview.contentType) - attachmentProto.setSize(linkPreview.size) - attachmentProto.setFileName(linkPreview.fileName) - attachmentProto.setFlags(linkPreview.flags) - attachmentProto.setWidth(linkPreview.width) - attachmentProto.setHeight(linkPreview.height) - linkPreview.caption.let { attachmentProto.setCaption(it) } - attachmentProto.setUrl(linkPreview.url) - linkPreviewProto.setImage(attachmentProto.build()) - dataMessageProto.addPreview(linkPreviewProto.build()) - } - // Quote - val quote = message.quote - if (quote != null) { - val quoteProto = DataMessage.Quote.newBuilder() - quoteProto.setId(quote.quotedMessageTimestamp) - quoteProto.setAuthor(quote.quoteePublicKey) - if (quote.quotedMessageBody != quote.quotedMessageTimestamp.toString()) { quoteProto.setText(quote.quotedMessageBody) } - dataMessageProto.setQuote(quoteProto.build()) - } - val messageServerID = message.serverID - // Profile - val profileProto = DataMessage.LokiProfile.newBuilder() - profileProto.setDisplayName(message.displayName) - val profilePicture = message.profilePicture - if (profilePicture != null) { - profileProto.setProfilePicture(profilePicture.url) - dataMessageProto.setProfileKey(ByteString.copyFrom(profilePicture.profileKey)) - } - dataMessageProto.setProfile(profileProto.build()) - /* TODO: the signal service proto needs to be synced with iOS - // Open group info - if (messageServerID != null) { - val openGroupProto = PublicChatInfo.newBuilder() - openGroupProto.setServerID(messageServerID) - dataMessageProto.setPublicChatInfo(openGroupProto.build()) - } - */ - // Signal group context - val groupProto = GroupContext.newBuilder() - groupProto.setId(ByteString.copyFrom(id)) - groupProto.setType(GroupContext.Type.DELIVER) - groupProto.setName(openGroup.displayName) - dataMessageProto.setGroup(groupProto.build()) - // Sync target - if (wasSentByCurrentUser) { - dataMessageProto.setSyncTarget(openGroup.id) - } - // Content - val content = Content.newBuilder() - content.setDataMessage(dataMessageProto.build()) - // Envelope - val builder = Envelope.newBuilder() - builder.type = Envelope.Type.UNIDENTIFIED_SENDER - builder.source = senderPublicKey - builder.sourceDevice = 1 - builder.setContent(content.build().toByteString()) - builder.serverTimestamp = message.serverTimestamp - val envelope = builder.build() - val job = MessageReceiveJob(envelope.toByteArray(), isBackgroundPoll, messageServerID, openGroup.id) - if (isBackgroundPoll) { - job.executeAsync().success { deferred.resolve(Unit) }.fail { deferred.resolve(Unit) } - // The promise is just used to keep track of when we're done - } else { - JobQueue.shared.add(job) - deferred.resolve(Unit) + try { + val senderPublicKey = message.senderPublicKey + fun generateDisplayName(rawDisplayName: String): String { + return "${rawDisplayName} (${senderPublicKey.takeLast(8)})" + } + val senderDisplayName = MessagingConfiguration.shared.storage.getOpenGroupDisplayName(senderPublicKey, openGroup.channel, openGroup.server) ?: generateDisplayName("Anonymous") + val id = openGroup.id.toByteArray() + // Main message + val dataMessageProto = DataMessage.newBuilder() + val body = if (message.body == message.timestamp.toString()) { "" } else { message.body } + dataMessageProto.setBody(body) + dataMessageProto.setTimestamp(message.timestamp) + // Attachments + val attachmentProtos = message.attachments.mapNotNull { attachment -> + try { + if (attachment.kind != OpenGroupMessage.Attachment.Kind.Attachment) { return@mapNotNull null } + val attachmentProto = AttachmentPointer.newBuilder() + attachmentProto.setId(attachment.serverID) + attachmentProto.setContentType(attachment.contentType) + attachmentProto.setSize(attachment.size) + attachmentProto.setFileName(attachment.fileName) + attachmentProto.setFlags(attachment.flags) + attachmentProto.setWidth(attachment.width) + attachmentProto.setHeight(attachment.height) + attachment.caption?.let { attachmentProto.setCaption(it) } + attachmentProto.setUrl(attachment.url) + attachmentProto.build() + } catch (e: Exception) { + Log.e("Loki","Failed to parse attachment as proto",e) + null + } + } + dataMessageProto.addAllAttachments(attachmentProtos) + // Link preview + val linkPreview = message.attachments.firstOrNull { it.kind == OpenGroupMessage.Attachment.Kind.LinkPreview } + if (linkPreview != null) { + val linkPreviewProto = DataMessage.Preview.newBuilder() + linkPreviewProto.setUrl(linkPreview.linkPreviewURL!!) + linkPreviewProto.setTitle(linkPreview.linkPreviewTitle!!) + val attachmentProto = AttachmentPointer.newBuilder() + attachmentProto.setId(linkPreview.serverID) + attachmentProto.setContentType(linkPreview.contentType) + attachmentProto.setSize(linkPreview.size) + attachmentProto.setFileName(linkPreview.fileName) + attachmentProto.setFlags(linkPreview.flags) + attachmentProto.setWidth(linkPreview.width) + attachmentProto.setHeight(linkPreview.height) + linkPreview.caption?.let { attachmentProto.setCaption(it) } + attachmentProto.setUrl(linkPreview.url) + linkPreviewProto.setImage(attachmentProto.build()) + dataMessageProto.addPreview(linkPreviewProto.build()) + } + // Quote + val quote = message.quote + if (quote != null) { + val quoteProto = DataMessage.Quote.newBuilder() + quoteProto.setId(quote.quotedMessageTimestamp) + quoteProto.setAuthor(quote.quoteePublicKey) + if (quote.quotedMessageBody != quote.quotedMessageTimestamp.toString()) { quoteProto.setText(quote.quotedMessageBody) } + dataMessageProto.setQuote(quoteProto.build()) + } + val messageServerID = message.serverID + // Profile + val profileProto = DataMessage.LokiProfile.newBuilder() + profileProto.setDisplayName(message.displayName) + val profilePicture = message.profilePicture + if (profilePicture != null) { + profileProto.setProfilePicture(profilePicture.url) + dataMessageProto.setProfileKey(ByteString.copyFrom(profilePicture.profileKey)) + } + dataMessageProto.setProfile(profileProto.build()) + /* TODO: the signal service proto needs to be synced with iOS + // Open group info + if (messageServerID != null) { + val openGroupProto = PublicChatInfo.newBuilder() + openGroupProto.setServerID(messageServerID) + dataMessageProto.setPublicChatInfo(openGroupProto.build()) + } + */ + // Signal group context + val groupProto = GroupContext.newBuilder() + groupProto.setId(ByteString.copyFrom(id)) + groupProto.setType(GroupContext.Type.DELIVER) + groupProto.setName(openGroup.displayName) + dataMessageProto.setGroup(groupProto.build()) + // Content + val content = Content.newBuilder() + content.setDataMessage(dataMessageProto.build()) + // Envelope + val builder = Envelope.newBuilder() + builder.type = Envelope.Type.UNIDENTIFIED_SENDER + builder.source = senderPublicKey + builder.sourceDevice = 1 + builder.setContent(content.build().toByteString()) + builder.timestamp = message.timestamp + builder.serverTimestamp = message.serverTimestamp + val envelope = builder.build() + val job = MessageReceiveJob(envelope.toByteArray(), isBackgroundPoll, messageServerID, openGroup.id) + Log.d("Loki", "Scheduling Job $job") + if (isBackgroundPoll) { + job.executeAsync().always { deferred.resolve(Unit) } + // The promise is just used to keep track of when we're done + } else { + JobQueue.shared.add(job) + } + } catch (e: Exception) { + Log.e("Loki", "Exception parsing message", e) } } isCaughtUp = true isPollOngoing = false + deferred.resolve(Unit) }.fail { Log.d("Loki", "Failed to get messages for group chat with ID: ${openGroup.channel} on server: ${openGroup.server}.") isPollOngoing = false @@ -218,16 +201,16 @@ class OpenGroupPoller(private val openGroup: OpenGroup) { } private fun pollForDisplayNames() { - if (displayNameUpdatees.isEmpty()) { return } - val hexEncodedPublicKeys = displayNameUpdatees - displayNameUpdatees = setOf() + if (displayNameUpdates.isEmpty()) { return } + val hexEncodedPublicKeys = displayNameUpdates + displayNameUpdates = setOf() OpenGroupAPI.getDisplayNames(hexEncodedPublicKeys, openGroup.server).successBackground { mapping -> for (pair in mapping.entries) { val senderDisplayName = "${pair.value} (...${pair.key.takeLast(8)})" MessagingConfiguration.shared.storage.setOpenGroupDisplayName(pair.key, openGroup.channel, openGroup.server, senderDisplayName) } }.fail { - displayNameUpdatees = displayNameUpdatees.union(hexEncodedPublicKeys) + displayNameUpdates = displayNameUpdates.union(hexEncodedPublicKeys) } }