You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
session-desktop/ts/session/sending/MessageQueue.ts

107 lines
3.0 KiB
TypeScript

import { EventEmitter } from 'events';
import {
MessageQueueInterface,
MessageQueueInterfaceEvents,
} from './MessageQueueInterface';
5 years ago
import { ContentMessage, OpenGroupMessage, SyncMessage, SessionResetMessage } from '../messages/outgoing';
import { PendingMessageCache } from './PendingMessageCache';
import { JobQueue, TypedEventEmitter } from '../utils';
5 years ago
import { PubKey } from '../types';
import { ConversationController } from '../../window';
import { MessageSender } from '.';
export class MessageQueue implements MessageQueueInterface {
public readonly events: TypedEventEmitter<MessageQueueInterfaceEvents>;
5 years ago
private readonly jobQueues: Map<PubKey, JobQueue> = new Map();
private readonly cache: PendingMessageCache;
constructor() {
this.events = new EventEmitter();
this.cache = new PendingMessageCache();
this.processAllPending();
}
public sendUsingMultiDevice(user: string, message: ContentMessage) {
5 years ago
// this.cache
// throw new Error('Method not implemented.');
}
5 years ago
public send(device: PubKey, message: ContentMessage) {
throw new Error('Method not implemented.');
}
public sendToGroup(message: ContentMessage | OpenGroupMessage) {
throw new Error('Method not implemented.');
}
public sendSyncMessage(message: ContentMessage) {
throw new Error('Method not implemented.');
}
5 years ago
public processPending(device: PubKey) {
// TODO: implement
5 years ago
const SessionManager: any = {}; // TEMP FIX
const messages = this.cache.getForDevice(device);
const conversation = ConversationController.get(device.key);
const isMediumGroup = conversation.isMediumGroup();
const hasSession = false; // TODO ; = SessionManager.hasSession(device);
if (!isMediumGroup && !hasSession) {
SessionManager.sendSessionRequestIfNeeded();
return;
}
const jobQueue = this.getJobQueue(device);
messages.forEach(message => {
if (!jobQueue.has(message.identifier)) {
const promise = jobQueue.add(message.identifier, MessageSender.send(message));
promise.then(() => {
// Message sent; remove from cache
void this.cache.remove(message);
}).catch(() => {
// Message failed to send
});
}
});
}
private processAllPending() {
// TODO: Get all devices which are pending here
5 years ago
}
5 years ago
private queue(device: PubKey, message: ContentMessage) {
// TODO: implement
5 years ago
if (message instanceof SessionResetMessage) {
return;
}
const added = this.cache.add(device, message);
// if not added?
this.processPending(device);
}
private queueOpenGroupMessage(message: OpenGroupMessage) {
// TODO: Do we need to queue open group messages?
// If so we can get open group job queue and add the send job here
}
5 years ago
private getJobQueue(device: PubKey): JobQueue {
let queue = this.jobQueues.get(device);
if (!queue) {
queue = new JobQueue();
this.jobQueues.set(device, queue);
}
return queue;
}
}