Allow passing a cache to the queue

pull/1185/head
Mikunj 5 years ago
parent fe7aaa0aaa
commit ef76972ccb

@ -26,9 +26,9 @@ export class MessageQueue implements MessageQueueInterface {
private readonly jobQueues: Map<PubKey, JobQueue> = new Map(); private readonly jobQueues: Map<PubKey, JobQueue> = new Map();
private readonly pendingMessageCache: PendingMessageCache; private readonly pendingMessageCache: PendingMessageCache;
constructor() { constructor(cache?: PendingMessageCache) {
this.events = new EventEmitter(); this.events = new EventEmitter();
this.pendingMessageCache = new PendingMessageCache(); this.pendingMessageCache = cache ?? new PendingMessageCache();
void this.processAllPending(); void this.processAllPending();
} }
@ -143,10 +143,12 @@ export class MessageQueue implements MessageQueueInterface {
await jobQueue.addWithId(messageId, async () => await jobQueue.addWithId(messageId, async () =>
MessageSender.send(message) MessageSender.send(message)
); );
void this.pendingMessageCache.remove(message);
this.events.emit('success', message); this.events.emit('success', message);
} catch (e) { } catch (e) {
this.events.emit('fail', message, e); this.events.emit('fail', message, e);
} finally {
// Remove from the cache because retrying is done in the sender
void this.pendingMessageCache.remove(message);
} }
} }
}); });
@ -173,7 +175,7 @@ export class MessageQueue implements MessageQueueInterface {
} }
await this.pendingMessageCache.add(device, message); await this.pendingMessageCache.add(device, message);
await this.processPending(device); void this.processPending(device);
} }
private getJobQueue(device: PubKey): JobQueue { private getJobQueue(device: PubKey): JobQueue {

@ -93,12 +93,12 @@ export class PendingMessageCache {
await this.saveToDB(); await this.saveToDB();
} }
private async loadFromDB() { protected async loadFromDB() {
const messages = await this.getFromStorage(); const messages = await this.getFromStorage();
this.cache = messages; this.cache = messages;
} }
private async getFromStorage(): Promise<Array<RawMessage>> { protected async getFromStorage(): Promise<Array<RawMessage>> {
const data = await getItemById('pendingMessages'); const data = await getItemById('pendingMessages');
if (!data || !data.value) { if (!data || !data.value) {
return []; return [];
@ -117,7 +117,7 @@ export class PendingMessageCache {
}); });
} }
private async saveToDB() { protected async saveToDB() {
// For each plainTextBuffer in cache, save in as a simple Array<number> to avoid // For each plainTextBuffer in cache, save in as a simple Array<number> to avoid
// Node issues with JSON stringifying Buffer without strict typing // Node issues with JSON stringifying Buffer without strict typing
const encodedCache = [...this.cache].map(item => { const encodedCache = [...this.cache].map(item => {

Loading…
Cancel
Save