Change how sync messages are handled

pull/1203/head
Mikunj 5 years ago
parent 56ee2cd843
commit a972c328c7

@ -1856,10 +1856,7 @@
const groupUpdateMessage = new libsession.Messages.Outgoing.ClosedGroupUpdateMessage( const groupUpdateMessage = new libsession.Messages.Outgoing.ClosedGroupUpdateMessage(
updateParams updateParams
); );
libsession await this.sendClosedGroupMessage(groupUpdateMessage);
.getMessageQueue()
.sendToGroup(groupUpdateMessage)
.catch(log.error);
}, },
sendGroupInfo(recipient) { sendGroupInfo(recipient) {
@ -1927,12 +1924,43 @@
quitGroup quitGroup
); );
await libsession.getMessageQueue().sendToGroup(quitGroupMessage); await this.sendClosedGroupMessage(quitGroupMessage);
this.updateTextInputState(); this.updateTextInputState();
} }
}, },
async sendClosedGroupMessage(message) {
const { ClosedGroupMessage, ClosedGroupChatMessage } = libsession.Messages.Outgoing;
if (
!(message instanceof ClosedGroupMessage)
) {
throw new Error('Invalid closed group message.');
}
// Sync messages for Chat Messages need to be constructed after confirming send was successful.
if (
message instanceof ClosedGroupChatMessage
) {
throw new Error(
'ClosedGroupChatMessage should be constructed manually and sent'
);
}
try {
await libsession.getMessageQueue().sendToGroup(message);
const syncMessage = libsession.Utils.SyncMessageUtils.fromClosedGroupMessage(
message
);
if (syncMessage) {
await libsession.getMessageQueue().sendSyncMessage(syncMessage);
}
} catch (e) {
window.log.error(e);
}
},
async markRead(newestUnreadDate, providedOptions) { async markRead(newestUnreadDate, providedOptions) {
const options = providedOptions || {}; const options = providedOptions || {};
_.defaults(options, { sendReadReceipts: true }); _.defaults(options, { sendReadReceipts: true });

@ -1,8 +1,8 @@
import { MessageQueue } from './sending/'; import { MessageQueue, MessageQueueInterface } from './sending/';
let messageQueue: MessageQueue; let messageQueue: MessageQueue;
function getMessageQueue() { function getMessageQueue(): MessageQueueInterface {
if (!messageQueue) { if (!messageQueue) {
messageQueue = new MessageQueue(); messageQueue = new MessageQueue();
} }

@ -6,7 +6,7 @@ interface ReceiptMessageParams extends MessageParams {
timestamps: Array<number>; timestamps: Array<number>;
} }
export abstract class ReceiptMessage extends ContentMessage { export abstract class ReceiptMessage extends ContentMessage {
private readonly timestamps: Array<number>; public readonly timestamps: Array<number>;
constructor({ timestamp, identifier, timestamps }: ReceiptMessageParams) { constructor({ timestamp, identifier, timestamps }: ReceiptMessageParams) {
super({ timestamp, identifier }); super({ timestamp, identifier });

@ -4,15 +4,15 @@ import { MessageParams } from '../../Message';
import { PubKey } from '../../../../types'; import { PubKey } from '../../../../types';
interface SentSyncMessageParams extends MessageParams { interface SentSyncMessageParams extends MessageParams {
dataMessage: SignalService.DataMessage; dataMessage: SignalService.IDataMessage;
expirationStartTimestamp?: number; expirationStartTimestamp?: number;
sentTo?: Array<PubKey>; sentTo?: Array<PubKey>;
unidentifiedDeliveries?: Array<PubKey>; unidentifiedDeliveries?: Array<PubKey>;
destination?: PubKey; destination?: PubKey;
} }
export abstract class SentSyncMessage extends SyncMessage { export class SentSyncMessage extends SyncMessage {
public readonly dataMessage: SignalService.DataMessage; public readonly dataMessage: SignalService.IDataMessage;
public readonly expirationStartTimestamp?: number; public readonly expirationStartTimestamp?: number;
public readonly sentTo?: Array<PubKey>; public readonly sentTo?: Array<PubKey>;
public readonly unidentifiedDeliveries?: Array<PubKey>; public readonly unidentifiedDeliveries?: Array<PubKey>;

@ -1,13 +1,14 @@
import _ from 'lodash';
import { SyncMessage } from './SyncMessage'; import { SyncMessage } from './SyncMessage';
import { SignalService } from '../../../../../protobuf'; import { SignalService } from '../../../../../protobuf';
import { MessageParams } from '../../Message'; import { MessageParams } from '../../Message';
interface SyncReadMessageParams extends MessageParams { interface SyncReadMessageParams extends MessageParams {
readMessages: any; readMessages: Array<{ sender: string; timestamp: number }>;
} }
export abstract class SyncReadMessage extends SyncMessage { export class SyncReadMessage extends SyncMessage {
public readonly readMessages: any; public readonly readMessages: Array<{ sender: string; timestamp: number }>;
constructor(params: SyncReadMessageParams) { constructor(params: SyncReadMessageParams) {
super({ timestamp: params.timestamp, identifier: params.identifier }); super({ timestamp: params.timestamp, identifier: params.identifier });
@ -19,7 +20,7 @@ export abstract class SyncReadMessage extends SyncMessage {
syncMessage.read = []; syncMessage.read = [];
for (const read of this.readMessages) { for (const read of this.readMessages) {
const readMessage = new SignalService.SyncMessage.Read(); const readMessage = new SignalService.SyncMessage.Read();
read.timestamp = readMessage.timestamp; read.timestamp = _.toNumber(readMessage.timestamp);
read.sender = readMessage.sender; read.sender = readMessage.sender;
syncMessage.read.push(readMessage); syncMessage.read.push(readMessage);
} }

@ -13,12 +13,7 @@ import {
TypingMessage, TypingMessage,
} from '../messages/outgoing'; } from '../messages/outgoing';
import { PendingMessageCache } from './PendingMessageCache'; import { PendingMessageCache } from './PendingMessageCache';
import { import { GroupUtils, JobQueue, TypedEventEmitter } from '../utils';
GroupUtils,
JobQueue,
SyncMessageUtils,
TypedEventEmitter,
} from '../utils';
import { PubKey } from '../types'; import { PubKey } from '../types';
import { MessageSender } from '.'; import { MessageSender } from '.';
import { MultiDeviceProtocol, SessionProtocol } from '../protocols'; import { MultiDeviceProtocol, SessionProtocol } from '../protocols';
@ -39,44 +34,20 @@ export class MessageQueue implements MessageQueueInterface {
user: PubKey, user: PubKey,
message: ContentMessage message: ContentMessage
): Promise<void> { ): Promise<void> {
const userDevices = await MultiDeviceProtocol.getAllDevices(user.key); if (message instanceof SyncMessage) {
return this.sendSyncMessage(message);
}
const userDevices = await MultiDeviceProtocol.getAllDevices(user.key);
await this.sendMessageToDevices(userDevices, message); await this.sendMessageToDevices(userDevices, message);
} }
public async send(device: PubKey, message: ContentMessage): Promise<void> { public async send(device: PubKey, message: ContentMessage): Promise<void> {
await this.sendMessageToDevices([device], message); if (message instanceof SyncMessage) {
} return this.sendSyncMessage(message);
public async sendMessageToDevices(
devices: Array<PubKey>,
message: ContentMessage
) {
let currentDevices = [...devices];
// Sync to our devices if syncable
if (SyncMessageUtils.canSync(message)) {
const syncMessage = SyncMessageUtils.from(message);
if (!syncMessage) {
throw new Error(
'MessageQueue internal error occured: failed to make sync message'
);
}
await this.sendSyncMessage(syncMessage);
const ourDevices = await MultiDeviceProtocol.getOurDevices();
// Remove our devices from currentDevices
currentDevices = currentDevices.filter(
device => !ourDevices.some(d => device.isEqual(d))
);
} }
const promises = currentDevices.map(async device => { await this.sendMessageToDevices([device], message);
await this.process(device, message);
});
return Promise.all(promises);
} }
public async sendToGroup( public async sendToGroup(
@ -120,7 +91,16 @@ export class MessageQueue implements MessageQueueInterface {
} }
// Get devices in group // Get devices in group
const recipients = await GroupUtils.getGroupMembers(groupId); let recipients = await GroupUtils.getGroupMembers(groupId);
// Don't send to our own device as they'll likely be synced across.
const ourKey = await UserUtil.getCurrentDevicePubKey();
if (!ourKey) {
throw new Error('Cannot get current user public key');
}
const ourPrimary = await MultiDeviceProtocol.getPrimaryDevice(ourKey);
recipients = recipients.filter(member => !ourPrimary.isEqual(member));
if (recipients.length === 0) { if (recipients.length === 0) {
return; return;
} }
@ -133,16 +113,15 @@ export class MessageQueue implements MessageQueueInterface {
); );
} }
public async sendSyncMessage(message: SyncMessage | undefined): Promise<any> { public async sendSyncMessage(
message: SyncMessage | undefined
): Promise<void> {
if (!message) { if (!message) {
return; return;
} }
const ourDevices = await MultiDeviceProtocol.getOurDevices(); const ourDevices = await MultiDeviceProtocol.getOurDevices();
const promises = ourDevices.map(async device => await this.sendMessageToDevices(ourDevices, message);
this.process(device, message)
);
return Promise.all(promises);
} }
public async processPending(device: PubKey) { public async processPending(device: PubKey) {
@ -179,6 +158,17 @@ export class MessageQueue implements MessageQueueInterface {
}); });
} }
public async sendMessageToDevices(
devices: Array<PubKey>,
message: ContentMessage
) {
const promises = devices.map(async device => {
await this.process(device, message);
});
return Promise.all(promises);
}
private async processAllPending() { private async processAllPending() {
const devices = await this.pendingMessageCache.getDevices(); const devices = await this.pendingMessageCache.getDevices();
const promises = devices.map(async device => this.processPending(device)); const promises = devices.map(async device => this.processPending(device));

@ -20,5 +20,5 @@ export interface MessageQueueInterface {
sendUsingMultiDevice(user: PubKey, message: ContentMessage): Promise<void>; sendUsingMultiDevice(user: PubKey, message: ContentMessage): Promise<void>;
send(device: PubKey, message: ContentMessage): Promise<void>; send(device: PubKey, message: ContentMessage): Promise<void>;
sendToGroup(message: GroupMessageType): Promise<void>; sendToGroup(message: GroupMessageType): Promise<void>;
sendSyncMessage(message: SyncMessage | undefined): Promise<any>; sendSyncMessage(message: SyncMessage | undefined): Promise<void>;
} }

@ -1,25 +1,72 @@
import * as _ from 'lodash'; import * as _ from 'lodash';
import { UserUtil } from '../../util/'; import { UserUtil } from '../../util/';
import { getAllConversations } from '../../../js/modules/data'; import { getAllConversations } from '../../../js/modules/data';
import { ContentMessage, SyncMessage } from '../messages/outgoing'; import {
ClosedGroupChatMessage,
ClosedGroupMessage,
ClosedGroupRequestInfoMessage,
ContentMessage,
ReadReceiptMessage,
SentSyncMessage,
SyncMessage,
SyncReadMessage,
} from '../messages/outgoing';
import { MultiDeviceProtocol } from '../protocols'; import { MultiDeviceProtocol } from '../protocols';
import ByteBuffer from 'bytebuffer'; import ByteBuffer from 'bytebuffer';
import { PubKey } from '../types';
import { SignalService } from '../../protobuf';
export function from(message: ContentMessage): SyncMessage | undefined { export function from(
message: ContentMessage,
destination: string | PubKey
): SyncMessage | undefined {
if (message instanceof SyncMessage) { if (message instanceof SyncMessage) {
return message; return message;
} }
// Stubbed for now if (message instanceof ClosedGroupMessage) {
return fromClosedGroupMessage(message);
}
if (message instanceof ReadReceiptMessage) {
const pubKey = PubKey.cast(destination);
const read = message.timestamps.map(timestamp => ({
sender: pubKey.key,
timestamp,
}));
return new SyncReadMessage({
timestamp: Date.now(),
readMessages: read,
});
}
return undefined; return undefined;
} }
export function canSync(message: ContentMessage): boolean { export function fromClosedGroupMessage(
// This function should be agnostic to the device; it shouldn't need message: ClosedGroupMessage
// to know about the recipient ): SyncMessage | undefined {
// Sync messages for ClosedGroupChatMessage need to be built manually
// This is because it needs the `expireStartTimestamp` field.
if (
message instanceof ClosedGroupRequestInfoMessage ||
message instanceof ClosedGroupChatMessage
) {
return undefined;
}
const pubKey = PubKey.cast(message.groupId);
const content = SignalService.Content.decode(message.plainTextBuffer());
if (!content.dataMessage) {
return undefined;
}
// Stubbed for now return new SentSyncMessage({
return Boolean(from(message)); timestamp: message.timestamp,
destination: pubKey,
dataMessage: content.dataMessage,
});
} }
export async function getSyncContacts(): Promise<Array<any> | undefined> { export async function getSyncContacts(): Promise<Array<any> | undefined> {

@ -231,6 +231,18 @@ describe('MessageQueue', () => {
expect(args[0]).to.have.same.members(devices); expect(args[0]).to.have.same.members(devices);
expect(args[1]).to.equal(message); expect(args[1]).to.equal(message);
}); });
it('should send sync message if it was passed in', async () => {
const devices = TestUtils.generateFakePubKeys(3);
sandbox.stub(MultiDeviceProtocol, 'getAllDevices').resolves(devices);
const stub = sandbox.stub(messageQueueStub, 'sendSyncMessage').resolves();
const message = new TestSyncMessage({ timestamp: Date.now() });
await messageQueueStub.sendUsingMultiDevice(devices[0], message);
const args = stub.lastCall.args as [ContentMessage];
expect(args[0]).to.equal(message);
});
}); });
describe('sendMessageToDevices', () => { describe('sendMessageToDevices', () => {
@ -243,51 +255,6 @@ describe('MessageQueue', () => {
await messageQueueStub.sendMessageToDevices(devices, message); await messageQueueStub.sendMessageToDevices(devices, message);
expect(pendingMessageCache.getCache()).to.have.length(devices.length); expect(pendingMessageCache.getCache()).to.have.length(devices.length);
}); });
it('should send sync message if possible', async () => {
hasSessionStub.returns(false);
sandbox.stub(SyncMessageUtils, 'canSync').returns(true);
sandbox
.stub(SyncMessageUtils, 'from')
.returns(new TestSyncMessage({ timestamp: Date.now() }));
// This stub ensures that the message won't process
const sendSyncMessageStub = sandbox
.stub(messageQueueStub, 'sendSyncMessage')
.resolves();
const ourDevices = [ourDevice, ...TestUtils.generateFakePubKeys(2)];
sandbox
.stub(MultiDeviceProtocol, 'getAllDevices')
.callsFake(async user => {
if (ourDevice.isEqual(user)) {
return ourDevices;
}
return [];
});
const devices = [...ourDevices, ...TestUtils.generateFakePubKeys(3)];
const message = TestUtils.generateChatMessage();
await messageQueueStub.sendMessageToDevices(devices, message);
expect(sendSyncMessageStub.called).to.equal(
true,
'sendSyncMessage was not called.'
);
expect(
pendingMessageCache.getCache().map(c => c.device)
).to.not.have.members(
ourDevices.map(d => d.key),
'Sending regular messages to our own device is not allowed.'
);
expect(pendingMessageCache.getCache()).to.have.length(
devices.length - ourDevices.length,
'Messages should not be sent to our devices.'
);
});
}); });
describe('sendSyncMessage', () => { describe('sendSyncMessage', () => {
@ -320,6 +287,12 @@ describe('MessageQueue', () => {
}); });
describe('closed groups', async () => { describe('closed groups', async () => {
beforeEach(() => {
sandbox
.stub(MultiDeviceProtocol, 'getPrimaryDevice')
.resolves(new PrimaryPubKey(ourNumber));
});
it('can send to closed group', async () => { it('can send to closed group', async () => {
const members = TestUtils.generateFakePubKeys(4).map( const members = TestUtils.generateFakePubKeys(4).map(
p => new PrimaryPubKey(p.key) p => new PrimaryPubKey(p.key)
@ -351,6 +324,19 @@ describe('MessageQueue', () => {
await messageQueueStub.sendToGroup(message); await messageQueueStub.sendToGroup(message);
expect(sendUsingMultiDeviceStub.callCount).to.equal(0); expect(sendUsingMultiDeviceStub.callCount).to.equal(0);
}); });
it('wont send message to our device', async () => {
sandbox
.stub(GroupUtils, 'getGroupMembers')
.resolves([new PrimaryPubKey(ourNumber)]);
const sendUsingMultiDeviceStub = sandbox
.stub(messageQueueStub, 'sendUsingMultiDevice')
.resolves();
const message = TestUtils.generateClosedGroupMessage();
await messageQueueStub.sendToGroup(message);
expect(sendUsingMultiDeviceStub.callCount).to.equal(0);
});
}); });
describe('open groups', async () => { describe('open groups', async () => {

Loading…
Cancel
Save