From 09a9cfbf371f05fedce048e75a2a00cdaa074493 Mon Sep 17 00:00:00 2001 From: sachaaaaa Date: Thu, 18 Apr 2019 18:26:23 +1000 Subject: [PATCH 1/4] Refactor long polling for better concurrent requests --- js/modules/loki_message_api.js | 106 ++++++++++++++++++++++++++++++++ libtextsecure/http-resources.js | 37 ++++++----- 2 files changed, 128 insertions(+), 15 deletions(-) diff --git a/js/modules/loki_message_api.js b/js/modules/loki_message_api.js index 102ea3090..dedb833aa 100644 --- a/js/modules/loki_message_api.js +++ b/js/modules/loki_message_api.js @@ -9,6 +9,28 @@ const { rpc } = require('./loki_rpc'); const MINIMUM_SUCCESSFUL_REQUESTS = 2; const LOKI_LONGPOLL_HEADER = 'X-Loki-Long-Poll'; +async function sleep_for(time) { + return new Promise(resolve => { + setTimeout(() => resolve(), time); + }); +} + +const filterIncomingMessages = async messages => { + const incomingHashes = messages.map(m => m.hash); + const dupHashes = await window.Signal.Data.getSeenMessagesByHashList( + incomingHashes + ); + const newMessages = messages.filter(m => !dupHashes.includes(m.hash)); + if (newMessages.length) { + const newHashes = newMessages.map(m => ({ + expiresAt: m.expiration, + hash: m.hash, + })); + await window.Signal.Data.saveSeenMessageHashes(newHashes); + } + return newMessages; +}; + class LokiMessageAPI { constructor({ snodeServerPort }) { this.snodeServerPort = snodeServerPort ? `:${snodeServerPort}` : ''; @@ -155,6 +177,90 @@ class LokiMessageAPI { log.info(`Successful storage message to ${pubKey}`); } + async *retrieveNextMessage(nodeUrl) { + const params = { + pubKey: ourKey, + lastHash: nodeData.lastHash || '', + }; + const options = { + timeout: 40000, + headers: { + [LOKI_LONGPOLL_HEADER]: true, + }, + }; + while (true) { + const result = await rpc( + `http://${nodeUrl}`, + this.snodeServerPort, + 'retrieve', + params, + options + ); + if (Array.isArray(result.messages) && result.messages.length) { + const filteredMessages = await this.jobQueue.add(() => + filterIncomingMessages(result.messages) + ); + if (filteredMessages.length) { + yield filteredMessages; + } + } + } + } + + async openConnection(callback) { + while (this.ourSwarmNodes.length > 0) { + const url = this.ourSwarmNodes.pop(); + const successive_failures = 0; + while (true) { + // loop breaks upon error + try { + for await (let messages of retrieveNextMessages(url)) { + const lastMessage = _.last(message.messages); + lokiSnodeAPI.updateLastHash( + url, + lastMessage.hash, + lastMessage.expiration + ); + callback(messages); + successive_failures = 0; + } + } catch (e) { + log.warn('Loki retrieve messages:', e); + if (e instanceof textsecure.WrongSwarmError) { + const { newSwarm } = e; + await lokiSnodeAPI.updateOurSwarmNodes(newSwarm); + // Try another snode + break; + } else if (e instanceof textsecure.NotFoundError) { + // DNS/Lokinet error, needs to bubble up + throw new window.textsecure.DNSResolutionError('Retrieving messages'); + } + } + + successive_failures += 1; + + if (successive_failures >= 3) + // Try another snode + break; + + await sleep_for(successive_failures * 1000); + } + } + } + + async startLongPolling(numConnections, callback) { + this.ourSwarmNodes = await lokiSnodeAPI.getOurSwarmNodes(); + + for (let i = 0; i < numConnections; i += 1) + promises.push(this.openConnection(callback)); + + // blocks until all snodes in our swarms have been removed from the list + // or if there is network issues (ENOUTFOUND due to lokinet) + await Promise.all(promises); + } + + // stale function, kept around to reduce diff noise + // TODO: remove async retrieveMessages(callback) { const ourKey = window.textsecure.storage.user.getNumber(); const completedNodes = []; diff --git a/libtextsecure/http-resources.js b/libtextsecure/http-resources.js index e70901349..c3b009740 100644 --- a/libtextsecure/http-resources.js +++ b/libtextsecure/http-resources.js @@ -3,8 +3,8 @@ // eslint-disable-next-line func-names (function() { let server; - const SUCCESS_POLL_TIME = 100; - const FAIL_POLL_TIME = 2000; + const EXHAUSTED_SNODES_RETRY_DELAY = 5000; + const NUM_CONCURRENT_CONNECTIONS = 3; function stringToArrayBufferBase64(string) { return dcodeIO.ByteBuffer.wrap(string, 'base64').toArrayBuffer(); @@ -78,24 +78,31 @@ } }; + // Note: calling callback(false) is currently not necessary this.pollServer = async callback => { + // This blocking call will return only when all attempts + // at reaching snodes are exhausted or a DNS error occured try { - await server.retrieveMessages(messages => { - messages.forEach(message => { - const { data } = message; - this.handleMessage(data); - }); - }); - connected = true; - } catch (err) { - window.log.error('Polling error: ', err); - connected = false; + await server.startLongPolling( + NUM_CONCURRENT_CONNECTIONS, + messages => { + connected = true; + callback(connected); + messages.forEach(message => { + const { data } = message; + this.handleMessage(data); + }); + } + ); + } catch(e) { + // we'll try again anyway } - const pollTime = connected ? SUCCESS_POLL_TIME : FAIL_POLL_TIME; - callback(connected); + + connected = false; + // Exhausted all our snodes urls, trying again later from scratch setTimeout(() => { this.pollServer(callback); - }, pollTime); + }, EXHAUSTED_SNODES_RETRY_DELAY); }; this.isConnected = function isConnected() { From 5f77f751d917e209d502d700204a87b9515fd5b3 Mon Sep 17 00:00:00 2001 From: sachaaaaa Date: Tue, 23 Apr 2019 14:27:47 +1000 Subject: [PATCH 2/4] Replace generator function and other various fixes --- js/modules/.eslintrc | 3 +- js/modules/loki_message_api.js | 89 +++++++++++++++------------------ libtextsecure/http-resources.js | 21 ++++---- 3 files changed, 51 insertions(+), 62 deletions(-) diff --git a/js/modules/.eslintrc b/js/modules/.eslintrc index b540ae7df..705203794 100644 --- a/js/modules/.eslintrc +++ b/js/modules/.eslintrc @@ -5,7 +5,8 @@ "node": false }, "globals": { - "console": true + "console": true, + "setTimeout": true }, "parserOptions": { "sourceType": "module" diff --git a/js/modules/loki_message_api.js b/js/modules/loki_message_api.js index dedb833aa..eb9f8e3b2 100644 --- a/js/modules/loki_message_api.js +++ b/js/modules/loki_message_api.js @@ -9,7 +9,7 @@ const { rpc } = require('./loki_rpc'); const MINIMUM_SUCCESSFUL_REQUESTS = 2; const LOKI_LONGPOLL_HEADER = 'X-Loki-Long-Poll'; -async function sleep_for(time) { +function sleepFor(time) { return new Promise(resolve => { setTimeout(() => resolve(), time); }); @@ -177,7 +177,7 @@ class LokiMessageAPI { log.info(`Successful storage message to ${pubKey}`); } - async *retrieveNextMessage(nodeUrl) { + async retrieveNextMessages(nodeUrl, nodeData, ourKey) { const params = { pubKey: ourKey, lastHash: nodeData.lastHash || '', @@ -188,42 +188,52 @@ class LokiMessageAPI { [LOKI_LONGPOLL_HEADER]: true, }, }; - while (true) { - const result = await rpc( - `http://${nodeUrl}`, - this.snodeServerPort, - 'retrieve', - params, - options + + const result = await rpc( + `http://${nodeUrl}`, + this.snodeServerPort, + 'retrieve', + params, + options + ); + if (Array.isArray(result.messages) && result.messages.length) { + const filteredMessages = await this.jobQueue.add(() => + filterIncomingMessages(result.messages) ); - if (Array.isArray(result.messages) && result.messages.length) { - const filteredMessages = await this.jobQueue.add(() => - filterIncomingMessages(result.messages) - ); - if (filteredMessages.length) { - yield filteredMessages; - } + if (filteredMessages.length) { + return filteredMessages; } } + return []; } async openConnection(callback) { + const ourKey = window.textsecure.storage.user.getNumber(); while (this.ourSwarmNodes.length > 0) { - const url = this.ourSwarmNodes.pop(); - const successive_failures = 0; - while (true) { - // loop breaks upon error + const url = Object.keys(this.ourSwarmNodes)[0]; + const nodeData = this.ourSwarmNodes[url]; + delete this.ourSwarmNodes[url]; + let successiveFailures = 0; + while (successiveFailures < 3) { + await sleepFor(successiveFailures * 1000); + try { - for await (let messages of retrieveNextMessages(url)) { - const lastMessage = _.last(message.messages); + const messages = await this.retrieveNextMessages( + url, + nodeData, + ourKey + ); + successiveFailures = 0; + if (messages.length) { + const lastMessage = _.last(messages); + nodeData.lashHash = lastMessage.hash; lokiSnodeAPI.updateLastHash( url, lastMessage.hash, lastMessage.expiration ); - callback(messages); - successive_failures = 0; } + callback(messages); } catch (e) { log.warn('Loki retrieve messages:', e); if (e instanceof textsecure.WrongSwarmError) { @@ -233,17 +243,12 @@ class LokiMessageAPI { break; } else if (e instanceof textsecure.NotFoundError) { // DNS/Lokinet error, needs to bubble up - throw new window.textsecure.DNSResolutionError('Retrieving messages'); + throw new window.textsecure.DNSResolutionError( + 'Retrieving messages' + ); } + successiveFailures += 1; } - - successive_failures += 1; - - if (successive_failures >= 3) - // Try another snode - break; - - await sleep_for(successive_failures * 1000); } } } @@ -251,6 +256,8 @@ class LokiMessageAPI { async startLongPolling(numConnections, callback) { this.ourSwarmNodes = await lokiSnodeAPI.getOurSwarmNodes(); + const promises = []; + for (let i = 0; i < numConnections; i += 1) promises.push(this.openConnection(callback)); @@ -269,22 +276,6 @@ class LokiMessageAPI { let ourSwarmNodes = await lokiSnodeAPI.getOurSwarmNodes(); - const filterIncomingMessages = async messages => { - const incomingHashes = messages.map(m => m.hash); - const dupHashes = await window.Signal.Data.getSeenMessagesByHashList( - incomingHashes - ); - const newMessages = messages.filter(m => !dupHashes.includes(m.hash)); - if (newMessages.length) { - const newHashes = newMessages.map(m => ({ - expiresAt: m.expiration, - hash: m.hash, - })); - await window.Signal.Data.saveSeenMessageHashes(newHashes); - } - return newMessages; - }; - const nodeComplete = nodeUrl => { completedNodes.push(nodeUrl); delete ourSwarmNodes[nodeUrl]; diff --git a/libtextsecure/http-resources.js b/libtextsecure/http-resources.js index c3b009740..9f3314c6b 100644 --- a/libtextsecure/http-resources.js +++ b/libtextsecure/http-resources.js @@ -83,18 +83,15 @@ // This blocking call will return only when all attempts // at reaching snodes are exhausted or a DNS error occured try { - await server.startLongPolling( - NUM_CONCURRENT_CONNECTIONS, - messages => { - connected = true; - callback(connected); - messages.forEach(message => { - const { data } = message; - this.handleMessage(data); - }); - } - ); - } catch(e) { + await server.startLongPolling(NUM_CONCURRENT_CONNECTIONS, messages => { + connected = true; + callback(connected); + messages.forEach(message => { + const { data } = message; + this.handleMessage(data); + }); + }); + } catch (e) { // we'll try again anyway } From edd5915bf2d3b49f01b76b5b711e6e03743b0fc3 Mon Sep 17 00:00:00 2001 From: sachaaaaa Date: Fri, 26 Apr 2019 11:17:32 +1000 Subject: [PATCH 3/4] Fix use .length on object --- js/modules/loki_message_api.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/js/modules/loki_message_api.js b/js/modules/loki_message_api.js index eb9f8e3b2..2c369d472 100644 --- a/js/modules/loki_message_api.js +++ b/js/modules/loki_message_api.js @@ -209,7 +209,7 @@ class LokiMessageAPI { async openConnection(callback) { const ourKey = window.textsecure.storage.user.getNumber(); - while (this.ourSwarmNodes.length > 0) { + while (!_.isEmpty(this.ourSwarmNodes)) { const url = Object.keys(this.ourSwarmNodes)[0]; const nodeData = this.ourSwarmNodes[url]; delete this.ourSwarmNodes[url]; From 1b1c18b9284ebddf66ec630da269178cd8c3deed Mon Sep 17 00:00:00 2001 From: sachaaaaa Date: Mon, 27 May 2019 12:09:11 +1000 Subject: [PATCH 4/4] Patch from Beau --- js/modules/loki_message_api.js | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/js/modules/loki_message_api.js b/js/modules/loki_message_api.js index 2c369d472..8b442dfb7 100644 --- a/js/modules/loki_message_api.js +++ b/js/modules/loki_message_api.js @@ -196,15 +196,7 @@ class LokiMessageAPI { params, options ); - if (Array.isArray(result.messages) && result.messages.length) { - const filteredMessages = await this.jobQueue.add(() => - filterIncomingMessages(result.messages) - ); - if (filteredMessages.length) { - return filteredMessages; - } - } - return []; + return result.messages || []; } async openConnection(callback) { @@ -218,7 +210,7 @@ class LokiMessageAPI { await sleepFor(successiveFailures * 1000); try { - const messages = await this.retrieveNextMessages( + let messages = await this.retrieveNextMessages( url, nodeData, ourKey @@ -232,7 +224,11 @@ class LokiMessageAPI { lastMessage.hash, lastMessage.expiration ); + messages = await this.jobQueue.add(() => + filterIncomingMessages(messages) + ); } + // Execute callback even with empty array to signal online status callback(messages); } catch (e) { log.warn('Loki retrieve messages:', e);