diff --git a/ts/session/messages/outgoing/content/sync/ContactSyncMessage.ts b/ts/session/messages/outgoing/content/sync/ContactSyncMessage.ts new file mode 100644 index 000000000..2c9cdd3a0 --- /dev/null +++ b/ts/session/messages/outgoing/content/sync/ContactSyncMessage.ts @@ -0,0 +1,31 @@ +import { SignalService } from '../../../../../protobuf'; +import { MessageParams } from '../../Message'; +import { SyncMessage } from '../'; +import { PubKey } from '../../../../types'; +import { DataMessage } from '../data'; + + +interface ContactSyncMessageParams extends MessageParams { + // Send to our devices + contacts: Array; + dataMessage?: DataMessage; +} + +export class ContactSyncMessage extends SyncMessage { + private readonly contacts: Array; + private readonly dataMessage?: DataMessage; + + constructor(params: ContactSyncMessageParams) { + super(params); + + // Stubbed for now + this.contacts = params.contacts; + this.dataMessage = params.dataMessage; + + this.syncProto(); + } + + protected syncProto(): SignalService.SyncMessage { + return new SignalService.SyncMessage(); + } +} diff --git a/ts/session/messages/outgoing/content/sync/index.ts b/ts/session/messages/outgoing/content/sync/index.ts index 6da779ff9..d17d62a83 100644 --- a/ts/session/messages/outgoing/content/sync/index.ts +++ b/ts/session/messages/outgoing/content/sync/index.ts @@ -1,3 +1,4 @@ -import * as SyncMessage from './SyncMessage'; +import { SyncMessage } from './SyncMessage'; +import { ContactSyncMessage } from './ContactSyncMessage'; -export { SyncMessage }; +export { ContactSyncMessage, SyncMessage }; diff --git a/ts/session/protocols/SessionProtocol.ts b/ts/session/protocols/SessionProtocol.ts index 79847da82..da0f95129 100644 --- a/ts/session/protocols/SessionProtocol.ts +++ b/ts/session/protocols/SessionProtocol.ts @@ -3,7 +3,7 @@ import { SessionRequestMessage } from '../messages/outgoing'; import { createOrUpdateItem, getItemById } from '../../../js/modules/data'; import { libloki, libsignal, textsecure } from '../../window'; import { MessageSender } from '../sending'; -import * as MessageUtils from '../utils'; +import { MessageUtils } from '../utils'; import { PubKey } from '../types'; interface StringToNumberMap { diff --git a/ts/session/sending/MessageQueue.ts b/ts/session/sending/MessageQueue.ts index 7077a2921..7b029a933 100644 --- a/ts/session/sending/MessageQueue.ts +++ b/ts/session/sending/MessageQueue.ts @@ -1,54 +1,167 @@ +import * as _ from 'lodash'; +import * as Data from '../../../js/modules/data'; +import { ConversationController } from '../../window'; + import { EventEmitter } from 'events'; import { MessageQueueInterface, MessageQueueInterfaceEvents, } from './MessageQueueInterface'; -import { ContentMessage, OpenGroupMessage } from '../messages/outgoing'; +import { + ClosedGroupMessage, + ContentMessage, + OpenGroupMessage, + SessionRequestMessage, +} from '../messages/outgoing'; import { PendingMessageCache } from './PendingMessageCache'; -import { JobQueue, TypedEventEmitter } from '../utils'; +import { + JobQueue, + SyncMessageUtils, + TypedEventEmitter, +} from '../utils'; +import { PubKey } from '../types'; +import { MessageSender } from '.'; +import { SessionProtocol } from '../protocols'; export class MessageQueue implements MessageQueueInterface { public readonly events: TypedEventEmitter; - private readonly jobQueues: Map = new Map(); - private readonly cache: PendingMessageCache; + private readonly jobQueues: Map = new Map(); + private readonly pendingMessageCache: PendingMessageCache; constructor() { this.events = new EventEmitter(); - this.cache = new PendingMessageCache(); - this.processAllPending(); + this.pendingMessageCache = new PendingMessageCache(); + void this.processAllPending(); } - public sendUsingMultiDevice(user: string, message: ContentMessage) { - throw new Error('Method not implemented.'); + public async sendUsingMultiDevice(user: PubKey, message: ContentMessage) { + const userLinked = await Data.getPairedDevicesFor(user.key); + const userDevices = userLinked.map(d => new PubKey(d)); + + await this.sendMessageToDevices(userDevices, message); } - public send(device: string, message: ContentMessage) { - throw new Error('Method not implemented.'); + + public async send(device: PubKey, message: ContentMessage) { + await this.sendMessageToDevices([device], message); } - public sendToGroup(message: ContentMessage | OpenGroupMessage) { - throw new Error('Method not implemented.'); + + public async sendMessageToDevices( + devices: Array, + message: ContentMessage + ) { + let currentDevices = [...devices]; + + // Sync to our devices if syncable + if (SyncMessageUtils.canSync(message)) { + + const ourDevices = await SyncMessageUtils.getOurPairedDevices(); + await this.sendSyncMessage(message, ourDevices); + + // Remove our devices from currentDevices + const ourDeviceContacts = ourDevices.map(device => ConversationController.get(device.key)); + currentDevices = _.xor(currentDevices, ourDeviceContacts); + } + + const promises = currentDevices.map(async device => { + await this.queue(device, message); + }); + + return Promise.all(promises); } - public sendSyncMessage(message: ContentMessage) { - throw new Error('Method not implemented.'); + + public async sendToGroup(message: OpenGroupMessage | ContentMessage): Promise { + if ( + !(message instanceof OpenGroupMessage) && + !(message instanceof ClosedGroupMessage) + ) { + return false; + } + + // Closed groups + if (message instanceof ClosedGroupMessage) { + // Get devices in closed group + const conversation = ConversationController.get(message.groupId); + const recipientsModels = conversation.contactCollection.models; + const recipients: Array = recipientsModels.map( + (recipient: any) => new PubKey(recipient.id) + ); + + await this.sendMessageToDevices(recipients, message); + + return true; + } + + // Open groups + if (message instanceof OpenGroupMessage) { + // No queue needed for Open Groups; send directly + await MessageSender.sendToOpenGroup(message); + + return true; + } + + return false; } - public processPending(device: string) { - // TODO: implement + public async sendSyncMessage( + message: ContentMessage, + sendTo: Array + ) { + // Sync with our devices + const promises = sendTo.map(async device => { + const syncMessage = await SyncMessageUtils.from(message, device); + + return this.queue(device, syncMessage); + }); + + return Promise.all(promises); } - private processAllPending() { - // TODO: Get all devices which are pending here + public async processPending(device: PubKey) { + const messages = this.pendingMessageCache.getForDevice(device); + + const hasSession = SessionProtocol.hasSession(device); + const conversation = ConversationController.get(device.key); + const isMediumGroup = conversation.isMediumGroup(); + + if (!isMediumGroup && !hasSession) { + await SessionProtocol.sendSessionRequestIfNeeded(device); + + return; + } + + const jobQueue = this.getJobQueue(device); + messages.forEach(message => { + if (!jobQueue.has(message.identifier)) { + const promise = jobQueue.add(async () => MessageSender.send(message)); + + promise + .then(() => { + // Message sent; remove from cache + void this.pendingMessageCache.remove(message); + }) + // Message failed to send + .catch(() => null); + } + }); } - private queue(device: string, message: ContentMessage) { - // TODO: implement + private async processAllPending() { + const devices = this.pendingMessageCache.getDevices(); + const promises = devices.map(async device => this.processPending(device)); + + return Promise.all(promises); } - private queueOpenGroupMessage(message: OpenGroupMessage) { - // TODO: Do we need to queue open group messages? - // If so we can get open group job queue and add the send job here + private async queue(device: PubKey, message: ContentMessage) { + if (message instanceof SessionRequestMessage) { + return; + } + + await this.pendingMessageCache.add(device, message); + await this.processPending(device); } - private getJobQueue(device: string): JobQueue { + private getJobQueue(device: PubKey): JobQueue { let queue = this.jobQueues.get(device); if (!queue) { queue = new JobQueue(); @@ -57,4 +170,5 @@ export class MessageQueue implements MessageQueueInterface { return queue; } + } diff --git a/ts/session/sending/MessageQueueInterface.ts b/ts/session/sending/MessageQueueInterface.ts index b160ce365..b4b374569 100644 --- a/ts/session/sending/MessageQueueInterface.ts +++ b/ts/session/sending/MessageQueueInterface.ts @@ -5,6 +5,7 @@ import { } from '../messages/outgoing'; import { RawMessage } from '../types/RawMessage'; import { TypedEventEmitter } from '../utils'; +import { PubKey } from '../types'; type GroupMessageType = OpenGroupMessage | ClosedGroupMessage; @@ -15,8 +16,8 @@ export interface MessageQueueInterfaceEvents { export interface MessageQueueInterface { events: TypedEventEmitter; - sendUsingMultiDevice(user: string, message: ContentMessage): void; - send(device: string, message: ContentMessage): void; + sendUsingMultiDevice(user: PubKey, message: ContentMessage): void; + send(device: PubKey, message: ContentMessage): void; sendToGroup(message: GroupMessageType): void; - sendSyncMessage(message: ContentMessage): void; + sendSyncMessage(message: ContentMessage, sendTo: Array): Promise>; } diff --git a/ts/session/sending/PendingMessageCache.ts b/ts/session/sending/PendingMessageCache.ts index 30bfe35d6..074ee8174 100644 --- a/ts/session/sending/PendingMessageCache.ts +++ b/ts/session/sending/PendingMessageCache.ts @@ -2,7 +2,7 @@ import { createOrUpdateItem, getItemById } from '../../../js/modules/data'; import { PartialRawMessage, RawMessage } from '../types/RawMessage'; import { ContentMessage } from '../messages/outgoing'; import { PubKey } from '../types'; -import * as MessageUtils from '../utils'; +import { MessageUtils } from '../utils'; // This is an abstraction for storing pending messages. // Ideally we want to store pending messages in the database so that diff --git a/ts/session/types/PubKey.ts b/ts/session/types/PubKey.ts index 5653db5b5..fc49b5472 100644 --- a/ts/session/types/PubKey.ts +++ b/ts/session/types/PubKey.ts @@ -1,7 +1,7 @@ export class PubKey { public static readonly PUBKEY_LEN = 66; - private static readonly regex: string = `^05[0-9a-fA-F]{${PubKey.PUBKEY_LEN - - 2}}$`; + private static readonly regex: RegExp = new RegExp(`^05[0-9a-fA-F]{${PubKey.PUBKEY_LEN - + 2}}$`); public readonly key: string; constructor(pubkeyString: string) { @@ -19,7 +19,7 @@ export class PubKey { } public static validate(pubkeyString: string): boolean { - if (pubkeyString.match(PubKey.regex)) { + if (this.regex.test(pubkeyString)) { return true; } diff --git a/ts/session/utils/SyncMessageUtils.ts b/ts/session/utils/SyncMessageUtils.ts new file mode 100644 index 000000000..6052a69cc --- /dev/null +++ b/ts/session/utils/SyncMessageUtils.ts @@ -0,0 +1,80 @@ +import { + ContactSyncMessage, + ContentMessage, + SyncMessageEnum, +} from '../messages/outgoing'; +import { PubKey } from '../types'; + +import * as _ from 'lodash'; +import * as Data from '../../../js/modules/data'; +import { ConversationController, textsecure, Whisper } from '../../window'; +import { OpenGroup } from '../types/OpenGroup'; + + +export async function from( + message: ContentMessage, + sendTo: PubKey | OpenGroup, + syncType: SyncMessageEnum.CONTACTS | SyncMessageEnum.GROUPS = SyncMessageEnum.CONTACTS +): Promise { + const { timestamp, identifier } = message; + + // Stubbed for now + return new ContactSyncMessage({ + timestamp, + identifier, + contacts: [], + }); +} + +export async function canSync(message: ContentMessage): Promise { + // This function should be agnostic to the device; it shouldn't need + // to know about the recipient + // return Boolean(from(message, device)); + // Stubbed for now + return true; +} + +export async function getSyncContacts(): Promise> { + const thisDevice = textsecure.storage.user.getNumber(); + const primaryDevice = await Data.getPrimaryDeviceFor(thisDevice); + const conversations = await Data.getAllConversations({ ConversationCollection: Whisper.ConversationCollection }); + + // We are building a set of all contacts + const primaryContacts = conversations.filter(c => + c.isPrivate() && + !c.isOurLocalDevice() && + c.isFriend() && + !c.attributes.secondaryStatus + ) || []; + + const secondaryContactsPartial = conversations.filter(c => + c.isPrivate() && + !c.isOurLocalDevice() && + c.isFriend() && + c.attributes.secondaryStatus + ); + + const seondaryContactsPromise = secondaryContactsPartial.map(async c => + ConversationController.getOrCreateAndWait( + c.getPrimaryDevicePubKey(), + 'private' + ) + ); + + const secondaryContacts = (await Promise.all(seondaryContactsPromise)) + // Filter out our primary key if it was added here + .filter(c => c.id !== primaryDevice); + + // Return unique contacts + return _.uniqBy([ + ...primaryContacts, + ...secondaryContacts, + ], device => !!device); +} + +export async function getOurPairedDevices(): Promise> { + const ourPubKey = textsecure.storage.user.getNumber(); + const ourDevices = await Data.getPairedDevicesFor(ourPubKey); + + return ourDevices.map(device => new PubKey(device)); +} diff --git a/ts/session/utils/index.ts b/ts/session/utils/index.ts index 96904245d..b5f9dd93e 100644 --- a/ts/session/utils/index.ts +++ b/ts/session/utils/index.ts @@ -1,3 +1,7 @@ +import * as MessageUtils from './Messages'; +import * as SyncMessageUtils from './SyncMessageUtils'; + export * from './TypedEmitter'; export * from './JobQueue'; -export * from './Messages'; + +export { MessageUtils, SyncMessageUtils };