Update tests

pull/1185/head
Mikunj 5 years ago
parent ef76972ccb
commit d862269f8d

@ -8,6 +8,7 @@ import {
ContentMessage,
OpenGroupMessage,
SessionRequestMessage,
SyncMessage,
} from '../messages/outgoing';
import { PendingMessageCache } from './PendingMessageCache';
import {
@ -20,6 +21,7 @@ import { PubKey } from '../types';
import { MessageSender } from '.';
import { MultiDeviceProtocol, SessionProtocol } from '../protocols';
import { UserUtil } from '../../util';
import promise from 'redux-promise-middleware';
export class MessageQueue implements MessageQueueInterface {
public readonly events: TypedEventEmitter<MessageQueueInterfaceEvents>;
@ -50,20 +52,18 @@ export class MessageQueue implements MessageQueueInterface {
// Sync to our devices if syncable
if (SyncMessageUtils.canSync(message)) {
const currentDevice = await UserUtil.getCurrentDevicePubKey();
if (currentDevice) {
const ourDevices = await MultiDeviceProtocol.getAllDevices(
currentDevice
const syncMessage = SyncMessageUtils.from(message);
if (!syncMessage) {
throw new Error(
'MessageQueue internal error occured: failed to make sync message'
);
}
await this.sendSyncMessage(message, ourDevices);
await this.sendSyncMessage(syncMessage);
// Remove our devices from currentDevices
currentDevices = currentDevices.filter(device =>
ourDevices.some(d => device.isEqual(d))
);
}
const ourDevices = await MultiDeviceProtocol.getOurDevices();
// Remove our devices from currentDevices
currentDevices = currentDevices.filter(device => !ourDevices.some(d => device.isEqual(d)));
}
const promises = currentDevices.map(async device => {
@ -79,30 +79,42 @@ export class MessageQueue implements MessageQueueInterface {
// Closed groups
if (message instanceof ClosedGroupMessage) {
// Get devices in closed group
const groupPubKey = PubKey.from(message.groupId);
if (!groupPubKey) {
const recipients = await GroupUtils.getGroupMembers(message.groupId);
if (recipients.length === 0) {
return false;
}
const recipients = await GroupUtils.getGroupMembers(groupPubKey);
if (recipients.length) {
await this.sendMessageToDevices(recipients, message);
// Send to all devices of members
await Promise.all(
recipients.map(async recipient =>
this.sendUsingMultiDevice(recipient, message)
)
);
return true;
}
return true;
}
// Open groups
if (message instanceof OpenGroupMessage) {
// No queue needed for Open Groups; send directly
const error = new Error('Failed to send message to open group.');
// This is absolutely yucky ... we need to make it not use Promise<boolean>
try {
await MessageSender.sendToOpenGroup(message);
this.events.emit('success', message);
const result = await MessageSender.sendToOpenGroup(message);
if (result) {
this.events.emit('success', message);
} else {
this.events.emit('fail', message, error);
}
return true;
return result;
} catch (e) {
this.events.emit('fail', message, e);
console.warn(
`Failed to send message to open group: ${message.group.server}`,
e
);
this.events.emit('fail', message, error);
return false;
}
@ -111,21 +123,20 @@ export class MessageQueue implements MessageQueueInterface {
return false;
}
public async sendSyncMessage(message: ContentMessage, sendTo: Array<PubKey>) {
// Sync with our devices
const promises = sendTo.map(async device => {
const syncMessage = SyncMessageUtils.from(message);
return this.process(device, syncMessage);
});
public async sendSyncMessage(message: SyncMessage | undefined): Promise<any> {
if (!message) {
return;
}
const ourDevices = await MultiDeviceProtocol.getOurDevices();
const promises = ourDevices.map(async device => this.process(device, message));
return Promise.all(promises);
}
public async processPending(device: PubKey) {
const messages = this.pendingMessageCache.getForDevice(device);
const messages = await this.pendingMessageCache.getForDevice(device);
const isMediumGroup = GroupUtils.isMediumGroup(device);
const isMediumGroup = GroupUtils.isMediumGroup(device.key);
const hasSession = await SessionProtocol.hasSession(device);
if (!isMediumGroup && !hasSession) {
@ -155,13 +166,16 @@ export class MessageQueue implements MessageQueueInterface {
}
private async processAllPending() {
const devices = this.pendingMessageCache.getDevices();
const devices = await this.pendingMessageCache.getDevices();
const promises = devices.map(async device => this.processPending(device));
return Promise.all(promises);
}
private async process(device: PubKey, message?: ContentMessage) {
private async process(
device: PubKey,
message?: ContentMessage
): Promise<void> {
// Don't send to ourselves
const currentDevice = await UserUtil.getCurrentDevicePubKey();
if (!message || (currentDevice && device.isEqual(currentDevice))) {

@ -2,6 +2,7 @@ import {
ClosedGroupMessage,
ContentMessage,
OpenGroupMessage,
SyncMessage,
} from '../messages/outgoing';
import { RawMessage } from '../types/RawMessage';
import { TypedEventEmitter } from '../utils';
@ -19,8 +20,5 @@ export interface MessageQueueInterface {
sendUsingMultiDevice(user: PubKey, message: ContentMessage): void;
send(device: PubKey, message: ContentMessage): void;
sendToGroup(message: GroupMessageType): void;
sendSyncMessage(
message: ContentMessage,
sendTo: Array<PubKey>
): Promise<Array<void>>;
sendSyncMessage(message: SyncMessage | undefined): Promise<any>;
}

@ -1,3 +1,4 @@
import _ from 'lodash';
import { createOrUpdateItem, getItemById } from '../../../js/modules/data';
import { PartialRawMessage, RawMessage } from '../types/RawMessage';
import { ContentMessage } from '../messages/outgoing';
@ -12,33 +13,25 @@ import { MessageUtils } from '../utils';
// memory and sync its state with the database on modification (add or remove).
export class PendingMessageCache {
public readonly isReady: Promise<boolean>;
private cache: Array<RawMessage>;
protected loadPromise: Promise<void> | undefined;
protected cache: Array<RawMessage> = [];
constructor() {
// Load pending messages from the database
// You should await isReady on making a new PendingMessageCache
// if you'd like to have instant access to the cache
this.cache = [];
this.isReady = new Promise(async resolve => {
await this.loadFromDB();
resolve(true);
});
}
public getAllPending(): Array<RawMessage> {
public async getAllPending(): Promise<Array<RawMessage>> {
await this.loadFromDBIfNeeded();
// Get all pending from cache, sorted with oldest first
return [...this.cache].sort((a, b) => a.timestamp - b.timestamp);
}
public getForDevice(device: PubKey): Array<RawMessage> {
return this.getAllPending().filter(m => m.device === device.key);
public async getForDevice(device: PubKey): Promise<Array<RawMessage>> {
const pending = await this.getAllPending();
return pending.filter(m => m.device === device.key);
}
public getDevices(): Array<PubKey> {
public async getDevices(): Promise<Array<PubKey>> {
await this.loadFromDBIfNeeded();
// Gets all unique devices with pending messages
const pubkeyStrings = [...new Set(this.cache.map(m => m.device))];
const pubkeyStrings = _.uniq(this.cache.map(m => m.device));
return pubkeyStrings.map(PubKey.from).filter((k): k is PubKey => !!k);
}
@ -47,6 +40,7 @@ export class PendingMessageCache {
device: PubKey,
message: ContentMessage
): Promise<RawMessage> {
await this.loadFromDBIfNeeded();
const rawMessage = MessageUtils.toRawMessage(device, message);
// Does it exist in cache already?
@ -63,6 +57,7 @@ export class PendingMessageCache {
public async remove(
message: RawMessage
): Promise<Array<RawMessage> | undefined> {
await this.loadFromDBIfNeeded();
// Should only be called after message is processed
// Return if message doesn't exist in cache
@ -72,7 +67,7 @@ export class PendingMessageCache {
// Remove item from cache and sync with database
const updatedCache = this.cache.filter(
m => m.identifier !== message.identifier
cached => !(cached.device === message.device && cached.timestamp === message.timestamp)
);
this.cache = updatedCache;
await this.saveToDB();
@ -93,6 +88,14 @@ export class PendingMessageCache {
await this.saveToDB();
}
protected async loadFromDBIfNeeded() {
if (!this.loadPromise) {
this.loadPromise = this.loadFromDB();
}
await this.loadPromise;
}
protected async loadFromDB() {
const messages = await this.getFromStorage();
this.cache = messages;

@ -1,7 +1,11 @@
import { PubKey } from '../types';
import _ from 'lodash';
import { PrimaryPubKey } from '../types';
import { MultiDeviceProtocol } from '../protocols';
export async function getGroupMembers(groupId: PubKey): Promise<Array<PubKey>> {
const groupConversation = window.ConversationController.get(groupId.key);
export async function getGroupMembers(
groupId: string
): Promise<Array<PrimaryPubKey>> {
const groupConversation = window.ConversationController.get(groupId);
const groupMembers = groupConversation
? groupConversation.attributes.members
: undefined;
@ -10,11 +14,16 @@ export async function getGroupMembers(groupId: PubKey): Promise<Array<PubKey>> {
return [];
}
return groupMembers.map((member: string) => new PubKey(member));
const promises = (groupMembers as Array<string>).map(async (member: string) =>
MultiDeviceProtocol.getPrimaryDevice(member)
);
const primaryDevices = await Promise.all(promises);
return _.uniqWith(primaryDevices, (a, b) => a.isEqual(b));
}
export function isMediumGroup(groupId: PubKey): boolean {
const conversation = window.ConversationController.get(groupId.key);
export function isMediumGroup(groupId: string): boolean {
const conversation = window.ConversationController.get(groupId);
if (!conversation) {
return false;

@ -5,7 +5,9 @@ import { ContentMessage, SyncMessage } from '../messages/outgoing';
import { MultiDeviceProtocol } from '../protocols';
export function from(message: ContentMessage): SyncMessage | undefined {
// const { timestamp, identifier } = message;
if (message instanceof SyncMessage) {
return message;
}
// Stubbed for now
return undefined;

@ -1,96 +1,52 @@
import { expect } from 'chai';
import chai from 'chai';
import * as sinon from 'sinon';
import * as _ from 'lodash';
import { GroupUtils, SyncMessageUtils } from '../../../session/utils';
import { Stubs, TestUtils } from '../../../test/test-utils';
import { MessageQueue } from '../../../session/sending/MessageQueue';
import {
ChatMessage,
ClosedGroupMessage,
ContentMessage,
OpenGroupMessage,
} from '../../../session/messages/outgoing';
import { PubKey, RawMessage } from '../../../session/types';
import { PrimaryPubKey, PubKey, RawMessage } from '../../../session/types';
import { UserUtil } from '../../../util';
import { MessageSender, PendingMessageCache } from '../../../session/sending';
import { toRawMessage } from '../../../session/utils/Messages';
import { MessageSender } from '../../../session/sending';
import {
MultiDeviceProtocol,
SessionProtocol,
} from '../../../session/protocols';
import { PendingMessageCacheStub } from '../../test-utils/stubs';
import { describe } from 'mocha';
import { TestSyncMessage } from '../../test-utils/stubs/messages/TestSyncMessage';
// Equivalent to Data.StorageItem
interface StorageItem {
id: string;
value: any;
}
// Helper function to force sequential on events checks
async function tick() {
return new Promise(resolve => {
// tslint:disable-next-line: no-string-based-set-timeout
setTimeout(resolve, 0);
});
}
// tslint:disable-next-line: no-require-imports no-var-requires
const chaiAsPromised = require('chai-as-promised');
chai.use(chaiAsPromised);
const { expect } = chai;
describe('MessageQueue', () => {
// Initialize new stubbed cache
let data: StorageItem;
const sandbox = sinon.createSandbox();
const ourDevice = TestUtils.generateFakePubKey();
const ourNumber = ourDevice.key;
const pairedDevices = TestUtils.generateFakePubKeys(2);
// Initialize new stubbed queue
let pendingMessageCache: PendingMessageCacheStub;
let messageQueueStub: MessageQueue;
// Spies
let sendMessageToDevicesSpy: sinon.SinonSpy<
[Array<PubKey>, ContentMessage],
Promise<Array<void>>
>;
let sendSyncMessageSpy: sinon.SinonSpy<
[ContentMessage, Array<PubKey>],
Promise<Array<void>>
>;
let sendToGroupSpy: sinon.SinonSpy<
[OpenGroupMessage | ClosedGroupMessage],
Promise<boolean>
>;
// Message Sender Stubs
let sendStub: sinon.SinonStub<[RawMessage, (number | undefined)?]>;
let sendToOpenGroupStub: sinon.SinonStub<[OpenGroupMessage]>;
// Utils Stubs
let groupMembersStub: sinon.SinonStub;
let canSyncStub: sinon.SinonStub<[ContentMessage], boolean>;
let isMediumGroupStub: sinon.SinonStub<[string], boolean>;
// Session Protocol Stubs
let hasSessionStub: sinon.SinonStub<[PubKey]>;
let sendSessionRequestIfNeededStub: sinon.SinonStub<[PubKey], Promise<void>>;
beforeEach(async () => {
// Stub out methods which touch the database
const storageID = 'pendingMessages';
data = {
id: storageID,
value: '[]',
};
// Pending Message Cache Data Stubs
TestUtils.stubData('getItemById')
.withArgs('pendingMessages')
.resolves(data);
TestUtils.stubData('createOrUpdateItem').callsFake((item: StorageItem) => {
if (item.id === storageID) {
data = item;
}
});
// Utils Stubs
canSyncStub = sandbox.stub(SyncMessageUtils, 'canSync');
canSyncStub.returns(false);
sandbox.stub(UserUtil, 'getCurrentDevicePubKey').resolves(ourNumber);
sandbox.stub(MultiDeviceProtocol, 'getAllDevices').resolves(pairedDevices);
TestUtils.stubWindow('libsignal', {
SignalProtocolAddress: sandbox.stub(),
@ -99,15 +55,11 @@ describe('MessageQueue', () => {
// Message Sender Stubs
sendStub = sandbox.stub(MessageSender, 'send').resolves();
sendToOpenGroupStub = sandbox
.stub(MessageSender, 'sendToOpenGroup')
.resolves(true);
// Group Utils Stubs
sandbox.stub(GroupUtils, 'isMediumGroup').returns(false);
groupMembersStub = sandbox
.stub(GroupUtils, 'getGroupMembers' as any)
.resolves(TestUtils.generateFakePubKeys(10));
isMediumGroupStub = sandbox
.stub(GroupUtils, 'isMediumGroup')
.returns(false);
// Session Protocol Stubs
sandbox.stub(SessionProtocol, 'sendSessionRequest').resolves();
@ -116,37 +68,9 @@ describe('MessageQueue', () => {
.stub(SessionProtocol, 'sendSessionRequestIfNeeded')
.resolves();
// Pending Mesage Cache Stubs
const chatMessages = Array.from(
{ length: 10 },
TestUtils.generateChatMessage
);
const rawMessage = toRawMessage(
TestUtils.generateFakePubKey(),
TestUtils.generateChatMessage()
);
sandbox.stub(PendingMessageCache.prototype, 'add').resolves(rawMessage);
sandbox.stub(PendingMessageCache.prototype, 'remove').resolves();
sandbox
.stub(PendingMessageCache.prototype, 'getDevices')
.returns(TestUtils.generateFakePubKeys(10));
sandbox
.stub(PendingMessageCache.prototype, 'getForDevice')
.returns(
chatMessages.map(m => toRawMessage(TestUtils.generateFakePubKey(), m))
);
// Spies
sendSyncMessageSpy = sandbox.spy(MessageQueue.prototype, 'sendSyncMessage');
sendMessageToDevicesSpy = sandbox.spy(
MessageQueue.prototype,
'sendMessageToDevices'
);
sendToGroupSpy = sandbox.spy(MessageQueue.prototype, 'sendToGroup');
// Init Queue
messageQueueStub = new MessageQueue();
pendingMessageCache = new PendingMessageCacheStub();
messageQueueStub = new MessageQueue(pendingMessageCache);
});
afterEach(() => {
@ -154,233 +78,300 @@ describe('MessageQueue', () => {
sandbox.restore();
});
describe('send', () => {
it('can send to a single device', async () => {
const device = TestUtils.generateFakePubKey();
const message = TestUtils.generateChatMessage();
const promise = messageQueueStub.send(device, message);
await expect(promise).to.be.fulfilled;
});
it('can send sync message', async () => {
const devices = TestUtils.generateFakePubKeys(3);
const message = TestUtils.generateChatMessage();
const promise = messageQueueStub.sendSyncMessage(message, devices);
expect(promise).to.be.fulfilled;
});
});
describe('processPending', () => {
it('will send session request message if no session', async () => {
hasSessionStub.resolves(false);
isMediumGroupStub.resolves(false);
const device = TestUtils.generateFakePubKey();
const promise = messageQueueStub.processPending(device);
await expect(promise).to.be.fulfilled;
expect(sendSessionRequestIfNeededStub.callCount).to.equal(1);
const stubCallPromise = TestUtils.waitUntil(() => sendSessionRequestIfNeededStub.callCount === 1);
await messageQueueStub.processPending(device);
expect(stubCallPromise).to.be.fulfilled;
});
it('will send message if session exists', async () => {
hasSessionStub.resolves(true);
isMediumGroupStub.resolves(false);
sendStub.resolves();
const device = TestUtils.generateFakePubKey();
const hasSession = await hasSessionStub(device);
await pendingMessageCache.add(device, TestUtils.generateChatMessage());
const promise = messageQueueStub.processPending(device);
await expect(promise).to.be.fulfilled;
const successPromise = TestUtils.waitForTask(done => {
messageQueueStub.events.once('success', done);
});
expect(hasSession).to.equal(true, 'session does not exist');
expect(sendSessionRequestIfNeededStub.callCount).to.equal(0);
await messageQueueStub.processPending(device);
await expect(successPromise).to.be.fulfilled;
expect(sendSessionRequestIfNeededStub.called).to.equal(
false,
'Session request triggered when we have a session.'
);
});
});
describe('sendUsingMultiDevice', () => {
it('can send using multidevice', async () => {
it('will send message if sending to medium group', async () => {
isMediumGroupStub.resolves(true);
sendStub.resolves();
const device = TestUtils.generateFakePubKey();
const message = TestUtils.generateChatMessage();
await pendingMessageCache.add(device, TestUtils.generateChatMessage());
const promise = messageQueueStub.sendUsingMultiDevice(device, message);
await expect(promise).to.be.fulfilled;
const successPromise = TestUtils.waitForTask(done => {
messageQueueStub.events.once('success', done);
});
// Ensure the arguments passed into sendMessageToDevices are correct
const previousArgs = sendMessageToDevicesSpy.lastCall.args as [
Array<PubKey>,
ChatMessage
];
await messageQueueStub.processPending(device);
await expect(successPromise).to.be.fulfilled;
expect(sendSessionRequestIfNeededStub.called).to.equal(
false,
'Session request triggered on medium group'
);
});
// Check that instances are equal
expect(previousArgs).to.have.length(2);
it('should remove message from cache', async () => {
hasSessionStub.resolves(true);
isMediumGroupStub.resolves(false);
const events = ['success', 'fail'];
for (const event of events) {
if (event === 'success') {
sendStub.resolves();
} else {
sendStub.throws(new Error('fail'));
}
const device = TestUtils.generateFakePubKey();
await pendingMessageCache.add(device, TestUtils.generateChatMessage());
const initialMessages = await pendingMessageCache.getForDevice(device);
expect(initialMessages).to.have.length(1);
await messageQueueStub.processPending(device);
const promise = TestUtils.waitUntil(async () => {
const messages = await pendingMessageCache.getForDevice(device);
return messages.length === 0;
});
expect(promise).to.be.fulfilled;
}
});
const argsPairedDevices = previousArgs[0];
const argsChatMessage = previousArgs[1];
describe('events', () => {
it('should send a success event if message was sent', async () => {
hasSessionStub.resolves(true);
isMediumGroupStub.resolves(false);
sendStub.resolves();
expect(argsChatMessage instanceof ChatMessage).to.equal(
true,
'message passed into sendMessageToDevices was not a valid ChatMessage'
);
expect(argsChatMessage.isEqual(message)).to.equal(
true,
'message passed into sendMessageToDevices has been mutated'
);
const device = TestUtils.generateFakePubKey();
const message = TestUtils.generateChatMessage();
await pendingMessageCache.add(device, message);
argsPairedDevices.forEach((argsPaired: PubKey, index: number) => {
expect(argsPaired instanceof PubKey).to.equal(
true,
'a device passed into sendMessageToDevices was not a PubKey'
);
expect(argsPaired.isEqual(pairedDevices[index])).to.equal(
true,
'a device passed into sendMessageToDevices did not match MessageDeviceProtocol.getAllDevices'
);
const eventPromise = TestUtils.waitForTask<RawMessage | OpenGroupMessage>(complete => {
messageQueueStub.events.once('success', complete);
});
await messageQueueStub.processPending(device);
await expect(eventPromise).to.be.fulfilled;
const rawMessage = await eventPromise;
expect(rawMessage.identifier).to.equal(message.identifier);
});
it('should send a fail event if something went wrong while sending', async () => {
hasSessionStub.resolves(true);
isMediumGroupStub.resolves(false);
sendStub.throws(new Error('failure'));
const spy = sandbox.spy();
messageQueueStub.events.on('fail', spy);
const device = TestUtils.generateFakePubKey();
const message = TestUtils.generateChatMessage();
await pendingMessageCache.add(device, message);
const eventPromise = TestUtils.waitForTask<[RawMessage | OpenGroupMessage, Error]>(complete => {
messageQueueStub.events.once('fail', (...args) => {
complete(args);
});
});
await messageQueueStub.processPending(device);
await expect(eventPromise).to.be.fulfilled;
const [rawMessage, error] = await eventPromise;
expect(rawMessage.identifier).to.equal(message.identifier);
expect(error.message).to.equal('failure');
});
});
});
describe('sendUsingMultiDevice', () => {
it('should send the message to all the devices', async () => {
const devices = TestUtils.generateFakePubKeys(3);
sandbox.stub(MultiDeviceProtocol, 'getAllDevices').resolves(devices);
const stub = sandbox
.stub(messageQueueStub, 'sendMessageToDevices')
.resolves();
const message = TestUtils.generateChatMessage();
await messageQueueStub.sendUsingMultiDevice(devices[0], message);
const args = stub.lastCall.args as [Array<PubKey>, ContentMessage];
expect(args[0]).to.have.same.members(devices);
expect(args[1]).to.equal(message);
});
});
describe('sendMessageToDevices', () => {
it('can send to many devices', async () => {
const devices = TestUtils.generateFakePubKeys(10);
hasSessionStub.resolves(false);
const devices = TestUtils.generateFakePubKeys(5);
const message = TestUtils.generateChatMessage();
const promise = messageQueueStub.sendMessageToDevices(devices, message);
await messageQueueStub.sendMessageToDevices(devices, message);
const promise = TestUtils.waitUntil(() => pendingMessageCache.getCache().length === devices.length);
await expect(promise).to.be.fulfilled;
});
it('can send sync message and confirm canSync is valid', async () => {
canSyncStub.returns(true);
it('should send sync message if possible', async () => {
hasSessionStub.returns(false);
const devices = TestUtils.generateFakePubKeys(3);
const message = TestUtils.generateChatMessage();
const pairedDeviceKeys = pairedDevices.map(device => device.key);
sandbox.stub(SyncMessageUtils, 'canSync').returns(true);
const promise = messageQueueStub.sendMessageToDevices(devices, message);
await expect(promise).to.be.fulfilled;
sandbox
.stub(SyncMessageUtils, 'from')
.returns(new TestSyncMessage({ timestamp: Date.now() }));
// Check sendSyncMessage parameters
const previousArgs = sendSyncMessageSpy.lastCall.args as [
ChatMessage,
Array<PubKey>
];
expect(sendSyncMessageSpy.callCount).to.equal(1);
// This stub ensures that the message won't process
const sendSyncMessageStub = sandbox
.stub(messageQueueStub, 'sendSyncMessage')
.resolves();
// Check that instances are equal
expect(previousArgs).to.have.length(2);
const ourDevices = [ourDevice, ...TestUtils.generateFakePubKeys(2)];
sandbox
.stub(MultiDeviceProtocol, 'getAllDevices')
.callsFake(async user => {
if (ourDevice.isEqual(user)) {
return ourDevices;
}
const argsChatMessage = previousArgs[0];
const argsPairedKeys = [...previousArgs[1]].map(d => d.key);
return [];
});
expect(argsChatMessage instanceof ChatMessage).to.equal(
true,
'message passed into sendMessageToDevices was not a valid ChatMessage'
);
expect(argsChatMessage.isEqual(message)).to.equal(
true,
'message passed into sendMessageToDevices has been mutated'
);
const devices = [...ourDevices, ...TestUtils.generateFakePubKeys(3)];
const message = TestUtils.generateChatMessage();
// argsPairedKeys and pairedDeviceKeys should contain the same values
const keyArgsValid = _.isEmpty(_.xor(argsPairedKeys, pairedDeviceKeys));
expect(keyArgsValid).to.equal(
await messageQueueStub.sendMessageToDevices(devices, message);
expect(sendSyncMessageStub.called).to.equal(
true,
'devices passed into sendSyncMessage were invalid'
'sendSyncMessage was not called.'
);
expect(pendingMessageCache.getCache().map(c => c.device)).to.not.have.members(ourDevices.map(d => d.key), 'Sending regular messages to our own device is not allowed.');
expect(pendingMessageCache.getCache()).to.have.length(
devices.length - ourDevices.length,
'Messages should not be sent to our devices.'
);
});
});
describe('sendToGroup', () => {
it('can send to closed group', async () => {
const message = TestUtils.generateClosedGroupMessage();
const success = await messageQueueStub.sendToGroup(message);
expect(success).to.equal(true, 'sending to group failed');
});
it('uses correct parameters for sendToGroup with ClosedGroupMessage', async () => {
const message = TestUtils.generateClosedGroupMessage();
const success = await messageQueueStub.sendToGroup(message);
expect(success).to.equal(true, 'sending to group failed');
describe('sendSyncMessage', () => {
it('should send a message to all our devices', async () => {
hasSessionStub.resolves(false);
// Check parameters
const previousArgs = sendMessageToDevicesSpy.lastCall.args as [
Array<PubKey>,
ClosedGroupMessage
];
expect(previousArgs).to.have.length(2);
const ourOtherDevices = TestUtils.generateFakePubKeys(2);
const ourDevices = [ourDevice, ...ourOtherDevices];
sandbox.stub(MultiDeviceProtocol, 'getAllDevices').resolves(ourDevices);
const argsClosedGroupMessage = previousArgs[1];
expect(argsClosedGroupMessage instanceof ClosedGroupMessage).to.equal(
true,
'message passed into sendMessageToDevices was not a ClosedGroupMessage'
await messageQueueStub.sendSyncMessage(
new TestSyncMessage({ timestamp: Date.now() })
);
});
it("won't send to invalid groupId", async () => {
const message = TestUtils.generateClosedGroupMessage('invalid-group-id');
const success = await messageQueueStub.sendToGroup(message);
expect(pendingMessageCache.getCache()).to.have.length(ourOtherDevices.length);
expect(pendingMessageCache.getCache().map(c => c.device)).to.have.members(ourOtherDevices.map(d => d.key));
});
});
// Ensure message parameter passed into sendToGroup is as expected
expect(success).to.equal(
false,
'an invalid groupId was treated as valid'
);
expect(sendToGroupSpy.callCount).to.equal(1);
describe('sendToGroup', () => {
describe('closed groups', async () => {
it('can send to closed group', async () => {
const members = TestUtils.generateFakePubKeys(4).map(
p => new PrimaryPubKey(p.key)
);
sandbox.stub(GroupUtils, 'getGroupMembers').resolves(members);
const argsMessage = sendToGroupSpy.lastCall.args[0];
expect(argsMessage instanceof ClosedGroupMessage).to.equal(
true,
'message passed into sendToGroup was not a ClosedGroupMessage'
);
expect(success).to.equal(
false,
'invalid ClosedGroupMessage was propogated through sendToGroup'
);
});
const sendUsingMultiDeviceStub = sandbox
.stub(messageQueueStub, 'sendUsingMultiDevice')
.resolves();
it('wont send message to empty closed group', async () => {
groupMembersStub.resolves(TestUtils.generateFakePubKeys(0));
const message = TestUtils.generateClosedGroupMessage();
const success = await messageQueueStub.sendToGroup(message);
expect(success).to.equal(true, 'sending to group failed');
expect(sendUsingMultiDeviceStub.callCount).to.equal(members.length);
const message = TestUtils.generateClosedGroupMessage();
const response = await messageQueueStub.sendToGroup(message);
const arg = sendUsingMultiDeviceStub.getCall(0).args;
expect(arg[1] instanceof ClosedGroupMessage).to.equal(
true,
'message sent to group member was not a ClosedGroupMessage'
);
});
expect(response).to.equal(
false,
'sendToGroup send a message to an empty group'
);
});
it('wont send message to empty closed group', async () => {
sandbox.stub(GroupUtils, 'getGroupMembers').resolves([]);
const sendUsingMultiDeviceStub = sandbox
.stub(messageQueueStub, 'sendUsingMultiDevice')
.resolves();
it('can send to open group', async () => {
const message = TestUtils.generateOpenGroupMessage();
const success = await messageQueueStub.sendToGroup(message);
const message = TestUtils.generateClosedGroupMessage();
const response = await messageQueueStub.sendToGroup(message);
expect(success).to.equal(true, 'sending to group failed');
expect(response).to.equal(
false,
'sendToGroup sent a message to an empty group'
);
expect(sendUsingMultiDeviceStub.callCount).to.equal(0);
});
});
});
describe('events', () => {
it('can send events on message sending success', async () => {
const successSpy = sandbox.spy();
messageQueueStub.events.on('success', successSpy);
const device = TestUtils.generateFakePubKey();
const promise = messageQueueStub.processPending(device);
await expect(promise).to.be.fulfilled;
describe('open groups', async () => {
let sendToOpenGroupStub: sinon.SinonStub<
[OpenGroupMessage],
Promise<boolean>
>;
beforeEach(() => {
sendToOpenGroupStub = sandbox
.stub(MessageSender, 'sendToOpenGroup')
.resolves(true);
});
await tick();
expect(successSpy.callCount).to.equal(1);
});
it('can send to open group', async () => {
const message = TestUtils.generateOpenGroupMessage();
const success = await messageQueueStub.sendToGroup(message);
expect(sendToOpenGroupStub.callCount).to.equal(1);
expect(success).to.equal(true, 'Sending to open group failed');
});
it('can send events on message sending failure', async () => {
sendStub.throws(new Error('Failed to send message.'));
it('should emit a success event when send was successful', async () => {
const message = TestUtils.generateOpenGroupMessage();
const eventPromise = TestUtils.waitForTask(complete => {
messageQueueStub.events.once('success', complete);
}, 2000);
const failureSpy = sandbox.spy();
messageQueueStub.events.on('fail', failureSpy);
await messageQueueStub.sendToGroup(message);
await expect(eventPromise).to.be.fulfilled;
});
const device = TestUtils.generateFakePubKey();
const promise = messageQueueStub.processPending(device);
await expect(promise).to.be.fulfilled;
it('should emit a fail event if something went wrong', async () => {
sendToOpenGroupStub.resolves(false);
const message = TestUtils.generateOpenGroupMessage();
const eventPromise = TestUtils.waitForTask(complete => {
messageQueueStub.events.once('fail', complete);
}, 2000);
await tick();
expect(failureSpy.callCount).to.equal(1);
await messageQueueStub.sendToGroup(message);
await expect(eventPromise).to.be.fulfilled;
});
});
});
});

@ -1,8 +1,9 @@
import { expect } from 'chai';
import * as _ from 'lodash';
import { MessageUtils } from '../../../session/utils';
import { TestUtils } from '../../../test/test-utils';
import { TestUtils, timeout } from '../../../test/test-utils';
import { PendingMessageCache } from '../../../session/sending/PendingMessageCache';
import { initial } from 'lodash';
// Equivalent to Data.StorageItem
interface StorageItem {
@ -36,7 +37,6 @@ describe('PendingMessageCache', () => {
});
pendingMessageCacheStub = new PendingMessageCache();
await pendingMessageCacheStub.isReady;
});
afterEach(() => {
@ -44,7 +44,7 @@ describe('PendingMessageCache', () => {
});
it('can initialize cache', async () => {
const cache = pendingMessageCacheStub.getAllPending();
const cache = await pendingMessageCacheStub.getAllPending();
// We expect the cache to initialise as an empty array
expect(cache).to.be.instanceOf(Array);
@ -59,7 +59,7 @@ describe('PendingMessageCache', () => {
await pendingMessageCacheStub.add(device, message);
// Verify that the message is in the cache
const finalCache = pendingMessageCacheStub.getAllPending();
const finalCache = await pendingMessageCacheStub.getAllPending();
expect(finalCache).to.have.length(1);
@ -68,6 +68,22 @@ describe('PendingMessageCache', () => {
expect(addedMessage.timestamp).to.deep.equal(rawMessage.timestamp);
});
it('can add multiple messages belonging to the same user', async () => {
const device = TestUtils.generateFakePubKey();
await pendingMessageCacheStub.add(device, TestUtils.generateChatMessage());
// We have to timeout here otherwise it's processed too fast and messages start having the same timestamp
await timeout(5);
await pendingMessageCacheStub.add(device, TestUtils.generateChatMessage());
await timeout(5);
await pendingMessageCacheStub.add(device, TestUtils.generateChatMessage());
// Verify that the message is in the cache
const finalCache = await pendingMessageCacheStub.getAllPending();
expect(finalCache).to.have.length(3);
});
it('can remove from cache', async () => {
const device = TestUtils.generateFakePubKey();
const message = TestUtils.generateChatMessage();
@ -75,18 +91,43 @@ describe('PendingMessageCache', () => {
await pendingMessageCacheStub.add(device, message);
const initialCache = pendingMessageCacheStub.getAllPending();
const initialCache = await pendingMessageCacheStub.getAllPending();
expect(initialCache).to.have.length(1);
// Remove the message
await pendingMessageCacheStub.remove(rawMessage);
const finalCache = pendingMessageCacheStub.getAllPending();
const finalCache = await pendingMessageCacheStub.getAllPending();
// Verify that the message was removed
expect(finalCache).to.have.length(0);
});
it('should only remove messages with different timestamp and device', async () => {
const device = TestUtils.generateFakePubKey();
const message = TestUtils.generateChatMessage();
const rawMessage = MessageUtils.toRawMessage(device, message);
await pendingMessageCacheStub.add(device, message);
await timeout(5);
await pendingMessageCacheStub.add(
device,
TestUtils.generateChatMessage(message.identifier)
);
await pendingMessageCacheStub.add(TestUtils.generateFakePubKey(), message);
const initialCache = await pendingMessageCacheStub.getAllPending();
expect(initialCache).to.have.length(3);
// Remove the message
await pendingMessageCacheStub.remove(rawMessage);
const finalCache = await pendingMessageCacheStub.getAllPending();
// Verify that the message was removed
expect(finalCache).to.have.length(2);
});
it('can get devices', async () => {
const cacheItems = [
{
@ -103,16 +144,16 @@ describe('PendingMessageCache', () => {
},
];
cacheItems.forEach(async item => {
for (const item of cacheItems) {
await pendingMessageCacheStub.add(item.device, item.message);
});
}
const cache = pendingMessageCacheStub.getAllPending();
const cache = await pendingMessageCacheStub.getAllPending();
expect(cache).to.have.length(cacheItems.length);
// Get list of devices
const devicesKeys = cacheItems.map(item => item.device.key);
const pulledDevices = pendingMessageCacheStub.getDevices();
const pulledDevices = await pendingMessageCacheStub.getDevices();
const pulledDevicesKeys = pulledDevices.map(d => d.key);
// Verify that device list from cache is equivalent to devices added
@ -131,21 +172,21 @@ describe('PendingMessageCache', () => {
},
];
cacheItems.forEach(async item => {
for (const item of cacheItems) {
await pendingMessageCacheStub.add(item.device, item.message);
});
}
const initialCache = pendingMessageCacheStub.getAllPending();
const initialCache = await pendingMessageCacheStub.getAllPending();
expect(initialCache).to.have.length(cacheItems.length);
// Get pending for each specific device
cacheItems.forEach(item => {
const pendingForDevice = pendingMessageCacheStub.getForDevice(
for (const item of cacheItems) {
const pendingForDevice = await pendingMessageCacheStub.getForDevice(
item.device
);
expect(pendingForDevice).to.have.length(1);
expect(pendingForDevice[0].device).to.equal(item.device.key);
});
}
});
it('can find nothing when empty', async () => {
@ -164,7 +205,7 @@ describe('PendingMessageCache', () => {
await pendingMessageCacheStub.add(device, message);
const finalCache = pendingMessageCacheStub.getAllPending();
const finalCache = await pendingMessageCacheStub.getAllPending();
expect(finalCache).to.have.length(1);
const foundMessage = pendingMessageCacheStub.find(rawMessage);
@ -188,17 +229,17 @@ describe('PendingMessageCache', () => {
},
];
cacheItems.forEach(async item => {
for (const item of cacheItems) {
await pendingMessageCacheStub.add(item.device, item.message);
});
}
const initialCache = pendingMessageCacheStub.getAllPending();
const initialCache = await pendingMessageCacheStub.getAllPending();
expect(initialCache).to.have.length(cacheItems.length);
// Clear cache
await pendingMessageCacheStub.clear();
const finalCache = pendingMessageCacheStub.getAllPending();
const finalCache = await pendingMessageCacheStub.getAllPending();
expect(finalCache).to.have.length(0);
});
@ -218,21 +259,20 @@ describe('PendingMessageCache', () => {
},
];
cacheItems.forEach(async item => {
for (const item of cacheItems) {
await pendingMessageCacheStub.add(item.device, item.message);
});
}
const addedMessages = pendingMessageCacheStub.getAllPending();
const addedMessages = await pendingMessageCacheStub.getAllPending();
expect(addedMessages).to.have.length(cacheItems.length);
// Rebuild from DB
const freshCache = new PendingMessageCache();
await freshCache.isReady;
// Verify messages
const rebuiltMessages = freshCache.getAllPending();
rebuiltMessages.forEach((message, index) => {
const rebuiltMessages = await freshCache.getAllPending();
// tslint:disable-next-line: no-for-in no-for-in-array
for (const [index, message] of rebuiltMessages.entries()) {
const addedMessage = addedMessages[index];
// Pull out plainTextBuffer for a separate check
@ -254,6 +294,6 @@ describe('PendingMessageCache', () => {
true,
'cached messages were not rebuilt properly'
);
});
}
});
});

@ -1 +1,2 @@
export * from './ciphers';
export * from './sending';

@ -0,0 +1,7 @@
import { SyncMessage } from '../../../../session/messages/outgoing';
import { SignalService } from '../../../../protobuf';
export class TestSyncMessage extends SyncMessage {
protected syncProto(): SignalService.SyncMessage {
return SignalService.SyncMessage.create({});
}
}

@ -0,0 +1,22 @@
import { PendingMessageCache } from '../../../../session/sending';
import { RawMessage } from '../../../../session/types';
export class PendingMessageCacheStub extends PendingMessageCache {
public dbData: Array<RawMessage>;
constructor(dbData: Array<RawMessage> = []) {
super();
this.dbData = dbData;
}
public getCache(): Readonly<Array<RawMessage>> {
return this.cache;
}
protected async getFromStorage() {
return this.dbData;
}
protected async saveToDB() {
return;
}
}

@ -0,0 +1 @@
export * from './PendingMessageCacheStub';

@ -85,10 +85,10 @@ export function generateFakePubKeys(amount: number): Array<PubKey> {
return new Array(numPubKeys).fill(0).map(() => generateFakePubKey());
}
export function generateChatMessage(): ChatMessage {
export function generateChatMessage(identifier?: string): ChatMessage {
return new ChatMessage({
body: 'Lorem ipsum dolor sit amet, consectetur adipiscing elit',
identifier: uuid(),
identifier: identifier ?? uuid(),
timestamp: Date.now(),
attachments: undefined,
quote: undefined,
@ -124,3 +124,83 @@ export function generateClosedGroupMessage(
chatMessage: generateChatMessage(),
});
}
type ArgFunction<T> = (arg: T) => void;
type MaybePromise<T> = Promise<T> | T;
/**
* Create a promise which waits until `done` is called or until timeout period is reached.
* @param task The task to wait for.
* @param timeout The timeout period.
*/
// tslint:disable-next-line: no-shadowed-variable
export async function waitForTask<T>(task: (done: ArgFunction<T>) => MaybePromise<void>, timeout: number = 2000): Promise<T> {
const timeoutPromise = new Promise<T>((_, rej) => {
const wait = setTimeout(() => {
clearTimeout(wait);
rej(new Error('Task timed out.'));
}, timeout);
});
// tslint:disable-next-line: no-shadowed-variable
const taskPromise = new Promise(async (res, rej) => {
try {
const taskReturn = task(res);
return taskReturn instanceof Promise ? taskReturn : Promise.resolve(taskReturn);
} catch (e) {
rej(e);
}
});
return Promise.race([timeoutPromise, taskPromise]) as Promise<T>;
}
/**
* Creates a promise which periodically calls the `check` until `done` is called or until timeout period is reached.
* @param check The check which runs every 100ms.
* @param timeout The time before an error is thrown.
*/
// tslint:disable-next-line: no-shadowed-variable
export async function periodicallyCheck(check: (done: ArgFunction<void>) => MaybePromise<void>, timeout: number = 1000): Promise<void> {
return waitForTask(complete => {
let interval: NodeJS.Timeout | undefined;
const cleanup = () => {
if (interval) {
clearInterval(interval);
interval = undefined;
}
};
setTimeout(cleanup, timeout);
const onDone = () => {
complete();
cleanup();
};
interval = setInterval(async () => {
try {
await toPromise(check(onDone));
} catch (e) {
cleanup();
throw e;
}
}, 100);
}, timeout);
}
/**
* Creates a promise which waits until `check` returns `true` or rejects if timeout preiod is reached.
* @param check The boolean check.
* @param timeout The time before an error is thrown.
*/
export async function waitUntil(check: () => MaybePromise<boolean>, timeout: number = 2000) {
return periodicallyCheck(async done => {
const result = await toPromise(check());
if (result) {
done();
}
}, timeout);
}
async function toPromise<T>(maybePromise: MaybePromise<T>): Promise<T> {
return maybePromise instanceof Promise ? maybePromise : Promise.resolve(maybePromise);
}

Loading…
Cancel
Save