|
|
|
@ -26,7 +26,7 @@ import { UserUtil } from '../../util';
|
|
|
|
|
|
|
|
|
|
export class MessageQueue implements MessageQueueInterface {
|
|
|
|
|
public readonly events: TypedEventEmitter<MessageQueueInterfaceEvents>;
|
|
|
|
|
private readonly jobQueues: Map<PubKey, JobQueue> = new Map();
|
|
|
|
|
private readonly jobQueues: Map<string, JobQueue> = new Map();
|
|
|
|
|
private readonly pendingMessageCache: PendingMessageCache;
|
|
|
|
|
|
|
|
|
|
constructor(cache?: PendingMessageCache) {
|
|
|
|
@ -162,17 +162,19 @@ export class MessageQueue implements MessageQueueInterface {
|
|
|
|
|
const messageId = String(message.timestamp);
|
|
|
|
|
|
|
|
|
|
if (!jobQueue.has(messageId)) {
|
|
|
|
|
try {
|
|
|
|
|
await jobQueue.addWithId(messageId, async () =>
|
|
|
|
|
MessageSender.send(message)
|
|
|
|
|
);
|
|
|
|
|
this.events.emit('success', message);
|
|
|
|
|
} catch (e) {
|
|
|
|
|
this.events.emit('fail', message, e);
|
|
|
|
|
} finally {
|
|
|
|
|
// Remove from the cache because retrying is done in the sender
|
|
|
|
|
void this.pendingMessageCache.remove(message);
|
|
|
|
|
}
|
|
|
|
|
// We put the event handling inside this job to avoid sending duplicate events
|
|
|
|
|
const job = async () => {
|
|
|
|
|
try {
|
|
|
|
|
await MessageSender.send(message);
|
|
|
|
|
this.events.emit('success', message);
|
|
|
|
|
} catch (e) {
|
|
|
|
|
this.events.emit('fail', message, e);
|
|
|
|
|
} finally {
|
|
|
|
|
// Remove from the cache because retrying is done in the sender
|
|
|
|
|
void this.pendingMessageCache.remove(message);
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
await jobQueue.addWithId(messageId, job);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
@ -203,10 +205,10 @@ export class MessageQueue implements MessageQueueInterface {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private getJobQueue(device: PubKey): JobQueue {
|
|
|
|
|
let queue = this.jobQueues.get(device);
|
|
|
|
|
let queue = this.jobQueues.get(device.key);
|
|
|
|
|
if (!queue) {
|
|
|
|
|
queue = new JobQueue();
|
|
|
|
|
this.jobQueues.set(device, queue);
|
|
|
|
|
this.jobQueues.set(device.key, queue);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return queue;
|
|
|
|
|