You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
session-desktop/ts/session/sending/MessageQueue.ts

246 lines
7.8 KiB
TypeScript

import { EventEmitter } from 'events';
5 years ago
import {
ChatMessage,
ClosedGroupChatMessage,
ClosedGroupNewMessage,
5 years ago
ContentMessage,
DataMessage,
5 years ago
ExpirationTimerUpdateMessage,
5 years ago
OpenGroupMessage,
} from '../messages/outgoing';
import { PendingMessageCache } from './PendingMessageCache';
import { JobQueue, TypedEventEmitter, UserUtils } from '../utils';
import { PubKey, RawMessage } from '../types';
5 years ago
import { MessageSender } from '.';
import { ClosedGroupMessage } from '../messages/outgoing/content/data/group/ClosedGroupMessage';
import { ConfigurationMessage } from '../messages/outgoing/content/ConfigurationMessage';
import { ClosedGroupNameChangeMessage } from '../messages/outgoing/content/data/group/ClosedGroupNameChangeMessage';
import {
ClosedGroupAddedMembersMessage,
ClosedGroupEncryptionPairMessage,
ClosedGroupEncryptionPairRequestMessage,
ClosedGroupRemovedMembersMessage,
ClosedGroupUpdateMessage,
} from '../messages/outgoing/content/data/group';
import { ClosedGroupMemberLeftMessage } from '../messages/outgoing/content/data/group/ClosedGroupMemberLeftMessage';
export type GroupMessageType =
| OpenGroupMessage
| ClosedGroupChatMessage
| ClosedGroupAddedMembersMessage
| ClosedGroupRemovedMembersMessage
| ClosedGroupNameChangeMessage
| ClosedGroupMemberLeftMessage
| ClosedGroupUpdateMessage
| ExpirationTimerUpdateMessage
| ClosedGroupEncryptionPairMessage
| ClosedGroupEncryptionPairRequestMessage;
// ClosedGroupEncryptionPairReplyMessage must be sent to a user pubkey. Not a group.
export interface MessageQueueInterfaceEvents {
sendSuccess: (
message: RawMessage | OpenGroupMessage,
wrappedEnvelope?: Uint8Array
) => void;
sendFail: (message: RawMessage | OpenGroupMessage, error: Error) => void;
}
export class MessageQueue {
public readonly events: TypedEventEmitter<MessageQueueInterfaceEvents>;
private readonly jobQueues: Map<string, JobQueue> = new Map();
5 years ago
private readonly pendingMessageCache: PendingMessageCache;
constructor(cache?: PendingMessageCache) {
this.events = new EventEmitter();
this.pendingMessageCache = cache ?? new PendingMessageCache();
5 years ago
void this.processAllPending();
}
4 years ago
public async sendToPubKey(
user: PubKey,
message: ContentMessage,
sentCb?: (message: RawMessage) => Promise<void>
): Promise<void> {
if (
message instanceof ConfigurationMessage ||
!!(message as any).syncTarget
) {
throw new Error('SyncMessage needs to be sent with sendSyncMessage');
}
await this.process(user, message, sentCb);
}
5 years ago
public async send(
device: PubKey,
message: ContentMessage,
sentCb?: (message: RawMessage) => Promise<void>
): Promise<void> {
if (
message instanceof ConfigurationMessage ||
!!(message as any).syncTarget
) {
throw new Error('SyncMessage needs to be sent with sendSyncMessage');
}
await this.process(device, message, sentCb);
}
5 years ago
/**
*
* @param sentCb currently only called for medium groups sent message
*/
5 years ago
public async sendToGroup(
message: GroupMessageType,
sentCb?: (message: RawMessage) => Promise<void>
): Promise<void> {
5 years ago
// Open groups
if (message instanceof OpenGroupMessage) {
// No queue needed for Open Groups; send directly
5 years ago
const error = new Error('Failed to send message to open group.');
// This is absolutely yucky ... we need to make it not use Promise<boolean>
5 years ago
try {
5 years ago
const result = await MessageSender.sendToOpenGroup(message);
// sendToOpenGroup returns -1 if failed or an id if succeeded
if (result.serverId < 0) {
this.events.emit('sendFail', message, error);
} else {
const messageEventData = {
identifier: message.identifier,
pubKey: message.group.groupId,
timestamp: message.timestamp,
serverId: result.serverId,
serverTimestamp: result.serverTimestamp,
};
this.events.emit('sendSuccess', message);
window.Whisper.events.trigger('publicMessageSent', messageEventData);
5 years ago
}
5 years ago
} catch (e) {
4 years ago
window?.log?.warn(
5 years ago
`Failed to send message to open group: ${message.group.server}`,
e
);
this.events.emit('sendFail', message, error);
5 years ago
}
return;
}
let groupId: PubKey | undefined;
if (
message instanceof ExpirationTimerUpdateMessage ||
message instanceof ClosedGroupMessage
) {
groupId = message.groupId;
}
if (!groupId) {
throw new Error('Invalid group message passed in sendToGroup.');
5 years ago
}
// if groupId is set here, it means it's for a medium group. So send it as it
return this.send(PubKey.cast(groupId), message, sentCb);
}
public async sendSyncMessage(
message?: ContentMessage,
sentCb?: (message: RawMessage) => Promise<void>
): Promise<void> {
5 years ago
if (!message) {
return;
}
if (
!(message instanceof ConfigurationMessage) &&
!(message as any)?.syncTarget
) {
throw new Error('Invalid message given to sendSyncMessage');
}
const ourPubKey = UserUtils.getOurPubKeyStrFromCache();
4 years ago
if (!ourPubKey) {
throw new Error('ourNumber is not set');
4 years ago
}
await this.process(PubKey.cast(ourPubKey), message, sentCb);
}
5 years ago
public async processPending(device: PubKey) {
5 years ago
const messages = await this.pendingMessageCache.getForDevice(device);
5 years ago
const jobQueue = this.getJobQueue(device);
5 years ago
messages.forEach(async message => {
const messageId = String(message.timestamp);
if (!jobQueue.has(messageId)) {
// We put the event handling inside this job to avoid sending duplicate events
const job = async () => {
try {
const wrappedEnvelope = await MessageSender.send(message);
this.events.emit('sendSuccess', message, wrappedEnvelope);
const cb = this.pendingMessageCache.callbacks.get(
message.identifier
);
if (cb) {
await cb(message);
}
this.pendingMessageCache.callbacks.delete(message.identifier);
} catch (e) {
this.events.emit('sendFail', message, e);
} finally {
// Remove from the cache because retrying is done in the sender
void this.pendingMessageCache.remove(message);
}
};
await jobQueue.addWithId(messageId, job);
5 years ago
}
});
}
5 years ago
private async processAllPending() {
5 years ago
const devices = await this.pendingMessageCache.getDevices();
5 years ago
const promises = devices.map(async device => this.processPending(device));
return Promise.all(promises);
}
/**
* This method should not be called directly. Only through sendToPubKey.
*/
5 years ago
private async process(
device: PubKey,
message: ContentMessage,
sentCb?: (message: RawMessage) => Promise<void>
5 years ago
): Promise<void> {
5 years ago
// Don't send to ourselves
const currentDevice = UserUtils.getOurPubKeyFromCache();
if (currentDevice && device.isEqual(currentDevice)) {
// We allow a message for ourselve only if it's a ConfigurationMessage, a ClosedGroupNewMessage,
// or a message with a syncTarget set.
if (
message instanceof ConfigurationMessage ||
message instanceof ClosedGroupNewMessage ||
(message as any).syncTarget?.length > 0
) {
window.log.warn('Processing sync message');
} else {
window.log.warn('Dropping message in process() to be sent to ourself');
return;
}
5 years ago
}
await this.pendingMessageCache.add(device, message, sentCb);
void this.processPending(device);
}
5 years ago
private getJobQueue(device: PubKey): JobQueue {
let queue = this.jobQueues.get(device.key);
if (!queue) {
queue = new JobQueue();
this.jobQueues.set(device.key, queue);
}
return queue;
}
}