From fb53f9ff36224fc9f474c904394d1121e3f56f82 Mon Sep 17 00:00:00 2001 From: Mikunj Date: Thu, 25 Jun 2020 12:13:53 +1000 Subject: [PATCH] Fix message queue --- ts/session/sending/MessageQueue.ts | 30 ++++++++++++++++-------------- ts/session/types/PubKey.ts | 4 ++-- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/ts/session/sending/MessageQueue.ts b/ts/session/sending/MessageQueue.ts index b675b857a..cbb2e817e 100644 --- a/ts/session/sending/MessageQueue.ts +++ b/ts/session/sending/MessageQueue.ts @@ -26,7 +26,7 @@ import { UserUtil } from '../../util'; export class MessageQueue implements MessageQueueInterface { public readonly events: TypedEventEmitter; - private readonly jobQueues: Map = new Map(); + private readonly jobQueues: Map = new Map(); private readonly pendingMessageCache: PendingMessageCache; constructor(cache?: PendingMessageCache) { @@ -162,17 +162,19 @@ export class MessageQueue implements MessageQueueInterface { const messageId = String(message.timestamp); if (!jobQueue.has(messageId)) { - try { - await jobQueue.addWithId(messageId, async () => - MessageSender.send(message) - ); - this.events.emit('success', message); - } catch (e) { - this.events.emit('fail', message, e); - } finally { - // Remove from the cache because retrying is done in the sender - void this.pendingMessageCache.remove(message); - } + // We put the event handling inside this job to avoid sending duplicate events + const job = async () => { + try { + await MessageSender.send(message); + this.events.emit('success', message); + } catch (e) { + this.events.emit('fail', message, e); + } finally { + // Remove from the cache because retrying is done in the sender + void this.pendingMessageCache.remove(message); + } + }; + await jobQueue.addWithId(messageId, job); } }); } @@ -203,10 +205,10 @@ export class MessageQueue implements MessageQueueInterface { } private getJobQueue(device: PubKey): JobQueue { - let queue = this.jobQueues.get(device); + let queue = this.jobQueues.get(device.key); if (!queue) { queue = new JobQueue(); - this.jobQueues.set(device, queue); + this.jobQueues.set(device.key, queue); } return queue; diff --git a/ts/session/types/PubKey.ts b/ts/session/types/PubKey.ts index 0d83ec958..a76e4bf6d 100644 --- a/ts/session/types/PubKey.ts +++ b/ts/session/types/PubKey.ts @@ -15,7 +15,7 @@ export class PubKey { if (!PubKey.validate(pubkeyString)) { throw new Error(`Invalid pubkey string passed: ${pubkeyString}`); } - this.key = pubkeyString; + this.key = pubkeyString.toLowerCase(); } /** @@ -54,7 +54,7 @@ export class PubKey { public isEqual(comparator: PubKey | string) { return comparator instanceof PubKey ? this.key === comparator.key - : this.key === comparator; + : this.key === comparator.toLowerCase(); } }