diff --git a/ts/session/messages/outgoing/content/data/group/ClosedGroupMessage.ts b/ts/session/messages/outgoing/content/data/group/ClosedGroupMessage.ts index a1e21dd2e..ccdb8f121 100644 --- a/ts/session/messages/outgoing/content/data/group/ClosedGroupMessage.ts +++ b/ts/session/messages/outgoing/content/data/group/ClosedGroupMessage.ts @@ -8,7 +8,7 @@ interface ClosedGroupMessageParams extends MessageParams { } export abstract class ClosedGroupMessage extends DataMessage { - protected readonly groupId: string; + public readonly groupId: string; constructor(params: ClosedGroupMessageParams) { super({ diff --git a/ts/session/messages/outgoing/content/sync/index.ts b/ts/session/messages/outgoing/content/sync/index.ts index 6da779ff9..53cb667a4 100644 --- a/ts/session/messages/outgoing/content/sync/index.ts +++ b/ts/session/messages/outgoing/content/sync/index.ts @@ -1,3 +1 @@ -import * as SyncMessage from './SyncMessage'; - -export { SyncMessage }; +export * from './SyncMessage'; diff --git a/ts/session/protocols/SessionProtocol.ts b/ts/session/protocols/SessionProtocol.ts index 79847da82..da0f95129 100644 --- a/ts/session/protocols/SessionProtocol.ts +++ b/ts/session/protocols/SessionProtocol.ts @@ -3,7 +3,7 @@ import { SessionRequestMessage } from '../messages/outgoing'; import { createOrUpdateItem, getItemById } from '../../../js/modules/data'; import { libloki, libsignal, textsecure } from '../../window'; import { MessageSender } from '../sending'; -import * as MessageUtils from '../utils'; +import { MessageUtils } from '../utils'; import { PubKey } from '../types'; interface StringToNumberMap { diff --git a/ts/session/sending/MessageQueue.ts b/ts/session/sending/MessageQueue.ts index 7077a2921..43746d09f 100644 --- a/ts/session/sending/MessageQueue.ts +++ b/ts/session/sending/MessageQueue.ts @@ -1,54 +1,187 @@ +import { getPairedDevicesFor } from '../../../js/modules/data'; + import { EventEmitter } from 'events'; import { MessageQueueInterface, MessageQueueInterfaceEvents, } from './MessageQueueInterface'; -import { ContentMessage, OpenGroupMessage } from '../messages/outgoing'; +import { + ClosedGroupMessage, + ContentMessage, + OpenGroupMessage, + SessionRequestMessage, +} from '../messages/outgoing'; import { PendingMessageCache } from './PendingMessageCache'; -import { JobQueue, TypedEventEmitter } from '../utils'; +import { + GroupUtils, + JobQueue, + SyncMessageUtils, + TypedEventEmitter, +} from '../utils'; +import { PubKey } from '../types'; +import { MessageSender } from '.'; +import { SessionProtocol } from '../protocols'; +import { UserUtil } from '../../util'; export class MessageQueue implements MessageQueueInterface { public readonly events: TypedEventEmitter; - private readonly jobQueues: Map = new Map(); - private readonly cache: PendingMessageCache; + private readonly jobQueues: Map = new Map(); + private readonly pendingMessageCache: PendingMessageCache; constructor() { this.events = new EventEmitter(); - this.cache = new PendingMessageCache(); - this.processAllPending(); + this.pendingMessageCache = new PendingMessageCache(); + void this.processAllPending(); } - public sendUsingMultiDevice(user: string, message: ContentMessage) { - throw new Error('Method not implemented.'); + public async sendUsingMultiDevice(user: PubKey, message: ContentMessage) { + const userLinked = await getPairedDevicesFor(user.key); + const userDevices = userLinked.map(d => new PubKey(d)); + + await this.sendMessageToDevices(userDevices, message); } - public send(device: string, message: ContentMessage) { - throw new Error('Method not implemented.'); + + public async send(device: PubKey, message: ContentMessage) { + await this.sendMessageToDevices([device], message); } - public sendToGroup(message: ContentMessage | OpenGroupMessage) { - throw new Error('Method not implemented.'); + + public async sendMessageToDevices( + devices: Array, + message: ContentMessage + ) { + let currentDevices = [...devices]; + + // Sync to our devices if syncable + if (SyncMessageUtils.canSync(message)) { + const currentDevice = await UserUtil.getCurrentDevicePubKey(); + + if (currentDevice) { + const otherDevices = await getPairedDevicesFor(currentDevice); + + const ourDevices = [currentDevice, ...otherDevices].map( + device => new PubKey(device) + ); + await this.sendSyncMessage(message, ourDevices); + + // Remove our devices from currentDevices + currentDevices = currentDevices.filter(device => + ourDevices.some(d => PubKey.isEqual(d, device)) + ); + } + } + + const promises = currentDevices.map(async device => { + await this.process(device, message); + }); + + return Promise.all(promises); } - public sendSyncMessage(message: ContentMessage) { - throw new Error('Method not implemented.'); + + public async sendToGroup( + message: OpenGroupMessage | ContentMessage + ): Promise { + if ( + !(message instanceof OpenGroupMessage) && + !(message instanceof ClosedGroupMessage) + ) { + return false; + } + + // Closed groups + if (message instanceof ClosedGroupMessage) { + // Get devices in closed group + const groupPubKey = PubKey.from(message.groupId); + if (!groupPubKey) { + return false; + } + + const recipients = await GroupUtils.getGroupMembers(groupPubKey); + await this.sendMessageToDevices(recipients, message); + + return true; + } + + // Open groups + if (message instanceof OpenGroupMessage) { + // No queue needed for Open Groups; send directly + + try { + await MessageSender.sendToOpenGroup(message); + this.events.emit('success', message); + } catch (e) { + this.events.emit('fail', message, e); + } + + return true; + } + + return false; } - public processPending(device: string) { - // TODO: implement + public async sendSyncMessage(message: ContentMessage, sendTo: Array) { + // Sync with our devices + const promises = sendTo.map(async device => { + const syncMessage = SyncMessageUtils.from(message); + + return this.process(device, syncMessage); + }); + + return Promise.all(promises); } - private processAllPending() { - // TODO: Get all devices which are pending here + public async processPending(device: PubKey) { + const messages = this.pendingMessageCache.getForDevice(device); + + const isMediumGroup = GroupUtils.isMediumGroup(device); + const hasSession = SessionProtocol.hasSession(device); + + if (!isMediumGroup && !hasSession) { + await SessionProtocol.sendSessionRequestIfNeeded(device); + + return; + } + + const jobQueue = this.getJobQueue(device); + messages.forEach(async message => { + const messageId = String(message.timestamp); + + if (!jobQueue.has(messageId)) { + try { + await jobQueue.addWithId(messageId, async () => + MessageSender.send(message) + ); + void this.pendingMessageCache.remove(message); + this.events.emit('success', message); + } catch (e) { + this.events.emit('fail', message, e); + } + } + }); } - private queue(device: string, message: ContentMessage) { - // TODO: implement + private async processAllPending() { + const devices = this.pendingMessageCache.getDevices(); + const promises = devices.map(async device => this.processPending(device)); + + return Promise.all(promises); } - private queueOpenGroupMessage(message: OpenGroupMessage) { - // TODO: Do we need to queue open group messages? - // If so we can get open group job queue and add the send job here + private async process(device: PubKey, message?: ContentMessage) { + if (!message) { + return; + } + + if (message instanceof SessionRequestMessage) { + void SessionProtocol.sendSessionRequest(message, device); + + return; + } + + await this.pendingMessageCache.add(device, message); + await this.processPending(device); } - private getJobQueue(device: string): JobQueue { + private getJobQueue(device: PubKey): JobQueue { let queue = this.jobQueues.get(device); if (!queue) { queue = new JobQueue(); diff --git a/ts/session/sending/MessageQueueInterface.ts b/ts/session/sending/MessageQueueInterface.ts index b160ce365..c3ee606aa 100644 --- a/ts/session/sending/MessageQueueInterface.ts +++ b/ts/session/sending/MessageQueueInterface.ts @@ -5,18 +5,22 @@ import { } from '../messages/outgoing'; import { RawMessage } from '../types/RawMessage'; import { TypedEventEmitter } from '../utils'; +import { PubKey } from '../types'; type GroupMessageType = OpenGroupMessage | ClosedGroupMessage; export interface MessageQueueInterfaceEvents { - success: (message: RawMessage) => void; - fail: (message: RawMessage, error: Error) => void; + success: (message: RawMessage | OpenGroupMessage) => void; + fail: (message: RawMessage | OpenGroupMessage, error: Error) => void; } export interface MessageQueueInterface { events: TypedEventEmitter; - sendUsingMultiDevice(user: string, message: ContentMessage): void; - send(device: string, message: ContentMessage): void; + sendUsingMultiDevice(user: PubKey, message: ContentMessage): void; + send(device: PubKey, message: ContentMessage): void; sendToGroup(message: GroupMessageType): void; - sendSyncMessage(message: ContentMessage): void; + sendSyncMessage( + message: ContentMessage, + sendTo: Array + ): Promise>; } diff --git a/ts/session/sending/PendingMessageCache.ts b/ts/session/sending/PendingMessageCache.ts index 30bfe35d6..074ee8174 100644 --- a/ts/session/sending/PendingMessageCache.ts +++ b/ts/session/sending/PendingMessageCache.ts @@ -2,7 +2,7 @@ import { createOrUpdateItem, getItemById } from '../../../js/modules/data'; import { PartialRawMessage, RawMessage } from '../types/RawMessage'; import { ContentMessage } from '../messages/outgoing'; import { PubKey } from '../types'; -import * as MessageUtils from '../utils'; +import { MessageUtils } from '../utils'; // This is an abstraction for storing pending messages. // Ideally we want to store pending messages in the database so that diff --git a/ts/session/types/PubKey.ts b/ts/session/types/PubKey.ts index 5653db5b5..f183cb363 100644 --- a/ts/session/types/PubKey.ts +++ b/ts/session/types/PubKey.ts @@ -1,7 +1,8 @@ export class PubKey { public static readonly PUBKEY_LEN = 66; - private static readonly regex: string = `^05[0-9a-fA-F]{${PubKey.PUBKEY_LEN - - 2}}$`; + private static readonly regex: RegExp = new RegExp( + `^05[0-9a-fA-F]{${PubKey.PUBKEY_LEN - 2}}$` + ); public readonly key: string; constructor(pubkeyString: string) { @@ -19,10 +20,14 @@ export class PubKey { } public static validate(pubkeyString: string): boolean { - if (pubkeyString.match(PubKey.regex)) { + if (this.regex.test(pubkeyString)) { return true; } return false; } + + public static isEqual(key: PubKey, comparator: PubKey) { + return key.key === comparator.key; + } } diff --git a/ts/session/utils/Groups.ts b/ts/session/utils/Groups.ts new file mode 100644 index 000000000..2a0d130ba --- /dev/null +++ b/ts/session/utils/Groups.ts @@ -0,0 +1,25 @@ +import { ConversationController } from '../../window'; +import { PubKey } from '../types'; + +export async function getGroupMembers(groupId: PubKey): Promise> { + const groupConversation = ConversationController.get(groupId.key); + const groupMembers = groupConversation + ? groupConversation.attributes.members + : undefined; + + if (!groupMembers) { + return []; + } + + return groupMembers.map((member: string) => new PubKey(member)); +} + +export function isMediumGroup(groupId: PubKey): boolean { + const conversation = ConversationController.get(groupId.key); + + if (!conversation) { + return false; + } + + return Boolean(conversation.isMediumGroup()); +} diff --git a/ts/session/utils/SyncMessageUtils.ts b/ts/session/utils/SyncMessageUtils.ts new file mode 100644 index 000000000..92c1116c0 --- /dev/null +++ b/ts/session/utils/SyncMessageUtils.ts @@ -0,0 +1,72 @@ +import * as _ from 'lodash'; +import * as UserUtils from '../../util/user'; +import { + getAllConversations, + getPrimaryDeviceFor, +} from '../../../js/modules/data'; +import { ConversationController, Whisper } from '../../window'; + +import { ContentMessage, SyncMessage } from '../messages/outgoing'; + +export function from(message: ContentMessage): SyncMessage | undefined { + // const { timestamp, identifier } = message; + + // Stubbed for now + return undefined; +} + +export async function canSync(message: ContentMessage): Promise { + // This function should be agnostic to the device; it shouldn't need + // to know about the recipient + + // Stubbed for now + return Boolean(from(message)); +} + +export async function getSyncContacts(): Promise | undefined> { + const thisDevice = await UserUtils.getCurrentDevicePubKey(); + + if (!thisDevice) { + return []; + } + + const primaryDevice = await getPrimaryDeviceFor(thisDevice); + const conversations = await getAllConversations({ + ConversationCollection: Whisper.ConversationCollection, + }); + + // We are building a set of all contacts + const primaryContacts = + conversations.filter( + c => + c.isPrivate() && + !c.isOurLocalDevice() && + c.isFriend() && + !c.attributes.secondaryStatus + ) || []; + + const secondaryContactsPartial = conversations.filter( + c => + c.isPrivate() && + !c.isOurLocalDevice() && + c.isFriend() && + c.attributes.secondaryStatus + ); + + const seondaryContactsPromise = secondaryContactsPartial.map(async c => + ConversationController.getOrCreateAndWait( + c.getPrimaryDevicePubKey(), + 'private' + ) + ); + + const secondaryContacts = (await Promise.all(seondaryContactsPromise)) + // Filter out our primary key if it was added here + .filter(c => c.id !== primaryDevice); + + // Return unique contacts + return _.uniqBy( + [...primaryContacts, ...secondaryContacts], + device => !!device + ); +} diff --git a/ts/session/utils/index.ts b/ts/session/utils/index.ts index 96904245d..aa99cb92f 100644 --- a/ts/session/utils/index.ts +++ b/ts/session/utils/index.ts @@ -1,3 +1,8 @@ +import * as MessageUtils from './Messages'; +import * as GroupUtils from './Groups'; +import * as SyncMessageUtils from './SyncMessageUtils'; + export * from './TypedEmitter'; export * from './JobQueue'; -export * from './Messages'; + +export { MessageUtils, SyncMessageUtils, GroupUtils }; diff --git a/ts/test/session/sending/PendingMessageCache_test.ts b/ts/test/session/sending/PendingMessageCache_test.ts index d732d859e..05b2d7b87 100644 --- a/ts/test/session/sending/PendingMessageCache_test.ts +++ b/ts/test/session/sending/PendingMessageCache_test.ts @@ -1,6 +1,6 @@ import { expect } from 'chai'; import * as _ from 'lodash'; -import * as MessageUtils from '../../../session/utils'; +import { MessageUtils } from '../../../session/utils'; import { TestUtils } from '../../../test/test-utils'; import { PendingMessageCache } from '../../../session/sending/PendingMessageCache'; @@ -53,7 +53,7 @@ describe('PendingMessageCache', () => { it('can add to cache', async () => { const device = TestUtils.generateFakePubkey(); - const message = TestUtils.generateUniqueChatMessage(); + const message = TestUtils.generateChatMessage(); const rawMessage = MessageUtils.toRawMessage(device, message); await pendingMessageCacheStub.add(device, message); @@ -70,7 +70,7 @@ describe('PendingMessageCache', () => { it('can remove from cache', async () => { const device = TestUtils.generateFakePubkey(); - const message = TestUtils.generateUniqueChatMessage(); + const message = TestUtils.generateChatMessage(); const rawMessage = MessageUtils.toRawMessage(device, message); await pendingMessageCacheStub.add(device, message); @@ -91,15 +91,15 @@ describe('PendingMessageCache', () => { const cacheItems = [ { device: TestUtils.generateFakePubkey(), - message: TestUtils.generateUniqueChatMessage(), + message: TestUtils.generateChatMessage(), }, { device: TestUtils.generateFakePubkey(), - message: TestUtils.generateUniqueChatMessage(), + message: TestUtils.generateChatMessage(), }, { device: TestUtils.generateFakePubkey(), - message: TestUtils.generateUniqueChatMessage(), + message: TestUtils.generateChatMessage(), }, ]; @@ -123,11 +123,11 @@ describe('PendingMessageCache', () => { const cacheItems = [ { device: TestUtils.generateFakePubkey(), - message: TestUtils.generateUniqueChatMessage(), + message: TestUtils.generateChatMessage(), }, { device: TestUtils.generateFakePubkey(), - message: TestUtils.generateUniqueChatMessage(), + message: TestUtils.generateChatMessage(), }, ]; @@ -150,7 +150,7 @@ describe('PendingMessageCache', () => { it('can find nothing when empty', async () => { const device = TestUtils.generateFakePubkey(); - const message = TestUtils.generateUniqueChatMessage(); + const message = TestUtils.generateChatMessage(); const rawMessage = MessageUtils.toRawMessage(device, message); const foundMessage = pendingMessageCacheStub.find(rawMessage); @@ -159,7 +159,7 @@ describe('PendingMessageCache', () => { it('can find message in cache', async () => { const device = TestUtils.generateFakePubkey(); - const message = TestUtils.generateUniqueChatMessage(); + const message = TestUtils.generateChatMessage(); const rawMessage = MessageUtils.toRawMessage(device, message); await pendingMessageCacheStub.add(device, message); @@ -176,15 +176,15 @@ describe('PendingMessageCache', () => { const cacheItems = [ { device: TestUtils.generateFakePubkey(), - message: TestUtils.generateUniqueChatMessage(), + message: TestUtils.generateChatMessage(), }, { device: TestUtils.generateFakePubkey(), - message: TestUtils.generateUniqueChatMessage(), + message: TestUtils.generateChatMessage(), }, { device: TestUtils.generateFakePubkey(), - message: TestUtils.generateUniqueChatMessage(), + message: TestUtils.generateChatMessage(), }, ]; @@ -206,15 +206,15 @@ describe('PendingMessageCache', () => { const cacheItems = [ { device: TestUtils.generateFakePubkey(), - message: TestUtils.generateUniqueChatMessage(), + message: TestUtils.generateChatMessage(), }, { device: TestUtils.generateFakePubkey(), - message: TestUtils.generateUniqueChatMessage(), + message: TestUtils.generateChatMessage(), }, { device: TestUtils.generateFakePubkey(), - message: TestUtils.generateUniqueChatMessage(), + message: TestUtils.generateChatMessage(), }, ]; diff --git a/ts/test/test-utils/testUtils.ts b/ts/test/test-utils/testUtils.ts index a8ced37d5..4cfbc50ac 100644 --- a/ts/test/test-utils/testUtils.ts +++ b/ts/test/test-utils/testUtils.ts @@ -55,7 +55,7 @@ export function generateFakePubkey(): PubKey { return new PubKey(pubkeyString); } -export function generateUniqueChatMessage(): ChatMessage { +export function generateChatMessage(): ChatMessage { return new ChatMessage({ body: 'Lorem ipsum dolor sit amet, consectetur adipiscing elit', identifier: uuid(),