Merge pull request #295 from BeaudanBrown/refactor-sendmessage

Refactor sendmessage
pull/299/head
Beaudan Campbell-Brown 6 years ago committed by GitHub
commit e9b1359bab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -5,8 +5,7 @@
const _ = require('lodash'); const _ = require('lodash');
const { rpc } = require('./loki_rpc'); const { rpc } = require('./loki_rpc');
// Will be raised (to 3?) when we get more nodes const DEFAULT_CONNECTIONS = 2;
const MINIMUM_SUCCESSFUL_REQUESTS = 2;
const LOKI_LONGPOLL_HEADER = 'X-Loki-Long-Poll'; const LOKI_LONGPOLL_HEADER = 'X-Loki-Long-Poll';
function sleepFor(time) { function sleepFor(time) {
@ -31,24 +30,18 @@ const filterIncomingMessages = async messages => {
return newMessages; return newMessages;
}; };
class LokiMessageAPI { const calcNonce = (messageEventData, pubKey, data64, timestamp, ttl) => {
constructor({ snodeServerPort }) { // Nonce is returned as a base64 string to include in header
this.snodeServerPort = snodeServerPort ? `:${snodeServerPort}` : ''; window.Whisper.events.trigger('calculatingPoW', messageEventData);
this.jobQueue = new window.JobQueue(); const development = window.getEnvironment() !== 'production';
} return callWorker('calcPoW', timestamp, ttl, pubKey, data64, development);
};
async sendMessage(pubKey, data, messageTimeStamp, ttl, isPing = false) {
const timestamp = Date.now();
// Data required to identify a message in a conversation
const messageEventData = {
pubKey,
timestamp: messageTimeStamp,
};
const data64 = dcodeIO.ByteBuffer.wrap(data).toString('base64'); const trySendP2p = async (pubKey, data64, isPing, messageEventData) => {
const p2pDetails = lokiP2pAPI.getContactP2pDetails(pubKey); const p2pDetails = lokiP2pAPI.getContactP2pDetails(pubKey);
if (p2pDetails && (isPing || p2pDetails.isOnline)) { if (!p2pDetails || (!isPing && !p2pDetails.isOnline)) {
return false;
}
try { try {
const port = p2pDetails.port ? `:${p2pDetails.port}` : ''; const port = p2pDetails.port ? `:${p2pDetails.port}` : '';
@ -62,49 +55,63 @@ class LokiMessageAPI {
} else { } else {
log.info(`Successful p2p message to ${pubKey}`); log.info(`Successful p2p message to ${pubKey}`);
} }
return; return true;
} catch (e) { } catch (e) {
lokiP2pAPI.setContactOffline(pubKey); lokiP2pAPI.setContactOffline(pubKey);
if (isPing) { if (isPing) {
// If this was just a ping, we don't bother sending to storage server // If this was just a ping, we don't bother sending to storage server
log.warn('Ping failed, contact marked offline', e); log.warn('Ping failed, contact marked offline', e);
return; return true;
} }
log.warn('Failed to send P2P message, falling back to storage', e); log.warn('Failed to send P2P message, falling back to storage', e);
return false;
} }
};
class LokiMessageAPI {
constructor({ snodeServerPort }) {
this.snodeServerPort = snodeServerPort ? `:${snodeServerPort}` : '';
this.jobQueue = new window.JobQueue();
this.sendingSwarmNodes = {};
} }
// Nonce is returned as a base64 string to include in header async sendMessage(pubKey, data, messageTimeStamp, ttl, options = {}) {
let nonce; const { isPing = false, numConnections = DEFAULT_CONNECTIONS } = options;
try { // Data required to identify a message in a conversation
window.Whisper.events.trigger('calculatingPoW', messageEventData); const messageEventData = {
const development = window.getEnvironment() !== 'production'; pubKey,
nonce = await callWorker( timestamp: messageTimeStamp,
'calcPoW', };
timestamp,
ttl, const data64 = dcodeIO.ByteBuffer.wrap(data).toString('base64');
const p2pSuccess = await trySendP2p(
pubKey, pubKey,
data64, data64,
development isPing,
messageEventData
); );
} catch (err) { if (p2pSuccess) {
// Something went horribly wrong return;
throw err;
} }
const completedNodes = []; const timestamp = Date.now();
const failedNodes = []; const nonce = await calcNonce(
let successfulRequests = 0; messageEventData,
let canResolve = true; pubKey,
data64,
let swarmNodes = await lokiSnodeAPI.getSwarmNodesForPubKey(pubKey); timestamp,
ttl
const nodeComplete = nodeUrl => { );
completedNodes.push(nodeUrl); // Using timestamp as a unique identifier
swarmNodes = swarmNodes.filter(node => node !== nodeUrl); this.sendingSwarmNodes[timestamp] = lokiSnodeAPI.getSwarmNodesForPubKey(
}; pubKey
);
if (this.sendingSwarmNodes[timestamp].length < numConnections) {
const freshNodes = await lokiSnodeAPI.getFreshSwarmNodes(pubKey);
await lokiSnodeAPI.updateSwarmNodes(pubKey, freshNodes);
this.sendingSwarmNodes[timestamp] = freshNodes;
}
const doRequest = async nodeUrl => {
const params = { const params = {
pubKey, pubKey,
ttl: ttl.toString(), ttl: ttl.toString(),
@ -112,69 +119,65 @@ class LokiMessageAPI {
timestamp: timestamp.toString(), timestamp: timestamp.toString(),
data: data64, data: data64,
}; };
const promises = [];
for (let i = 0; i < numConnections; i += 1) {
promises.push(this.openSendConnection(params));
}
try { const results = await Promise.all(promises);
await rpc(`http://${nodeUrl}`, this.snodeServerPort, 'store', params); delete this.sendingSwarmNodes[timestamp];
if (results.every(value => value === false)) {
nodeComplete(nodeUrl); throw new window.textsecure.EmptySwarmError(
successfulRequests += 1;
} catch (e) {
log.warn('Loki send message:', e);
if (e instanceof textsecure.WrongSwarmError) {
const { newSwarm } = e;
await lokiSnodeAPI.updateSwarmNodes(pubKey, newSwarm);
completedNodes.push(nodeUrl);
} else if (e instanceof textsecure.NotFoundError) {
canResolve = false;
} else if (e instanceof textsecure.HTTPError) {
// We mark the node as complete as we could still reach it
nodeComplete(nodeUrl);
} else {
const removeNode = await lokiSnodeAPI.unreachableNode(
pubKey, pubKey,
nodeUrl 'Ran out of swarm nodes to query'
); );
if (removeNode) {
log.error('Loki send message:', e);
nodeComplete(nodeUrl);
failedNodes.push(nodeUrl);
} }
if (results.every(value => value === true)) {
log.info(`Successful storage message to ${pubKey}`);
} else {
log.warn(`Partially successful storage message to ${pubKey}`);
} }
} }
};
while (successfulRequests < MINIMUM_SUCCESSFUL_REQUESTS) { async openSendConnection(params) {
if (!canResolve) { while (!_.isEmpty(this.sendingSwarmNodes[params.timestamp])) {
throw new window.textsecure.DNSResolutionError('Sending messages'); const url = this.sendingSwarmNodes[params.timestamp].shift();
const successfulSend = await this.sendToNode(url, params);
if (successfulSend) {
return true;
} }
if (swarmNodes.length === 0) {
const freshNodes = await lokiSnodeAPI.getFreshSwarmNodes(pubKey);
const goodNodes = _.difference(freshNodes, failedNodes);
await lokiSnodeAPI.updateSwarmNodes(pubKey, goodNodes);
swarmNodes = _.difference(freshNodes, completedNodes);
if (swarmNodes.length === 0) {
if (successfulRequests !== 0) {
// TODO: Decide how to handle some completed requests but not enough
log.warn(`Partially successful storage message to ${pubKey}`);
return;
} }
throw new window.textsecure.EmptySwarmError( return false;
pubKey,
'Ran out of swarm nodes to query'
);
} }
}
const remainingRequests =
MINIMUM_SUCCESSFUL_REQUESTS - successfulRequests;
await Promise.all( async sendToNode(url, params) {
swarmNodes let successiveFailures = 0;
.splice(0, remainingRequests) while (successiveFailures < 3) {
.map(nodeUrl => doRequest(nodeUrl)) await sleepFor(successiveFailures * 500);
); try {
await rpc(`http://${url}`, this.snodeServerPort, 'store', params);
return true;
} catch (e) {
log.warn('Loki send message:', e);
if (e instanceof textsecure.WrongSwarmError) {
const { newSwarm } = e;
await lokiSnodeAPI.updateSwarmNodes(params.pubKey, newSwarm);
this.sendingSwarmNodes[params.timestamp] = newSwarm;
return false;
} else if (e instanceof textsecure.NotFoundError) {
// TODO: Handle resolution error
successiveFailures += 1;
} else if (e instanceof textsecure.HTTPError) {
// TODO: Handle working connection but error response
successiveFailures += 1;
} else {
successiveFailures += 1;
} }
log.info(`Successful storage message to ${pubKey}`); }
}
log.error(`Failed to send to node: ${url}`);
await lokiSnodeAPI.unreachableNode(params.pubKey, url);
return false;
} }
async retrieveNextMessages(nodeUrl, nodeData, ourKey) { async retrieveNextMessages(nodeUrl, nodeData, ourKey) {
@ -210,11 +213,7 @@ class LokiMessageAPI {
await sleepFor(successiveFailures * 1000); await sleepFor(successiveFailures * 1000);
try { try {
let messages = await this.retrieveNextMessages( let messages = await this.retrieveNextMessages(url, nodeData, ourKey);
url,
nodeData,
ourKey
);
successiveFailures = 0; successiveFailures = 0;
if (messages.length) { if (messages.length) {
const lastMessage = _.last(messages); const lastMessage = _.last(messages);
@ -261,115 +260,6 @@ class LokiMessageAPI {
// or if there is network issues (ENOUTFOUND due to lokinet) // or if there is network issues (ENOUTFOUND due to lokinet)
await Promise.all(promises); await Promise.all(promises);
} }
// stale function, kept around to reduce diff noise
// TODO: remove
async retrieveMessages(callback) {
const ourKey = window.textsecure.storage.user.getNumber();
const completedNodes = [];
let canResolve = true;
let successfulRequests = 0;
let ourSwarmNodes = await lokiSnodeAPI.getOurSwarmNodes();
const nodeComplete = nodeUrl => {
completedNodes.push(nodeUrl);
delete ourSwarmNodes[nodeUrl];
};
const doRequest = async (nodeUrl, nodeData) => {
const params = {
pubKey: ourKey,
lastHash: nodeData.lastHash || '',
};
const options = {
timeout: 40000,
headers: {
[LOKI_LONGPOLL_HEADER]: true,
},
};
try {
const result = await rpc(
`http://${nodeUrl}`,
this.snodeServerPort,
'retrieve',
params,
options
);
nodeComplete(nodeUrl);
successfulRequests += 1;
if (Array.isArray(result.messages) && result.messages.length) {
const lastMessage = _.last(result.messages);
lokiSnodeAPI.updateLastHash(
nodeUrl,
lastMessage.hash,
lastMessage.expiration
);
const filteredMessages = await this.jobQueue.add(() =>
filterIncomingMessages(result.messages)
);
if (filteredMessages.length) {
callback(filteredMessages);
}
}
} catch (e) {
log.warn('Loki retrieve messages:', e);
if (e instanceof textsecure.WrongSwarmError) {
const { newSwarm } = e;
await lokiSnodeAPI.updateOurSwarmNodes(newSwarm);
completedNodes.push(nodeUrl);
} else if (e instanceof textsecure.NotFoundError) {
canResolve = false;
} else if (e instanceof textsecure.HTTPError) {
// We mark the node as complete as we could still reach it
nodeComplete(nodeUrl);
} else {
const removeNode = await lokiSnodeAPI.unreachableNode(
ourKey,
nodeUrl
);
if (removeNode) {
log.error('Loki retrieve messages:', e);
nodeComplete(nodeUrl);
}
}
}
};
while (successfulRequests < MINIMUM_SUCCESSFUL_REQUESTS) {
if (!canResolve) {
throw new window.textsecure.DNSResolutionError('Retrieving messages');
}
if (Object.keys(ourSwarmNodes).length === 0) {
ourSwarmNodes = await lokiSnodeAPI.getOurSwarmNodes();
// Filter out the nodes we have already got responses from
completedNodes.forEach(nodeUrl => delete ourSwarmNodes[nodeUrl]);
if (Object.keys(ourSwarmNodes).length === 0) {
if (successfulRequests !== 0) {
// TODO: Decide how to handle some completed requests but not enough
return;
}
throw new window.textsecure.EmptySwarmError(
ourKey,
'Ran out of swarm nodes to query'
);
}
}
const remainingRequests =
MINIMUM_SUCCESSFUL_REQUESTS - successfulRequests;
await Promise.all(
Object.entries(ourSwarmNodes)
.splice(0, remainingRequests)
.map(([nodeUrl, nodeData]) => doRequest(nodeUrl, nodeData))
);
}
}
} }
module.exports = LokiMessageAPI; module.exports = LokiMessageAPI;

@ -8,7 +8,6 @@ const { rpc } = require('./loki_rpc');
// Will be raised (to 3?) when we get more nodes // Will be raised (to 3?) when we get more nodes
const MINIMUM_SWARM_NODES = 1; const MINIMUM_SWARM_NODES = 1;
const FAILURE_THRESHOLD = 3;
const resolve4 = url => const resolve4 = url =>
new Promise((resolve, reject) => { new Promise((resolve, reject) => {
@ -73,29 +72,10 @@ class LokiSnodeAPI {
async unreachableNode(pubKey, nodeUrl) { async unreachableNode(pubKey, nodeUrl) {
if (pubKey === window.textsecure.storage.user.getNumber()) { if (pubKey === window.textsecure.storage.user.getNumber()) {
if (!this.ourSwarmNodes[nodeUrl]) {
this.ourSwarmNodes[nodeUrl] = {
failureCount: 1,
};
} else {
this.ourSwarmNodes[nodeUrl].failureCount += 1;
}
if (this.ourSwarmNodes[nodeUrl].failureCount < FAILURE_THRESHOLD) {
return false;
}
delete this.ourSwarmNodes[nodeUrl]; delete this.ourSwarmNodes[nodeUrl];
return true; return;
}
if (!this.contactSwarmNodes[nodeUrl]) {
this.contactSwarmNodes[nodeUrl] = {
failureCount: 1,
};
} else {
this.contactSwarmNodes[nodeUrl].failureCount += 1;
}
if (this.contactSwarmNodes[nodeUrl].failureCount < FAILURE_THRESHOLD) {
return false;
} }
const conversation = ConversationController.get(pubKey); const conversation = ConversationController.get(pubKey);
const swarmNodes = [...conversation.get('swarmNodes')]; const swarmNodes = [...conversation.get('swarmNodes')];
if (swarmNodes.includes(nodeUrl)) { if (swarmNodes.includes(nodeUrl)) {
@ -103,14 +83,12 @@ class LokiSnodeAPI {
await conversation.updateSwarmNodes(filteredNodes); await conversation.updateSwarmNodes(filteredNodes);
delete this.contactSwarmNodes[nodeUrl]; delete this.contactSwarmNodes[nodeUrl];
} }
return true;
} }
async updateLastHash(nodeUrl, lastHash, expiresAt) { async updateLastHash(nodeUrl, lastHash, expiresAt) {
await window.Signal.Data.updateLastHash({ nodeUrl, lastHash, expiresAt }); await window.Signal.Data.updateLastHash({ nodeUrl, lastHash, expiresAt });
if (!this.ourSwarmNodes[nodeUrl]) { if (!this.ourSwarmNodes[nodeUrl]) {
this.ourSwarmNodes[nodeUrl] = { this.ourSwarmNodes[nodeUrl] = {
failureCount: 0,
lastHash, lastHash,
}; };
} else { } else {
@ -118,7 +96,7 @@ class LokiSnodeAPI {
} }
} }
async getSwarmNodesForPubKey(pubKey) { getSwarmNodesForPubKey(pubKey) {
try { try {
const conversation = ConversationController.get(pubKey); const conversation = ConversationController.get(pubKey);
const swarmNodes = [...conversation.get('swarmNodes')]; const swarmNodes = [...conversation.get('swarmNodes')];
@ -146,7 +124,6 @@ class LokiSnodeAPI {
const ps = newNodes.map(async url => { const ps = newNodes.map(async url => {
const lastHash = await window.Signal.Data.getLastHashBySnode(url); const lastHash = await window.Signal.Data.getLastHashBySnode(url);
this.ourSwarmNodes[url] = { this.ourSwarmNodes[url] = {
failureCount: 0,
lastHash, lastHash,
}; };
}); });

@ -12,6 +12,7 @@
/* eslint-disable more/no-then */ /* eslint-disable more/no-then */
/* eslint-disable no-unreachable */ /* eslint-disable no-unreachable */
const NUM_SEND_CONNECTIONS = 2;
function OutgoingMessage( function OutgoingMessage(
server, server,
@ -187,13 +188,12 @@ OutgoingMessage.prototype = {
async transmitMessage(number, data, timestamp, ttl = 24 * 60 * 60 * 1000) { async transmitMessage(number, data, timestamp, ttl = 24 * 60 * 60 * 1000) {
const pubKey = number; const pubKey = number;
try { try {
await lokiMessageAPI.sendMessage( // TODO: Make NUM_CONCURRENT_CONNECTIONS a global constant
pubKey, const options = {
data, numConnections: NUM_SEND_CONNECTIONS,
timestamp, isPing: this.isPing,
ttl, };
this.isPing await lokiMessageAPI.sendMessage(pubKey, data, timestamp, ttl, options);
);
} catch (e) { } catch (e) {
if (e.name === 'HTTPError' && (e.code !== 409 && e.code !== 410)) { if (e.name === 'HTTPError' && (e.code !== 409 && e.code !== 410)) {
// 409 and 410 should bubble and be handled by doSendMessage // 409 and 410 should bubble and be handled by doSendMessage

Loading…
Cancel
Save