Merge pull request #270 from sachaaaaa/concurrent_polling

Refactor long polling for better concurrent requests
pull/299/head
Beaudan Campbell-Brown 6 years ago committed by GitHub
commit fbcd49ac5d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -5,7 +5,8 @@
"node": false
},
"globals": {
"console": true
"console": true,
"setTimeout": true
},
"parserOptions": {
"sourceType": "module"

@ -9,6 +9,28 @@ const { rpc } = require('./loki_rpc');
const MINIMUM_SUCCESSFUL_REQUESTS = 2;
const LOKI_LONGPOLL_HEADER = 'X-Loki-Long-Poll';
function sleepFor(time) {
return new Promise(resolve => {
setTimeout(() => resolve(), time);
});
}
const filterIncomingMessages = async messages => {
const incomingHashes = messages.map(m => m.hash);
const dupHashes = await window.Signal.Data.getSeenMessagesByHashList(
incomingHashes
);
const newMessages = messages.filter(m => !dupHashes.includes(m.hash));
if (newMessages.length) {
const newHashes = newMessages.map(m => ({
expiresAt: m.expiration,
hash: m.hash,
}));
await window.Signal.Data.saveSeenMessageHashes(newHashes);
}
return newMessages;
};
class LokiMessageAPI {
constructor({ snodeServerPort }) {
this.snodeServerPort = snodeServerPort ? `:${snodeServerPort}` : '';
@ -155,6 +177,93 @@ class LokiMessageAPI {
log.info(`Successful storage message to ${pubKey}`);
}
async retrieveNextMessages(nodeUrl, nodeData, ourKey) {
const params = {
pubKey: ourKey,
lastHash: nodeData.lastHash || '',
};
const options = {
timeout: 40000,
headers: {
[LOKI_LONGPOLL_HEADER]: true,
},
};
const result = await rpc(
`http://${nodeUrl}`,
this.snodeServerPort,
'retrieve',
params,
options
);
return result.messages || [];
}
async openConnection(callback) {
const ourKey = window.textsecure.storage.user.getNumber();
while (!_.isEmpty(this.ourSwarmNodes)) {
const url = Object.keys(this.ourSwarmNodes)[0];
const nodeData = this.ourSwarmNodes[url];
delete this.ourSwarmNodes[url];
let successiveFailures = 0;
while (successiveFailures < 3) {
await sleepFor(successiveFailures * 1000);
try {
let messages = await this.retrieveNextMessages(
url,
nodeData,
ourKey
);
successiveFailures = 0;
if (messages.length) {
const lastMessage = _.last(messages);
nodeData.lashHash = lastMessage.hash;
lokiSnodeAPI.updateLastHash(
url,
lastMessage.hash,
lastMessage.expiration
);
messages = await this.jobQueue.add(() =>
filterIncomingMessages(messages)
);
}
// Execute callback even with empty array to signal online status
callback(messages);
} catch (e) {
log.warn('Loki retrieve messages:', e);
if (e instanceof textsecure.WrongSwarmError) {
const { newSwarm } = e;
await lokiSnodeAPI.updateOurSwarmNodes(newSwarm);
// Try another snode
break;
} else if (e instanceof textsecure.NotFoundError) {
// DNS/Lokinet error, needs to bubble up
throw new window.textsecure.DNSResolutionError(
'Retrieving messages'
);
}
successiveFailures += 1;
}
}
}
}
async startLongPolling(numConnections, callback) {
this.ourSwarmNodes = await lokiSnodeAPI.getOurSwarmNodes();
const promises = [];
for (let i = 0; i < numConnections; i += 1)
promises.push(this.openConnection(callback));
// blocks until all snodes in our swarms have been removed from the list
// or if there is network issues (ENOUTFOUND due to lokinet)
await Promise.all(promises);
}
// stale function, kept around to reduce diff noise
// TODO: remove
async retrieveMessages(callback) {
const ourKey = window.textsecure.storage.user.getNumber();
const completedNodes = [];
@ -163,22 +272,6 @@ class LokiMessageAPI {
let ourSwarmNodes = await lokiSnodeAPI.getOurSwarmNodes();
const filterIncomingMessages = async messages => {
const incomingHashes = messages.map(m => m.hash);
const dupHashes = await window.Signal.Data.getSeenMessagesByHashList(
incomingHashes
);
const newMessages = messages.filter(m => !dupHashes.includes(m.hash));
if (newMessages.length) {
const newHashes = newMessages.map(m => ({
expiresAt: m.expiration,
hash: m.hash,
}));
await window.Signal.Data.saveSeenMessageHashes(newHashes);
}
return newMessages;
};
const nodeComplete = nodeUrl => {
completedNodes.push(nodeUrl);
delete ourSwarmNodes[nodeUrl];

@ -3,8 +3,8 @@
// eslint-disable-next-line func-names
(function() {
let server;
const SUCCESS_POLL_TIME = 100;
const FAIL_POLL_TIME = 2000;
const EXHAUSTED_SNODES_RETRY_DELAY = 5000;
const NUM_CONCURRENT_CONNECTIONS = 3;
function stringToArrayBufferBase64(string) {
return dcodeIO.ByteBuffer.wrap(string, 'base64').toArrayBuffer();
@ -78,24 +78,28 @@
}
};
// Note: calling callback(false) is currently not necessary
this.pollServer = async callback => {
// This blocking call will return only when all attempts
// at reaching snodes are exhausted or a DNS error occured
try {
await server.retrieveMessages(messages => {
await server.startLongPolling(NUM_CONCURRENT_CONNECTIONS, messages => {
connected = true;
callback(connected);
messages.forEach(message => {
const { data } = message;
this.handleMessage(data);
});
});
connected = true;
} catch (err) {
window.log.error('Polling error: ', err);
connected = false;
} catch (e) {
// we'll try again anyway
}
const pollTime = connected ? SUCCESS_POLL_TIME : FAIL_POLL_TIME;
callback(connected);
connected = false;
// Exhausted all our snodes urls, trying again later from scratch
setTimeout(() => {
this.pollServer(callback);
}, pollTime);
}, EXHAUSTED_SNODES_RETRY_DELAY);
};
this.isConnected = function isConnected() {

Loading…
Cancel
Save