diff --git a/ts/session/sending/MessageQueue.ts b/ts/session/sending/MessageQueue.ts index 7077a2921..cecbe589d 100644 --- a/ts/session/sending/MessageQueue.ts +++ b/ts/session/sending/MessageQueue.ts @@ -3,13 +3,16 @@ import { MessageQueueInterface, MessageQueueInterfaceEvents, } from './MessageQueueInterface'; -import { ContentMessage, OpenGroupMessage } from '../messages/outgoing'; +import { ContentMessage, OpenGroupMessage, SyncMessage, SessionResetMessage } from '../messages/outgoing'; import { PendingMessageCache } from './PendingMessageCache'; import { JobQueue, TypedEventEmitter } from '../utils'; +import { PubKey } from '../types'; +import { ConversationController } from '../../window'; +import { MessageSender } from '.'; export class MessageQueue implements MessageQueueInterface { public readonly events: TypedEventEmitter; - private readonly jobQueues: Map = new Map(); + private readonly jobQueues: Map = new Map(); private readonly cache: PendingMessageCache; constructor() { @@ -19,9 +22,11 @@ export class MessageQueue implements MessageQueueInterface { } public sendUsingMultiDevice(user: string, message: ContentMessage) { - throw new Error('Method not implemented.'); + // this.cache + + // throw new Error('Method not implemented.'); } - public send(device: string, message: ContentMessage) { + public send(device: PubKey, message: ContentMessage) { throw new Error('Method not implemented.'); } public sendToGroup(message: ContentMessage | OpenGroupMessage) { @@ -31,16 +36,57 @@ export class MessageQueue implements MessageQueueInterface { throw new Error('Method not implemented.'); } - public processPending(device: string) { + public processPending(device: PubKey) { // TODO: implement + const SessionManager: any = {}; // TEMP FIX + + const messages = this.cache.getForDevice(device); + + const conversation = ConversationController.get(device.key); + const isMediumGroup = conversation.isMediumGroup(); + + const hasSession = false; // TODO ; = SessionManager.hasSession(device); + + if (!isMediumGroup && !hasSession) { + SessionManager.sendSessionRequestIfNeeded(); + + return; + } + + const jobQueue = this.getJobQueue(device); + messages.forEach(message => { + if (!jobQueue.has(message.identifier)) { + const promise = jobQueue.add(message.identifier, MessageSender.send(message)); + + promise.then(() => { + // Message sent; remove from cache + void this.cache.remove(message); + }).catch(() => { + // Message failed to send + }); + } + }); + + + } private processAllPending() { // TODO: Get all devices which are pending here + } - private queue(device: string, message: ContentMessage) { + private queue(device: PubKey, message: ContentMessage) { // TODO: implement + if (message instanceof SessionResetMessage) { + return; + } + + const added = this.cache.add(device, message); + + // if not added? + + this.processPending(device); } private queueOpenGroupMessage(message: OpenGroupMessage) { @@ -48,7 +94,7 @@ export class MessageQueue implements MessageQueueInterface { // If so we can get open group job queue and add the send job here } - private getJobQueue(device: string): JobQueue { + private getJobQueue(device: PubKey): JobQueue { let queue = this.jobQueues.get(device); if (!queue) { queue = new JobQueue(); diff --git a/ts/session/sending/MessageQueueInterface.ts b/ts/session/sending/MessageQueueInterface.ts index b160ce365..21a018cbf 100644 --- a/ts/session/sending/MessageQueueInterface.ts +++ b/ts/session/sending/MessageQueueInterface.ts @@ -5,6 +5,7 @@ import { } from '../messages/outgoing'; import { RawMessage } from '../types/RawMessage'; import { TypedEventEmitter } from '../utils'; +import { PubKey } from '../types'; type GroupMessageType = OpenGroupMessage | ClosedGroupMessage; @@ -16,7 +17,7 @@ export interface MessageQueueInterfaceEvents { export interface MessageQueueInterface { events: TypedEventEmitter; sendUsingMultiDevice(user: string, message: ContentMessage): void; - send(device: string, message: ContentMessage): void; + send(device: PubKey, message: ContentMessage): void; sendToGroup(message: GroupMessageType): void; sendSyncMessage(message: ContentMessage): void; } diff --git a/ts/session/sending/PendingMessageCache.ts b/ts/session/sending/PendingMessageCache.ts index 1f57a5f0c..8f2eb0711 100644 --- a/ts/session/sending/PendingMessageCache.ts +++ b/ts/session/sending/PendingMessageCache.ts @@ -102,7 +102,7 @@ export class PendingMessageCache { await this.saveToDB(); } - public async loadFromDB() { + private async loadFromDB() { const messages = await this.getFromStorage(); this.cache = messages; }