pull/1157/head
Vincent 5 years ago
parent c3f3f30911
commit a5cfbd584b

@ -12,7 +12,6 @@ import {
import { PendingMessageCache } from './PendingMessageCache'; import { PendingMessageCache } from './PendingMessageCache';
import { JobQueue, TypedEventEmitter } from '../utils'; import { JobQueue, TypedEventEmitter } from '../utils';
// Used for ExampleMessage // Used for ExampleMessage
import { v4 as uuid } from 'uuid'; import { v4 as uuid } from 'uuid';
import { SignalService } from '../../protobuf'; import { SignalService } from '../../protobuf';
@ -59,18 +58,15 @@ export class MessageQueue implements MessageQueueInterface {
const userDevices = [...pairedDevices, user]; const userDevices = [...pairedDevices, user];
console.log('[vince] userDevices:', userDevices); console.log('[vince] userDevices:', userDevices);
} }
public send(device: string, message: ContentMessage) { public send(device: string, message: ContentMessage) {
// throw new Error('Method not implemented.'); // throw new Error('Method not implemented.');
// Validation; early exists? // Validation; early exists?
// TESTING // TESTING
console.log(`[vince] send: Queueing message`, message); console.log(`[vince] send: Queueing message`, message);
this.queue(device, message); this.queue(device, message);
} }
public sendToGroup(message: ContentMessage | OpenGroupMessage) { public sendToGroup(message: ContentMessage | OpenGroupMessage) {
throw new Error('Method not implemented.'); throw new Error('Method not implemented.');
@ -82,38 +78,27 @@ export class MessageQueue implements MessageQueueInterface {
// PSEDUOCODE // PSEDUOCODE
// if message is undefined // if message is undefined
// returnt // returnt
// for each of our device excluding current device: // for each of our device excluding current device:
// queue(device, syncMessage) // queue(device, syncMessage)
// throw new Error('Method not implemented.'); // throw new Error('Method not implemented.');
} }
public async processPending(device: string) { public async processPending(device: string) {
// TODO: implement // TODO: implement
// PSEDUDOCODE // PSEDUDOCODE
// messages = PendingMessageCache.getPendingMessages(device) // messages = PendingMessageCache.getPendingMessages(device)
// isMediumGroup = device is medium group // isMediumGroup = device is medium group
// hasSession = SessionManager.hasSession(device) // hasSession = SessionManager.hasSession(device)
// if !isMediumGroup && !hasSession // if !isMediumGroup && !hasSession
// SessionManager.sendSessionRequestIfNeeded() // SessionManager.sendSessionRequestIfNeeded()
// return // Don't process any more messages // return // Don't process any more messages
// jobQueue = getJobQueue(device) // jobQueue = getJobQueue(device)
// for each message: // for each message:
// if !jobQueue.has(message.uuid) // if !jobQueue.has(message.uuid)
// promise = jobQueue.queue(message.uuid, MessageSender.send(message)) // promise = jobQueue.queue(message.uuid, MessageSender.send(message))
// promise.then().catch() // Add or remove from pending message cache on success and failure // promise.then().catch() // Add or remove from pending message cache on success and failure
// Promise shouldn't be returned; we're firing an event when processed. // Promise shouldn't be returned; we're firing an event when processed.
} }
private processAllPending() { private processAllPending() {
// TODO: Get all devices which are pending here // TODO: Get all devices which are pending here
@ -123,23 +108,21 @@ export class MessageQueue implements MessageQueueInterface {
// This should simply add to the queue. No processing // This should simply add to the queue. No processing
// TODO: implement // TODO: implement
// PSEUDOCODE // PSEUDOCODE
// if message is Session Request // if message is Session Request
// SessionManager.sendSessionRequest(device, message) // SessionManager.sendSessionRequest(device, message)
// return // return
// PendingMessageCache.addPendingMessage(device, message)
// processPending(device)
// PendingMessageCache.addPendingMessage(device, message)
// processPending(device)
if (message instanceof SessionResetMessage) { if (message instanceof SessionResetMessage) {
return; return;
} }
console.log(`[vince] queue: Message added to the queue`, message); console.log(`[vince] queue: Message added to the queue`, message);
// Add the item to the queue // Add the item to the queue
const queue = this.getJobQueue(device); const queue = this.getJobQueue(device);
const job = new Promise(resolve => { const job = new Promise(resolve => {
@ -154,7 +137,6 @@ export class MessageQueue implements MessageQueueInterface {
// Saving offline and stuff // Saving offline and stuff
// Attach to event // Attach to event
} }
private queueOpenGroupMessage(message: OpenGroupMessage) { private queueOpenGroupMessage(message: OpenGroupMessage) {
@ -171,4 +153,4 @@ export class MessageQueue implements MessageQueueInterface {
return queue; return queue;
} }
} }

@ -1,4 +1,8 @@
import { OpenGroupMessage, ContentMessage, SyncMessage } from '../messages/outgoing'; import {
OpenGroupMessage,
ContentMessage,
SyncMessage,
} from '../messages/outgoing';
import { RawMessage } from '../types/RawMessage'; import { RawMessage } from '../types/RawMessage';
import { TypedEventEmitter } from '../utils'; import { TypedEventEmitter } from '../utils';
@ -16,4 +20,4 @@ export interface MessageQueueInterface {
send(device: string, message: ContentMessage): void; send(device: string, message: ContentMessage): void;
sendToGroup(message: GroupMessageType): void; sendToGroup(message: GroupMessageType): void;
sendSyncMessage(message: SyncMessage): void; sendSyncMessage(message: SyncMessage): void;
} }

@ -5,7 +5,6 @@ import { MessageUtils, PubKey } from '../utils';
// TODO: We should be able to import functions straight from the db here without going through the window object // TODO: We should be able to import functions straight from the db here without going through the window object
// This is an abstraction for storing pending messages. // This is an abstraction for storing pending messages.
// Ideally we want to store pending messages in the database so that // Ideally we want to store pending messages in the database so that
// on next launch we can re-send the pending messages, but we don't want // on next launch we can re-send the pending messages, but we don't want
@ -25,11 +24,14 @@ export class PendingMessageCache {
this.cache = []; this.cache = [];
} }
public async add(device: PubKey, message: ContentMessage): Promise<RawMessage> { public async add(
device: PubKey,
message: ContentMessage
): Promise<RawMessage> {
const rawMessage = MessageUtils.toRawMessage(device, message); const rawMessage = MessageUtils.toRawMessage(device, message);
// Does it exist in cache already? // Does it exist in cache already?
if(this.find(rawMessage)) { if (this.find(rawMessage)) {
return rawMessage; return rawMessage;
} }
@ -39,7 +41,9 @@ export class PendingMessageCache {
return rawMessage; return rawMessage;
} }
public async remove(message: RawMessage): Promise<Array<RawMessage> | undefined> { public async remove(
message: RawMessage
): Promise<Array<RawMessage> | undefined> {
// Should only be called after message is processed // Should only be called after message is processed
// Return if message doesn't exist in cache // Return if message doesn't exist in cache
@ -100,7 +104,6 @@ export class PendingMessageCache {
// Set pubkey from string to PubKey.from() // Set pubkey from string to PubKey.from()
// TODO: // TODO:
// Build up Uint8Array from painTextBuffer in JSON // Build up Uint8Array from painTextBuffer in JSON
return encodedPendingMessages; return encodedPendingMessages;
@ -109,7 +112,9 @@ export class PendingMessageCache {
public async syncCacheWithDB() { public async syncCacheWithDB() {
// Only call when adding / removing from cache. // Only call when adding / removing from cache.
const encodedPendingMessages = JSON.stringify(this.cache) || ''; const encodedPendingMessages = JSON.stringify(this.cache) || '';
await Data.createOrUpdateItem({id: 'pendingMessages', value: encodedPendingMessages}); await Data.createOrUpdateItem({
id: 'pendingMessages',
value: encodedPendingMessages,
});
} }
}
}

@ -3,7 +3,6 @@ import { ContentMessage } from '../messages/outgoing';
import { EncryptionType } from '../types/EncryptionType'; import { EncryptionType } from '../types/EncryptionType';
import * as crypto from 'crypto'; import * as crypto from 'crypto';
function toRawMessage(device: PubKey, message: ContentMessage): RawMessage { function toRawMessage(device: PubKey, message: ContentMessage): RawMessage {
const ttl = message.ttl(); const ttl = message.ttl();
const timestamp = message.timestamp; const timestamp = message.timestamp;
@ -11,7 +10,7 @@ function toRawMessage(device: PubKey, message: ContentMessage): RawMessage {
// Get EncryptionType depending on message type. // Get EncryptionType depending on message type.
// let encryption: EncryptionType; // let encryption: EncryptionType;
// switch (message.constructor.name) { // switch (message.constructor.name) {
// case MessageType.Chat: // case MessageType.Chat:
// encryption = EncryptionType.Signal; // encryption = EncryptionType.Signal;
@ -39,7 +38,6 @@ function toRawMessage(device: PubKey, message: ContentMessage): RawMessage {
return rawMessage; return rawMessage;
} }
export enum PubKeyType { export enum PubKeyType {
Primary = 'priamry', Primary = 'priamry',
Secondary = 'secondary', Secondary = 'secondary',
@ -88,4 +86,4 @@ export class PubKey {
// Functions / Tools // Functions / Tools
export const MessageUtils = { export const MessageUtils = {
toRawMessage, toRawMessage,
}; };

@ -19,7 +19,6 @@
// identityKey: new TextEncoder().encode('identityKey'), // identityKey: new TextEncoder().encode('identityKey'),
// }; // };
// // queue with session reset message. // // queue with session reset message.
// // should return undefined // // should return undefined
// // TOOD: Send me to MESSAGE QUEUE TEST // // TOOD: Send me to MESSAGE QUEUE TEST
@ -27,7 +26,6 @@
// const timestamp = Date.now(); // const timestamp = Date.now();
// sessionResetMessage = new SessionResetMessage({timestamp, preKeyBundle}); // sessionResetMessage = new SessionResetMessage({timestamp, preKeyBundle});
// }); // });
// }); // });

@ -16,9 +16,10 @@ describe('PendingMessageCache', () => {
let pendingMessageCacheStub: PendingMessageCache; let pendingMessageCacheStub: PendingMessageCache;
// tslint:disable-next-line: promise-function-async // tslint:disable-next-line: promise-function-async
const wrapInPromise = (value: any) => new Promise(r => { const wrapInPromise = (value: any) =>
r(value); new Promise(r => {
}); r(value);
});
const generateUniqueMessage = (): ChatMessage => { const generateUniqueMessage = (): ChatMessage => {
return new ChatMessage({ return new ChatMessage({
@ -38,8 +39,12 @@ describe('PendingMessageCache', () => {
const voidPromise = wrapInPromise(undefined); const voidPromise = wrapInPromise(undefined);
// Stub out methods which touch the database. // Stub out methods which touch the database.
sandbox.stub(PendingMessageCache.prototype, 'getFromStorage').returns(mockStorageObject); sandbox
sandbox.stub(PendingMessageCache.prototype, 'syncCacheWithDB').returns(voidPromise); .stub(PendingMessageCache.prototype, 'getFromStorage')
.returns(mockStorageObject);
sandbox
.stub(PendingMessageCache.prototype, 'syncCacheWithDB')
.returns(voidPromise);
// Initialize new stubbed cache // Initialize new stubbed cache
pendingMessageCacheStub = new PendingMessageCache(); pendingMessageCacheStub = new PendingMessageCache();
@ -50,7 +55,6 @@ describe('PendingMessageCache', () => {
sandbox.restore(); sandbox.restore();
}); });
it('can initialize cache', async () => { it('can initialize cache', async () => {
const { cache } = pendingMessageCacheStub; const { cache } = pendingMessageCacheStub;
@ -59,7 +63,6 @@ describe('PendingMessageCache', () => {
expect(cache).to.have.length(0); expect(cache).to.have.length(0);
}); });
it('can add to cache', async () => { it('can add to cache', async () => {
const device = PubKey.generate(); const device = PubKey.generate();
const message = generateUniqueMessage(); const message = generateUniqueMessage();
@ -149,7 +152,9 @@ describe('PendingMessageCache', () => {
// Get pending for each specific device // Get pending for each specific device
cacheItems.forEach(item => { cacheItems.forEach(item => {
const pendingForDevice = pendingMessageCacheStub.getForDevice(item.device); const pendingForDevice = pendingMessageCacheStub.getForDevice(
item.device
);
expect(pendingForDevice).to.have.length(1); expect(pendingForDevice).to.have.length(1);
expect(pendingForDevice[0].device).to.equal(item.device.key); expect(pendingForDevice[0].device).to.equal(item.device.key);
}); });
@ -208,5 +213,4 @@ describe('PendingMessageCache', () => {
const finalCache = pendingMessageCacheStub.cache; const finalCache = pendingMessageCacheStub.cache;
expect(finalCache).to.have.length(0); expect(finalCache).to.have.length(0);
}); });
}); });

@ -5,68 +5,70 @@ import { RawMessage } from '../../../session/types/RawMessage';
import { MessageUtils, PubKey, PubKeyType } from '../../../session/utils'; import { MessageUtils, PubKey, PubKeyType } from '../../../session/utils';
describe('MessageUtils', () => { describe('MessageUtils', () => {
it('can convert to RawMessage', () => { it('can convert to RawMessage', () => {
// TOOD: MOVE ME TO MESSAGE UTILS TEST // TOOD: MOVE ME TO MESSAGE UTILS TEST
const pubkey = "0582fe8822c684999663cc6636148328fbd47c0836814c118af4e326bb4f0e1000"; const pubkey =
const messageText = "This is some message content"; '0582fe8822c684999663cc6636148328fbd47c0836814c118af4e326bb4f0e1000';
const messageText = 'This is some message content';
const isRawMessage = (object: any): object is RawMessage => { const isRawMessage = (object: any): object is RawMessage => {
return ( return (
'identifier' in object && 'identifier' in object &&
'plainTextBuffer' in object && 'plainTextBuffer' in object &&
'timestamp' in object && 'timestamp' in object &&
'device' in object && 'device' in object &&
'ttl' in object && 'ttl' in object &&
'encryption' in object 'encryption' in object
); );
} };
const message = new ChatMessage({ const message = new ChatMessage({
body: messageText, body: messageText,
identifier: '1234567890', identifier: '1234567890',
timestamp: Date.now(), timestamp: Date.now(),
attachments: undefined, attachments: undefined,
quote: undefined, quote: undefined,
expireTimer: undefined, expireTimer: undefined,
lokiProfile: undefined, lokiProfile: undefined,
preview: undefined, preview: undefined,
}); });
// Explicitly check that it's a RawMessage // Explicitly check that it's a RawMessage
const rawMessage = MessageUtils.toRawMessage(pubkey, message); const rawMessage = MessageUtils.toRawMessage(pubkey, message);
expect(isRawMessage(rawMessage)).to.be.equal(true); expect(isRawMessage(rawMessage)).to.be.equal(true);
// console.log('[vince] isRawMessage(rawMessage):', isRawMessage(rawMessage)); // console.log('[vince] isRawMessage(rawMessage):', isRawMessage(rawMessage));
// Check plaintext // Check plaintext
const plainText = message.plainTextBuffer(); const plainText = message.plainTextBuffer();
const decoded = SignalService.Content.decode(plainText); const decoded = SignalService.Content.decode(plainText);
expect(decoded.dataMessage?.body).to.be.equal(messageText); expect(decoded.dataMessage?.body).to.be.equal(messageText);
}); });
// Pubkeys // Pubkeys
it('can create new valid pubkey', () => { it('can create new valid pubkey', () => {
const validPubkey = '0582fe8822c684999663cc6636148328fbd47c0836814c118af4e326bb4f0e1000'; const validPubkey =
should().not.Throw(() => new PubKey(validPubkey), Error); '0582fe8822c684999663cc6636148328fbd47c0836814c118af4e326bb4f0e1000';
should().not.Throw(() => new PubKey(validPubkey), Error);
const pubkey = new PubKey(validPubkey);
expect(pubkey instanceof PubKey).to.be.equal(true);
});
it('invalid pubkey should throw error', () => { const pubkey = new PubKey(validPubkey);
const invalidPubkey = 'Lorem Ipsum'; expect(pubkey instanceof PubKey).to.be.equal(true);
});
should().Throw(() => new PubKey(invalidPubkey), Error); it('invalid pubkey should throw error', () => {
}); const invalidPubkey = 'Lorem Ipsum';
it('can set pubkey type', () => { should().Throw(() => new PubKey(invalidPubkey), Error);
const validPubkey = '0582fe8822c684999663cc6636148328fbd47c0836814c118af4e326bb4f0e1000'; });
const pubkeyType = PubKeyType.Primary;
should().not.Throw(() => new PubKey(validPubkey, pubkeyType), Error); it('can set pubkey type', () => {
const validPubkey =
const pubkey = new PubKey(validPubkey, pubkeyType); '0582fe8822c684999663cc6636148328fbd47c0836814c118af4e326bb4f0e1000';
expect(pubkey.type).to.be.equal(PubKeyType.Primary); const pubkeyType = PubKeyType.Primary;
});
should().not.Throw(() => new PubKey(validPubkey, pubkeyType), Error);
}); const pubkey = new PubKey(validPubkey, pubkeyType);
expect(pubkey.type).to.be.equal(PubKeyType.Primary);
});
});

@ -16,12 +16,18 @@ export class SignalProtocolAddressStub extends SignalProtocolAddress {
return new SignalProtocolAddressStub(values[0], Number(values[1])); return new SignalProtocolAddressStub(values[0], Number(values[1]));
} }
public getName(): string { return this.hexEncodedPublicKey; } public getName(): string {
public getDeviceId(): number { return this.deviceId; } return this.hexEncodedPublicKey;
}
public getDeviceId(): number {
return this.deviceId;
}
public equals(other: SignalProtocolAddress): boolean { public equals(other: SignalProtocolAddress): boolean {
return other.getName() === this.hexEncodedPublicKey; return other.getName() === this.hexEncodedPublicKey;
} }
public toString(): string { return this.hexEncodedPublicKey; } public toString(): string {
return this.hexEncodedPublicKey;
}
} }

Loading…
Cancel
Save