fix: open group fixes for new message receive pipeline

pull/486/head
jubb 4 years ago
parent d7c03c9d0a
commit db553544ec

@ -191,16 +191,16 @@ public class ApplicationContext extends MultiDexApplication implements Dependenc
// Set application UI mode (day/night theme) to the user selected one. // Set application UI mode (day/night theme) to the user selected one.
UiModeUtilities.setupUiModeToUserSelected(this); UiModeUtilities.setupUiModeToUserSelected(this);
// ======== // ========
initializeJobManager();
initializeExpiringMessageManager(); initializeExpiringMessageManager();
initializeTypingStatusRepository(); initializeTypingStatusRepository();
initializeTypingStatusSender(); initializeTypingStatusSender();
initializeReadReceiptManager(); initializeReadReceiptManager();
initializeProfileManager(); initializeProfileManager();
initializePeriodicTasks(); initializePeriodicTasks();
SSKEnvironment.Companion.configure(getTypingStatusRepository(), getReadReceiptManager(), getProfileManager(), messageNotifier, getExpiringMessageManager());
initializeJobManager();
initializeWebRtc(); initializeWebRtc();
initializeBlobProvider(); initializeBlobProvider();
SSKEnvironment.Companion.configure(getTypingStatusRepository(), getReadReceiptManager(), getProfileManager(), messageNotifier, getExpiringMessageManager());
} }
@Override @Override

@ -94,6 +94,7 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context,
override fun persist(message: VisibleMessage, quotes: QuoteModel?, linkPreview: List<LinkPreview?>, groupPublicKey: String?, openGroupID: String?, attachments: List<Attachment>): Long? { override fun persist(message: VisibleMessage, quotes: QuoteModel?, linkPreview: List<LinkPreview?>, groupPublicKey: String?, openGroupID: String?, attachments: List<Attachment>): Long? {
var messageID: Long? = null var messageID: Long? = null
val senderAddress = Address.fromSerialized(message.sender!!) val senderAddress = Address.fromSerialized(message.sender!!)
val isUserSender = message.sender!! == getUserPublicKey()
val group: Optional<SignalServiceGroup> = when { val group: Optional<SignalServiceGroup> = when {
openGroupID != null -> Optional.of(SignalServiceGroup(openGroupID.toByteArray(), SignalServiceGroup.GroupType.PUBLIC_CHAT)) openGroupID != null -> Optional.of(SignalServiceGroup(openGroupID.toByteArray(), SignalServiceGroup.GroupType.PUBLIC_CHAT))
groupPublicKey != null -> { groupPublicKey != null -> {
@ -105,7 +106,7 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context,
val pointerAttachments = attachments.mapNotNull { val pointerAttachments = attachments.mapNotNull {
it.toSignalAttachment() it.toSignalAttachment()
} }
val targetAddress = if (senderAddress.serialize() == getUserPublicKey() && message.syncTarget != null) { val targetAddress = if (isUserSender && !message.syncTarget.isNullOrEmpty()) {
Address.fromSerialized(message.syncTarget!!) Address.fromSerialized(message.syncTarget!!)
} else if (group.isPresent) { } else if (group.isPresent) {
Address.fromSerialized(GroupUtil.getEncodedId(group.get())) Address.fromSerialized(GroupUtil.getEncodedId(group.get()))
@ -130,7 +131,7 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context,
} }
val mediaMessage = IncomingMediaMessage.from(message, senderAddress, targetRecipient.expireMessages * 1000L, group, signalServiceAttachments, quote, linkPreviews) val mediaMessage = IncomingMediaMessage.from(message, senderAddress, targetRecipient.expireMessages * 1000L, group, signalServiceAttachments, quote, linkPreviews)
mmsDatabase.beginTransaction() mmsDatabase.beginTransaction()
mmsDatabase.insertSecureDecryptedMessageInbox(mediaMessage, message.threadID ?: -1, message.sentTimestamp ?: 0) mmsDatabase.insertSecureDecryptedMessageInbox(mediaMessage, message.threadID ?: -1, message.receivedTimestamp ?: 0)
} }
if (insertResult.isPresent) { if (insertResult.isPresent) {
mmsDatabase.setTransactionSuccessful() mmsDatabase.setTransactionSuccessful()
@ -145,7 +146,7 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context,
} else { } else {
val textMessage = IncomingTextMessage.from(message, senderAddress, group, targetRecipient.expireMessages * 1000L) val textMessage = IncomingTextMessage.from(message, senderAddress, group, targetRecipient.expireMessages * 1000L)
val encrypted = IncomingEncryptedMessage(textMessage, textMessage.messageBody) val encrypted = IncomingEncryptedMessage(textMessage, textMessage.messageBody)
smsDatabase.insertMessageInbox(encrypted, message.sentTimestamp ?: 0) smsDatabase.insertMessageInbox(encrypted, message.receivedTimestamp ?: 0)
} }
insertResult.orNull()?.let { result -> insertResult.orNull()?.let { result ->
messageID = result.messageId messageID = result.messageId
@ -474,7 +475,7 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context,
override fun getOrCreateThreadIdFor(publicKey: String, groupPublicKey: String?, openGroupID: String?): Long { override fun getOrCreateThreadIdFor(publicKey: String, groupPublicKey: String?, openGroupID: String?): Long {
val database = DatabaseFactory.getThreadDatabase(context) val database = DatabaseFactory.getThreadDatabase(context)
if (!openGroupID.isNullOrEmpty()) { if (!openGroupID.isNullOrEmpty()) {
val recipient = Recipient.from(context, Address.fromSerialized(openGroupID), false) val recipient = Recipient.from(context, Address.fromSerialized(GroupUtil.getEncodedOpenGroupID(openGroupID.toByteArray())), false)
return database.getOrCreateThreadIdFor(recipient) return database.getOrCreateThreadIdFor(recipient)
} else if (!groupPublicKey.isNullOrEmpty()) { } else if (!groupPublicKey.isNullOrEmpty()) {
val recipient = Recipient.from(context, Address.fromSerialized(GroupUtil.doubleEncodeGroupID(groupPublicKey)), false) val recipient = Recipient.from(context, Address.fromSerialized(GroupUtil.doubleEncodeGroupID(groupPublicKey)), false)

@ -17,12 +17,14 @@ import org.thoughtcrime.securesms.database.DatabaseContentProviders
import org.thoughtcrime.securesms.database.DatabaseFactory import org.thoughtcrime.securesms.database.DatabaseFactory
import org.thoughtcrime.securesms.groups.GroupManager import org.thoughtcrime.securesms.groups.GroupManager
import org.thoughtcrime.securesms.util.BitmapUtil import org.thoughtcrime.securesms.util.BitmapUtil
import java.util.concurrent.Executors
class PublicChatManager(private val context: Context) { class PublicChatManager(private val context: Context) {
private var chats = mutableMapOf<Long, OpenGroup>() private var chats = mutableMapOf<Long, OpenGroup>()
private val pollers = mutableMapOf<Long, OpenGroupPoller>() private val pollers = mutableMapOf<Long, OpenGroupPoller>()
private val observers = mutableMapOf<Long, ContentObserver>() private val observers = mutableMapOf<Long, ContentObserver>()
private var isPolling = false private var isPolling = false
private val executorService = Executors.newScheduledThreadPool(16)
public fun areAllCaughtUp(): Boolean { public fun areAllCaughtUp(): Boolean {
var areAllCaughtUp = true var areAllCaughtUp = true
@ -37,7 +39,7 @@ class PublicChatManager(private val context: Context) {
public fun markAllAsNotCaughtUp() { public fun markAllAsNotCaughtUp() {
refreshChatsAndPollers() refreshChatsAndPollers()
for ((threadID, chat) in chats) { for ((threadID, chat) in chats) {
val poller = pollers[threadID] ?: OpenGroupPoller(chat) val poller = pollers[threadID] ?: OpenGroupPoller(chat, executorService)
poller.isCaughtUp = false poller.isCaughtUp = false
} }
} }
@ -46,7 +48,7 @@ class PublicChatManager(private val context: Context) {
refreshChatsAndPollers() refreshChatsAndPollers()
for ((threadId, chat) in chats) { for ((threadId, chat) in chats) {
val poller = pollers[threadId] ?: OpenGroupPoller(chat) val poller = pollers[threadId] ?: OpenGroupPoller(chat, executorService)
poller.startIfNeeded() poller.startIfNeeded()
listenToThreadDeletion(threadId) listenToThreadDeletion(threadId)
if (!pollers.containsKey(threadId)) { pollers[threadId] = poller } if (!pollers.containsKey(threadId)) { pollers[threadId] = poller }
@ -57,6 +59,7 @@ class PublicChatManager(private val context: Context) {
public fun stopPollers() { public fun stopPollers() {
pollers.values.forEach { it.stop() } pollers.values.forEach { it.stop() }
isPolling = false isPolling = false
executorService.shutdown()
} }
//TODO Declare a specific type of checked exception instead of "Exception". //TODO Declare a specific type of checked exception instead of "Exception".

@ -126,6 +126,5 @@ object MultiDeviceProtocol {
threadDatabase.notifyUpdatedFromConfig() threadDatabase.notifyUpdatedFromConfig()
} }
} }
// TODO: handle new configuration message fields or handle in new pipeline
} }
} }

@ -62,7 +62,7 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long)
// DECRYPTION // DECRYPTION
// Assume we're retrieving an attachment for an open group server if the digest is not set // Assume we're retrieving an attachment for an open group server if the digest is not set
val stream = if (attachment.digest == null || attachment.key == null) FileInputStream(tempFile) val stream = if (attachment.digest?.size ?: 0 == 0 || attachment.key.isNullOrEmpty()) FileInputStream(tempFile)
else AttachmentCipherInputStream.createForAttachment(tempFile, attachment.size, Base64.decode(attachment.key), attachment.digest) else AttachmentCipherInputStream.createForAttachment(tempFile, attachment.size, Base64.decode(attachment.key), attachment.digest)
messageDataProvider.insertAttachment(databaseMessageID, attachment.attachmentId, stream) messageDataProvider.insertAttachment(databaseMessageID, attachment.attachmentId, stream)

@ -7,6 +7,7 @@ import org.session.libsession.messaging.messages.visible.VisibleMessage
import org.session.libsession.utilities.GroupUtil import org.session.libsession.utilities.GroupUtil
import org.session.libsignal.service.internal.push.PushTransportDetails import org.session.libsignal.service.internal.push.PushTransportDetails
import org.session.libsignal.service.internal.push.SignalServiceProtos import org.session.libsignal.service.internal.push.SignalServiceProtos
import org.session.libsignal.utilities.logging.Log
object MessageReceiver { object MessageReceiver {
@ -121,7 +122,8 @@ object MessageReceiver {
message.sender = sender message.sender = sender
message.recipient = userPublicKey message.recipient = userPublicKey
message.sentTimestamp = envelope.timestamp message.sentTimestamp = envelope.timestamp
message.receivedTimestamp = System.currentTimeMillis() message.receivedTimestamp = if (envelope.hasServerTimestamp()) envelope.serverTimestamp else System.currentTimeMillis()
Log.d("Loki", "time: ${envelope.timestamp}, sent: ${envelope.serverTimestamp}")
message.groupPublicKey = groupPublicKey message.groupPublicKey = groupPublicKey
message.openGroupServerMessageID = openGroupServerID message.openGroupServerMessageID = openGroupServerID
// Validate // Validate

@ -1,8 +1,6 @@
package org.session.libsession.messaging.sending_receiving.pollers package org.session.libsession.messaging.sending_receiving.pollers
import android.os.Handler
import com.google.protobuf.ByteString import com.google.protobuf.ByteString
import nl.komponents.kovenant.Promise import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.deferred import nl.komponents.kovenant.deferred
import org.session.libsession.messaging.MessagingConfiguration import org.session.libsession.messaging.MessagingConfiguration
@ -11,61 +9,37 @@ import org.session.libsession.messaging.jobs.MessageReceiveJob
import org.session.libsession.messaging.opengroups.OpenGroup import org.session.libsession.messaging.opengroups.OpenGroup
import org.session.libsession.messaging.opengroups.OpenGroupAPI import org.session.libsession.messaging.opengroups.OpenGroupAPI
import org.session.libsession.messaging.opengroups.OpenGroupMessage import org.session.libsession.messaging.opengroups.OpenGroupMessage
import org.session.libsignal.utilities.successBackground
import org.session.libsignal.utilities.logging.Log
import org.session.libsignal.service.internal.push.SignalServiceProtos.* import org.session.libsignal.service.internal.push.SignalServiceProtos.*
import org.session.libsignal.utilities.logging.Log
import org.session.libsignal.utilities.successBackground
import java.util.* import java.util.*
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
class OpenGroupPoller(private val openGroup: OpenGroup, private val executorService: ScheduledExecutorService? = null) {
class OpenGroupPoller(private val openGroup: OpenGroup) {
private val handler by lazy { Handler() }
private var hasStarted = false private var hasStarted = false
private var isPollOngoing = false @Volatile private var isPollOngoing = false
public var isCaughtUp = false var isCaughtUp = false
private val cancellableFutures = mutableListOf<ScheduledFuture<out Any>>()
// region Convenience // region Convenience
private val userHexEncodedPublicKey = MessagingConfiguration.shared.storage.getUserPublicKey() ?: "" private val userHexEncodedPublicKey = MessagingConfiguration.shared.storage.getUserPublicKey() ?: ""
private var displayNameUpdatees = setOf<String>() private var displayNameUpdates = setOf<String>()
// endregion // endregion
// region Tasks // region Tasks
private val pollForNewMessagesTask = object : Runnable { private val pollForNewMessagesTask = Runnable { pollForNewMessages() }
private val pollForDeletedMessagesTask = Runnable { pollForDeletedMessages() }
override fun run() { private val pollForModeratorsTask = Runnable { pollForModerators() }
pollForNewMessages() private val pollForDisplayNamesTask = Runnable { pollForDisplayNames() }
handler.postDelayed(this, pollForNewMessagesInterval)
}
}
private val pollForDeletedMessagesTask = object : Runnable {
override fun run() {
pollForDeletedMessages()
handler.postDelayed(this, pollForDeletedMessagesInterval)
}
}
private val pollForModeratorsTask = object : Runnable {
override fun run() {
pollForModerators()
handler.postDelayed(this, pollForModeratorsInterval)
}
}
private val pollForDisplayNamesTask = object : Runnable {
override fun run() {
pollForDisplayNames()
handler.postDelayed(this, pollForDisplayNamesInterval)
}
}
// endregion // endregion
// region Settings // region Settings
companion object { companion object {
private val pollForNewMessagesInterval: Long = 4 * 1000 private val pollForNewMessagesInterval: Long = 10 * 1000
private val pollForDeletedMessagesInterval: Long = 60 * 1000 private val pollForDeletedMessagesInterval: Long = 60 * 1000
private val pollForModeratorsInterval: Long = 10 * 60 * 1000 private val pollForModeratorsInterval: Long = 10 * 60 * 1000
private val pollForDisplayNamesInterval: Long = 60 * 1000 private val pollForDisplayNamesInterval: Long = 60 * 1000
@ -74,19 +48,21 @@ class OpenGroupPoller(private val openGroup: OpenGroup) {
// region Lifecycle // region Lifecycle
fun startIfNeeded() { fun startIfNeeded() {
if (hasStarted) return if (hasStarted || executorService == null) return
pollForNewMessagesTask.run() cancellableFutures += listOf(
pollForDeletedMessagesTask.run() executorService.scheduleAtFixedRate(pollForNewMessagesTask,0, pollForNewMessagesInterval, TimeUnit.MILLISECONDS),
pollForModeratorsTask.run() executorService.scheduleAtFixedRate(pollForDeletedMessagesTask,0, pollForDeletedMessagesInterval, TimeUnit.MILLISECONDS),
pollForDisplayNamesTask.run() executorService.scheduleAtFixedRate(pollForModeratorsTask,0, pollForModeratorsInterval, TimeUnit.MILLISECONDS),
executorService.scheduleAtFixedRate(pollForDisplayNamesTask,0, pollForDisplayNamesInterval, TimeUnit.MILLISECONDS)
)
hasStarted = true hasStarted = true
} }
fun stop() { fun stop() {
handler.removeCallbacks(pollForNewMessagesTask) cancellableFutures.forEach { future ->
handler.removeCallbacks(pollForDeletedMessagesTask) future.cancel(false)
handler.removeCallbacks(pollForModeratorsTask) }
handler.removeCallbacks(pollForDisplayNamesTask) cancellableFutures.clear()
hasStarted = false hasStarted = false
} }
// endregion // endregion
@ -96,120 +72,127 @@ class OpenGroupPoller(private val openGroup: OpenGroup) {
return pollForNewMessages(false) return pollForNewMessages(false)
} }
fun pollForNewMessages(isBackgroundPoll: Boolean): Promise<Unit, Exception> { private fun pollForNewMessages(isBackgroundPoll: Boolean): Promise<Unit, Exception> {
if (isPollOngoing) { return Promise.of(Unit) } if (isPollOngoing) { return Promise.of(Unit) }
isPollOngoing = true isPollOngoing = true
val deferred = deferred<Unit, Exception>() val deferred = deferred<Unit, Exception>()
// Kovenant propagates a context to chained promises, so OpenGroupAPI.sharedContext should be used for all of the below // Kovenant propagates a context to chained promises, so OpenGroupAPI.sharedContext should be used for all of the below
OpenGroupAPI.getMessages(openGroup.channel, openGroup.server).successBackground { messages -> OpenGroupAPI.getMessages(openGroup.channel, openGroup.server).successBackground { messages ->
// Process messages in the background // Process messages in the background
Log.d("Loki", "received ${messages.size} messages")
messages.forEach { message -> messages.forEach { message ->
val senderPublicKey = message.senderPublicKey try {
val wasSentByCurrentUser = (senderPublicKey == userHexEncodedPublicKey) val senderPublicKey = message.senderPublicKey
fun generateDisplayName(rawDisplayName: String): String { fun generateDisplayName(rawDisplayName: String): String {
return "${rawDisplayName} (${senderPublicKey.takeLast(8)})" return "${rawDisplayName} (${senderPublicKey.takeLast(8)})"
} }
val senderDisplayName = MessagingConfiguration.shared.storage.getOpenGroupDisplayName(senderPublicKey, openGroup.channel, openGroup.server) ?: generateDisplayName("Anonymous") val senderDisplayName = MessagingConfiguration.shared.storage.getOpenGroupDisplayName(senderPublicKey, openGroup.channel, openGroup.server) ?: generateDisplayName("Anonymous")
val id = openGroup.id.toByteArray() val id = openGroup.id.toByteArray()
// Main message // Main message
val dataMessageProto = DataMessage.newBuilder() val dataMessageProto = DataMessage.newBuilder()
val body = if (message.body == message.timestamp.toString()) { "" } else { message.body } val body = if (message.body == message.timestamp.toString()) { "" } else { message.body }
dataMessageProto.setBody(body) dataMessageProto.setBody(body)
dataMessageProto.setTimestamp(message.timestamp) dataMessageProto.setTimestamp(message.timestamp)
// Attachments // Attachments
val attachmentProtos = message.attachments.mapNotNull { attachment -> val attachmentProtos = message.attachments.mapNotNull { attachment ->
if (attachment.kind != OpenGroupMessage.Attachment.Kind.Attachment) { return@mapNotNull null } try {
val attachmentProto = AttachmentPointer.newBuilder() if (attachment.kind != OpenGroupMessage.Attachment.Kind.Attachment) { return@mapNotNull null }
attachmentProto.setId(attachment.serverID) val attachmentProto = AttachmentPointer.newBuilder()
attachmentProto.setContentType(attachment.contentType) attachmentProto.setId(attachment.serverID)
attachmentProto.setSize(attachment.size) attachmentProto.setContentType(attachment.contentType)
attachmentProto.setFileName(attachment.fileName) attachmentProto.setSize(attachment.size)
attachmentProto.setFlags(attachment.flags) attachmentProto.setFileName(attachment.fileName)
attachmentProto.setWidth(attachment.width) attachmentProto.setFlags(attachment.flags)
attachmentProto.setHeight(attachment.height) attachmentProto.setWidth(attachment.width)
attachment.caption.let { attachmentProto.setCaption(it) } attachmentProto.setHeight(attachment.height)
attachmentProto.setUrl(attachment.url) attachment.caption?.let { attachmentProto.setCaption(it) }
attachmentProto.build() attachmentProto.setUrl(attachment.url)
} attachmentProto.build()
dataMessageProto.addAllAttachments(attachmentProtos) } catch (e: Exception) {
// Link preview Log.e("Loki","Failed to parse attachment as proto",e)
val linkPreview = message.attachments.firstOrNull { it.kind == OpenGroupMessage.Attachment.Kind.LinkPreview } null
if (linkPreview != null) { }
val linkPreviewProto = DataMessage.Preview.newBuilder() }
linkPreviewProto.setUrl(linkPreview.linkPreviewURL!!) dataMessageProto.addAllAttachments(attachmentProtos)
linkPreviewProto.setTitle(linkPreview.linkPreviewTitle!!) // Link preview
val attachmentProto = AttachmentPointer.newBuilder() val linkPreview = message.attachments.firstOrNull { it.kind == OpenGroupMessage.Attachment.Kind.LinkPreview }
attachmentProto.setId(linkPreview.serverID) if (linkPreview != null) {
attachmentProto.setContentType(linkPreview.contentType) val linkPreviewProto = DataMessage.Preview.newBuilder()
attachmentProto.setSize(linkPreview.size) linkPreviewProto.setUrl(linkPreview.linkPreviewURL!!)
attachmentProto.setFileName(linkPreview.fileName) linkPreviewProto.setTitle(linkPreview.linkPreviewTitle!!)
attachmentProto.setFlags(linkPreview.flags) val attachmentProto = AttachmentPointer.newBuilder()
attachmentProto.setWidth(linkPreview.width) attachmentProto.setId(linkPreview.serverID)
attachmentProto.setHeight(linkPreview.height) attachmentProto.setContentType(linkPreview.contentType)
linkPreview.caption.let { attachmentProto.setCaption(it) } attachmentProto.setSize(linkPreview.size)
attachmentProto.setUrl(linkPreview.url) attachmentProto.setFileName(linkPreview.fileName)
linkPreviewProto.setImage(attachmentProto.build()) attachmentProto.setFlags(linkPreview.flags)
dataMessageProto.addPreview(linkPreviewProto.build()) attachmentProto.setWidth(linkPreview.width)
} attachmentProto.setHeight(linkPreview.height)
// Quote linkPreview.caption?.let { attachmentProto.setCaption(it) }
val quote = message.quote attachmentProto.setUrl(linkPreview.url)
if (quote != null) { linkPreviewProto.setImage(attachmentProto.build())
val quoteProto = DataMessage.Quote.newBuilder() dataMessageProto.addPreview(linkPreviewProto.build())
quoteProto.setId(quote.quotedMessageTimestamp) }
quoteProto.setAuthor(quote.quoteePublicKey) // Quote
if (quote.quotedMessageBody != quote.quotedMessageTimestamp.toString()) { quoteProto.setText(quote.quotedMessageBody) } val quote = message.quote
dataMessageProto.setQuote(quoteProto.build()) if (quote != null) {
} val quoteProto = DataMessage.Quote.newBuilder()
val messageServerID = message.serverID quoteProto.setId(quote.quotedMessageTimestamp)
// Profile quoteProto.setAuthor(quote.quoteePublicKey)
val profileProto = DataMessage.LokiProfile.newBuilder() if (quote.quotedMessageBody != quote.quotedMessageTimestamp.toString()) { quoteProto.setText(quote.quotedMessageBody) }
profileProto.setDisplayName(message.displayName) dataMessageProto.setQuote(quoteProto.build())
val profilePicture = message.profilePicture }
if (profilePicture != null) { val messageServerID = message.serverID
profileProto.setProfilePicture(profilePicture.url) // Profile
dataMessageProto.setProfileKey(ByteString.copyFrom(profilePicture.profileKey)) val profileProto = DataMessage.LokiProfile.newBuilder()
} profileProto.setDisplayName(message.displayName)
dataMessageProto.setProfile(profileProto.build()) val profilePicture = message.profilePicture
/* TODO: the signal service proto needs to be synced with iOS if (profilePicture != null) {
// Open group info profileProto.setProfilePicture(profilePicture.url)
if (messageServerID != null) { dataMessageProto.setProfileKey(ByteString.copyFrom(profilePicture.profileKey))
val openGroupProto = PublicChatInfo.newBuilder() }
openGroupProto.setServerID(messageServerID) dataMessageProto.setProfile(profileProto.build())
dataMessageProto.setPublicChatInfo(openGroupProto.build()) /* TODO: the signal service proto needs to be synced with iOS
} // Open group info
*/ if (messageServerID != null) {
// Signal group context val openGroupProto = PublicChatInfo.newBuilder()
val groupProto = GroupContext.newBuilder() openGroupProto.setServerID(messageServerID)
groupProto.setId(ByteString.copyFrom(id)) dataMessageProto.setPublicChatInfo(openGroupProto.build())
groupProto.setType(GroupContext.Type.DELIVER) }
groupProto.setName(openGroup.displayName) */
dataMessageProto.setGroup(groupProto.build()) // Signal group context
// Sync target val groupProto = GroupContext.newBuilder()
if (wasSentByCurrentUser) { groupProto.setId(ByteString.copyFrom(id))
dataMessageProto.setSyncTarget(openGroup.id) groupProto.setType(GroupContext.Type.DELIVER)
} groupProto.setName(openGroup.displayName)
// Content dataMessageProto.setGroup(groupProto.build())
val content = Content.newBuilder() // Content
content.setDataMessage(dataMessageProto.build()) val content = Content.newBuilder()
// Envelope content.setDataMessage(dataMessageProto.build())
val builder = Envelope.newBuilder() // Envelope
builder.type = Envelope.Type.UNIDENTIFIED_SENDER val builder = Envelope.newBuilder()
builder.source = senderPublicKey builder.type = Envelope.Type.UNIDENTIFIED_SENDER
builder.sourceDevice = 1 builder.source = senderPublicKey
builder.setContent(content.build().toByteString()) builder.sourceDevice = 1
builder.serverTimestamp = message.serverTimestamp builder.setContent(content.build().toByteString())
val envelope = builder.build() builder.timestamp = message.timestamp
val job = MessageReceiveJob(envelope.toByteArray(), isBackgroundPoll, messageServerID, openGroup.id) builder.serverTimestamp = message.serverTimestamp
if (isBackgroundPoll) { val envelope = builder.build()
job.executeAsync().success { deferred.resolve(Unit) }.fail { deferred.resolve(Unit) } val job = MessageReceiveJob(envelope.toByteArray(), isBackgroundPoll, messageServerID, openGroup.id)
// The promise is just used to keep track of when we're done Log.d("Loki", "Scheduling Job $job")
} else { if (isBackgroundPoll) {
JobQueue.shared.add(job) job.executeAsync().always { deferred.resolve(Unit) }
deferred.resolve(Unit) // The promise is just used to keep track of when we're done
} else {
JobQueue.shared.add(job)
}
} catch (e: Exception) {
Log.e("Loki", "Exception parsing message", e)
} }
} }
isCaughtUp = true isCaughtUp = true
isPollOngoing = false isPollOngoing = false
deferred.resolve(Unit)
}.fail { }.fail {
Log.d("Loki", "Failed to get messages for group chat with ID: ${openGroup.channel} on server: ${openGroup.server}.") Log.d("Loki", "Failed to get messages for group chat with ID: ${openGroup.channel} on server: ${openGroup.server}.")
isPollOngoing = false isPollOngoing = false
@ -218,16 +201,16 @@ class OpenGroupPoller(private val openGroup: OpenGroup) {
} }
private fun pollForDisplayNames() { private fun pollForDisplayNames() {
if (displayNameUpdatees.isEmpty()) { return } if (displayNameUpdates.isEmpty()) { return }
val hexEncodedPublicKeys = displayNameUpdatees val hexEncodedPublicKeys = displayNameUpdates
displayNameUpdatees = setOf() displayNameUpdates = setOf()
OpenGroupAPI.getDisplayNames(hexEncodedPublicKeys, openGroup.server).successBackground { mapping -> OpenGroupAPI.getDisplayNames(hexEncodedPublicKeys, openGroup.server).successBackground { mapping ->
for (pair in mapping.entries) { for (pair in mapping.entries) {
val senderDisplayName = "${pair.value} (...${pair.key.takeLast(8)})" val senderDisplayName = "${pair.value} (...${pair.key.takeLast(8)})"
MessagingConfiguration.shared.storage.setOpenGroupDisplayName(pair.key, openGroup.channel, openGroup.server, senderDisplayName) MessagingConfiguration.shared.storage.setOpenGroupDisplayName(pair.key, openGroup.channel, openGroup.server, senderDisplayName)
} }
}.fail { }.fail {
displayNameUpdatees = displayNameUpdatees.union(hexEncodedPublicKeys) displayNameUpdates = displayNameUpdates.union(hexEncodedPublicKeys)
} }
} }

Loading…
Cancel
Save