From 774c468c3956f24ccc3ac3cdba2930157a8ad4e9 Mon Sep 17 00:00:00 2001 From: Ryan Tharp Date: Sun, 16 Feb 2020 21:47:34 -0800 Subject: [PATCH 1/6] handle non-base64 responses appropriately, include which server failed in logs --- js/modules/loki_rpc.js | 56 +++++++++++++++++++++++++++++------------- 1 file changed, 39 insertions(+), 17 deletions(-) diff --git a/js/modules/loki_rpc.js b/js/modules/loki_rpc.js index 2579d27f0..1e806db45 100644 --- a/js/modules/loki_rpc.js +++ b/js/modules/loki_rpc.js @@ -29,10 +29,10 @@ const decryptResponse = async (response, address) => { return {}; }; -// TODO: Don't allow arbitrary URLs, only snodes and loki servers const sendToProxy = async (options = {}, targetNode) => { const randSnode = await lokiSnodeAPI.getRandomSnodeAddress(); + // Don't allow arbitrary URLs, only snodes and loki servers const url = `https://${randSnode.ip}:${randSnode.port}/proxy`; const snPubkeyHex = StringView.hexToArrayBuffer(targetNode.pubkey_x25519); @@ -67,20 +67,42 @@ const sendToProxy = async (options = {}, targetNode) => { const response = await nodeFetch(url, firstHopOptions); process.env.NODE_TLS_REJECT_UNAUTHORIZED = 1; - const ciphertext = await response.text(); - - const ciphertextBuffer = dcodeIO.ByteBuffer.wrap( - ciphertext, - 'base64' - ).toArrayBuffer(); + // FIXME: handle nodeFetch errors/exceptions... - const plaintextBuffer = await window.libloki.crypto.DHDecrypt( - symmetricKey, - ciphertextBuffer - ); + const ciphertext = await response.text(); + let plaintext + try { + // removed the following part to match the check we have in loki_app_dot_net_api.js + // ; not done syncing; + if (ciphertext.match(/Service node is not ready: not in any swarm/)) { + log.error(`lokiRpc sendToProxy snode ${randSnode.ip}:${randSnode.port} error`, ciphertext); + // mark as bad + lokiSnodeAPI.markRandomNodeUnreachable(randSnode); + // retry + return sendToProxy(options, targetNode); + } + const ciphertextBuffer = dcodeIO.ByteBuffer.wrap( + ciphertext, + 'base64' + ).toArrayBuffer(); + + const plaintextBuffer = await window.libloki.crypto.DHDecrypt( + symmetricKey, + ciphertextBuffer + ); - const textDecoder = new TextDecoder(); - const plaintext = textDecoder.decode(plaintextBuffer); + const textDecoder = new TextDecoder(); + plaintext = textDecoder.decode(plaintextBuffer); + } catch(e) { + log.error( + 'lokiRpc sendToProxy decode error', + e.code, + e.message, + `from ${randSnode.ip}:${randSnode.port} ciphertext:`, + ciphertext + ); + return false; + } try { const jsonRes = JSON.parse(plaintext); @@ -90,10 +112,10 @@ const sendToProxy = async (options = {}, targetNode) => { return JSON.parse(jsonRes.body); } catch (e) { log.error( - 'lokiRpc sendToProxy error', + 'lokiRpc sendToProxy parse error', e.code, e.message, - 'json', + `from ${randSnode.ip}:${randSnode.port} json:`, jsonRes.body ); } @@ -102,10 +124,10 @@ const sendToProxy = async (options = {}, targetNode) => { return jsonRes; } catch (e) { log.error( - 'lokiRpc sendToProxy error', + 'lokiRpc sendToProxy parse error', e.code, e.message, - 'json', + `from ${randSnode.ip}:${randSnode.port} json:`, plaintext ); } From 69dcfa2845cb791560de1090ef074659b55440f6 Mon Sep 17 00:00:00 2001 From: Ryan Tharp Date: Sun, 16 Feb 2020 21:50:43 -0800 Subject: [PATCH 2/6] getSwarmNodes refactor to include results from RANDOM_SNODES_TO_USE nodes, make refreshSwarmNodesForPubKey return filteredNodes, initialiseRandomPool() retries 3 times with delays --- js/modules/loki_snode_api.js | 67 +++++++++++++++++++++++++++--------- 1 file changed, 51 insertions(+), 16 deletions(-) diff --git a/js/modules/loki_snode_api.js b/js/modules/loki_snode_api.js index 3418c827f..60f89b562 100644 --- a/js/modules/loki_snode_api.js +++ b/js/modules/loki_snode_api.js @@ -4,6 +4,8 @@ const is = require('@sindresorhus/is'); const { lokiRpc } = require('./loki_rpc'); +const RANDOM_SNODES_TO_USE = 3; + class LokiSnodeAPI { constructor({ serverUrl, localUrl }) { if (!is.string(serverUrl)) { @@ -18,6 +20,7 @@ class LokiSnodeAPI { async getRandomSnodeAddress() { /* resolve random snode */ if (this.randomSnodePool.length === 0) { + // allow exceptions to pass through upwards await this.initialiseRandomPool(); } if (this.randomSnodePool.length === 0) { @@ -28,7 +31,7 @@ class LokiSnodeAPI { ]; } - async initialiseRandomPool(seedNodes = [...window.seedNodeList]) { + async initialiseRandomPool(seedNodes = [...window.seedNodeList], consecutiveErrors = 0) { const params = { limit: 20, active_only: true, @@ -43,8 +46,9 @@ class LokiSnodeAPI { Math.floor(Math.random() * seedNodes.length), 1 )[0]; + let snodes = []; try { - const result = await lokiRpc( + const response = await lokiRpc( `http://${seedNode.ip}`, seedNode.port, 'get_n_service_nodes', @@ -53,7 +57,7 @@ class LokiSnodeAPI { '/json_rpc' // Seed request endpoint ); // Filter 0.0.0.0 nodes which haven't submitted uptime proofs - const snodes = result.result.service_node_states.filter( + snodes = response.result.service_node_states.filter( snode => snode.public_ip !== '0.0.0.0' ); this.randomSnodePool = snodes.map(snode => ({ @@ -64,12 +68,20 @@ class LokiSnodeAPI { })); } catch (e) { log.warn('initialiseRandomPool error', e.code, e.message); - if (seedNodes.length === 0) { - throw new window.textsecure.SeedNodeError( - 'Failed to contact seed node' - ); + if (consecutiveErrors < 3) { + // retry after a possible delay + setTimeout(() => { + log.info('Retrying initialising random snode pool, try #', consecutiveErrors); + this.initialiseRandomPool(seedNodes, consecutiveErrors + 1); + }, consecutiveErrors * consecutiveErrors * 5000); + } else { + log.error('Giving up trying to contact seed node'); + if (snodes.length === 0) { + throw new window.textsecure.SeedNodeError( + 'Failed to contact seed node' + ); + } } - this.initialiseRandomPool(seedNodes); } } @@ -107,8 +119,9 @@ class LokiSnodeAPI { } async updateSwarmNodes(pubKey, newNodes) { + let filteredNodes = []; try { - const filteredNodes = newNodes.filter(snode => snode.ip !== '0.0.0.0'); + filteredNodes = newNodes.filter(snode => snode.ip !== '0.0.0.0'); const conversation = ConversationController.get(pubKey); await conversation.updateSwarmNodes(filteredNodes); } catch (e) { @@ -116,11 +129,13 @@ class LokiSnodeAPI { message: 'Could not get conversation', }); } + return filteredNodes; } async refreshSwarmNodesForPubKey(pubKey) { const newNodes = await this.getFreshSwarmNodes(pubKey); - this.updateSwarmNodes(pubKey, newNodes); + const filteredNodes = this.updateSwarmNodes(pubKey, newNodes); + return filteredNodes; } async getFreshSwarmNodes(pubKey) { @@ -130,6 +145,7 @@ class LokiSnodeAPI { try { newSwarmNodes = await this.getSwarmNodes(pubKey); } catch (e) { + log.error('getFreshSwarmNodes error', e.code, e.message); // TODO: Handle these errors sensibly newSwarmNodes = []; } @@ -141,9 +157,7 @@ class LokiSnodeAPI { return newSwarmNodes; } - async getSwarmNodes(pubKey) { - // TODO: Hit multiple random nodes and merge lists? - const snode = await this.getRandomSnodeAddress(); + async getSnodesForPubkey(snode, pubKey) { try { const result = await lokiRpc( `https://${snode.ip}`, @@ -158,7 +172,7 @@ class LokiSnodeAPI { ); if (!result) { log.warn( - `getSwarmNodes lokiRpc on ${snode.ip}:${ + `getSnodesForPubkey lokiRpc on ${snode.ip}:${ snode.port } returned falsish value`, result @@ -168,11 +182,32 @@ class LokiSnodeAPI { const snodes = result.snodes.filter(tSnode => tSnode.ip !== '0.0.0.0'); return snodes; } catch (e) { - log.error('getSwarmNodes error', e.code, e.message); + log.error('getSnodesForPubkey error', e.code, e.message, `for ${snode.ip}:${snode.port}`); this.markRandomNodeUnreachable(snode); - return this.getSwarmNodes(pubKey); + return []; } } + + async getSwarmNodes(pubKey) { + const snodes = []; + const questions = [...Array(RANDOM_SNODES_TO_USE).keys()]; + await Promise.all( + questions.map(async () => { + // allow exceptions to pass through upwards + const rSnode = await this.getRandomSnodeAddress(); + const resList = await this.getSnodesForPubkey(rSnode, pubKey); + // should we only activate entries that are in all results? + resList.map(item => { + const hasItem = snodes.some(hItem => item.ip === hItem.ip && item.port === hItem.port); + if (!hasItem) { + snodes.push(item); + } + return true; + }); + }) + ); + return snodes; + } } module.exports = LokiSnodeAPI; From c404d1c7297826a4584cbb11750bd2be3e396468 Mon Sep 17 00:00:00 2001 From: Ryan Tharp Date: Sun, 16 Feb 2020 21:53:13 -0800 Subject: [PATCH 3/6] log exception and when we exhausted long polling pool --- libtextsecure/http-resources.js | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/libtextsecure/http-resources.js b/libtextsecure/http-resources.js index 2e2b42a19..04de289cd 100644 --- a/libtextsecure/http-resources.js +++ b/libtextsecure/http-resources.js @@ -100,6 +100,7 @@ ); } catch (e) { // we'll try again anyway + window.log.error('http-resource pollServer error', e.code, e.message); } if (this.calledStop) { @@ -109,6 +110,10 @@ connected = false; // Exhausted all our snodes urls, trying again later from scratch setTimeout(() => { + window.log.info( + `Exhausted all our snodes urls, trying again in ${EXHAUSTED_SNODES_RETRY_DELAY / + 1000}s from scratch` + ); this.pollServer(); }, EXHAUSTED_SNODES_RETRY_DELAY); }; From 4ba4b8bb541f4d09c00c8035d2fa6e70d3c2f429 Mon Sep 17 00:00:00 2001 From: Ryan Tharp Date: Sun, 16 Feb 2020 21:56:37 -0800 Subject: [PATCH 4/6] improve logging, add one retry if not enough snodes in the swarm on long poll start --- js/modules/loki_message_api.js | 55 ++++++++++++++++++++++++++++++---- 1 file changed, 50 insertions(+), 5 deletions(-) diff --git a/js/modules/loki_message_api.js b/js/modules/loki_message_api.js index 9ff394f41..98411fd85 100644 --- a/js/modules/loki_message_api.js +++ b/js/modules/loki_message_api.js @@ -217,7 +217,7 @@ class LokiMessageAPI { } return true; } catch (e) { - log.warn('Loki send message:', e); + log.warn('Loki send message error:', e.code, e.message, `from ${address}`); if (e instanceof textsecure.WrongSwarmError) { const { newSwarm } = e; await lokiSnodeAPI.updateSwarmNodes(params.pubKey, newSwarm); @@ -272,6 +272,8 @@ class LokiMessageAPI { try { // TODO: Revert back to using snode address instead of IP let messages = await this.retrieveNextMessages(nodeData.ip, nodeData); + // this only tracks retrieval failures + // won't include parsing failures... successiveFailures = 0; if (messages.length) { const lastMessage = _.last(messages); @@ -288,7 +290,12 @@ class LokiMessageAPI { // Execute callback even with empty array to signal online status callback(messages); } catch (e) { - log.warn('Loki retrieve messages:', e.code, e.message); + log.warn( + 'Loki retrieve messages error:', + e.code, + e.message, + `on ${nodeData.ip}:${nodeData.port}` + ); if (e instanceof textsecure.WrongSwarmError) { const { newSwarm } = e; await lokiSnodeAPI.updateSwarmNodes(this.ourKey, newSwarm); @@ -312,9 +319,24 @@ class LokiMessageAPI { } } if (successiveFailures >= MAX_ACCEPTABLE_FAILURES) { + log.warn( + `removing ${nodeData.ip}:${ + nodeData.port + } from our swarm pool. We have ${ + Object.keys(this.ourSwarmNodes).length + } usable swarm nodes left` + ); await lokiSnodeAPI.unreachableNode(this.ourKey, address); } } + // if not stopPollingResult + if (_.isEmpty(this.ourSwarmNodes)) { + log.error( + 'We no longer have any swarm nodes available to try in pool, closing retrieve connection' + ); + return false; + } + return true; } async retrieveNextMessages(nodeUrl, nodeData) { @@ -342,12 +364,31 @@ class LokiMessageAPI { } async startLongPolling(numConnections, stopPolling, callback) { + log.info('startLongPolling for', numConnections, 'connections'); this.ourSwarmNodes = {}; let nodes = await lokiSnodeAPI.getSwarmNodesForPubKey(this.ourKey); + log.info('swarmNodes', nodes.length, 'for', this.ourKey); + Object.keys(nodes).forEach(j => { + const node = nodes[j]; + log.info(`${j} ${node.ip}:${node.port}`); + }); if (nodes.length < numConnections) { - await lokiSnodeAPI.refreshSwarmNodesForPubKey(this.ourKey); - nodes = await lokiSnodeAPI.getSwarmNodesForPubKey(this.ourKey); + log.warn( + 'Not enough SwarmNodes for our pubkey in local database, getting current list from blockchain' + ); + nodes = await lokiSnodeAPI.refreshSwarmNodesForPubKey(this.ourKey); + if (nodes.length < numConnections) { + log.error( + 'Could not get enough SwarmNodes for our pubkey from blockchain' + ); + } } + log.info( + `There are currently ${ + nodes.length + } swarmNodes for pubKey in our local database` + ); + for (let i = 0; i < nodes.length; i += 1) { const lastHash = await window.Signal.Data.getLastHashBySnode( nodes[i].address @@ -364,9 +405,13 @@ class LokiMessageAPI { promises.push(this.openRetrieveConnection(stopPolling, callback)); } - // blocks until all snodes in our swarms have been removed from the list + // blocks until numConnections snodes in our swarms have been removed from the list + // less than numConnections being active is fine, only need to restart if none per Niels 20/02/13 // or if there is network issues (ENOUTFOUND due to lokinet) await Promise.all(promises); + log.error('All our long poll swarm connections have been removed'); + // should we just call ourself again? + // no, our caller already handles this... } } From 4a55040688f9112268f66898c7ebd66ce61b3b29 Mon Sep 17 00:00:00 2001 From: Ryan Tharp Date: Mon, 17 Feb 2020 15:09:44 -0800 Subject: [PATCH 5/6] improve code quality --- js/modules/loki_snode_api.js | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/js/modules/loki_snode_api.js b/js/modules/loki_snode_api.js index 60f89b562..7bc2944ed 100644 --- a/js/modules/loki_snode_api.js +++ b/js/modules/loki_snode_api.js @@ -119,17 +119,16 @@ class LokiSnodeAPI { } async updateSwarmNodes(pubKey, newNodes) { - let filteredNodes = []; try { - filteredNodes = newNodes.filter(snode => snode.ip !== '0.0.0.0'); + const filteredNodes = newNodes.filter(snode => snode.ip !== '0.0.0.0'); const conversation = ConversationController.get(pubKey); await conversation.updateSwarmNodes(filteredNodes); + return filteredNodes; } catch (e) { throw new window.textsecure.ReplayableError({ message: 'Could not get conversation', }); } - return filteredNodes; } async refreshSwarmNodesForPubKey(pubKey) { From a02fe9555603b977741b9dec9462f85d6556ae06 Mon Sep 17 00:00:00 2001 From: Ryan Tharp Date: Mon, 17 Feb 2020 15:45:16 -0800 Subject: [PATCH 6/6] detect not ready through statusCode instead of string, log any non-200 statusCode, warn if no reply at all, try to debug iv errors, don't call .json() on falsish values --- js/modules/loki_rpc.js | 38 ++++++++++++++++++++++++++------------ 1 file changed, 26 insertions(+), 12 deletions(-) diff --git a/js/modules/loki_rpc.js b/js/modules/loki_rpc.js index 1e806db45..61fb83926 100644 --- a/js/modules/loki_rpc.js +++ b/js/modules/loki_rpc.js @@ -67,21 +67,32 @@ const sendToProxy = async (options = {}, targetNode) => { const response = await nodeFetch(url, firstHopOptions); process.env.NODE_TLS_REJECT_UNAUTHORIZED = 1; + // detect SNode is not ready (not in swarm; not done syncing) + if (response.status === 503) { + const ciphertext = await response.text(); + log.error(`lokiRpc sendToProxy snode ${randSnode.ip}:${randSnode.port} error`, ciphertext); + // mark as bad for this round (should give it some time and improve success rates) + lokiSnodeAPI.markRandomNodeUnreachable(randSnode); + // retry for a new working snode + return sendToProxy(options, targetNode); + } + // FIXME: handle nodeFetch errors/exceptions... + if (response.status !== 200) { + // let us know we need to create handlers for new unhandled codes + log.warn('lokiRpc sendToProxy fetch non-200 statusCode', response.status); + } const ciphertext = await response.text(); - let plaintext + if (!ciphertext) { + // avoid base64 decode failure + log.warn('Server did not return any data for', options); + } + + let plaintext; + let ciphertextBuffer; try { - // removed the following part to match the check we have in loki_app_dot_net_api.js - // ; not done syncing; - if (ciphertext.match(/Service node is not ready: not in any swarm/)) { - log.error(`lokiRpc sendToProxy snode ${randSnode.ip}:${randSnode.port} error`, ciphertext); - // mark as bad - lokiSnodeAPI.markRandomNodeUnreachable(randSnode); - // retry - return sendToProxy(options, targetNode); - } - const ciphertextBuffer = dcodeIO.ByteBuffer.wrap( + ciphertextBuffer = dcodeIO.ByteBuffer.wrap( ciphertext, 'base64' ).toArrayBuffer(); @@ -101,6 +112,9 @@ const sendToProxy = async (options = {}, targetNode) => { `from ${randSnode.ip}:${randSnode.port} ciphertext:`, ciphertext ); + if (ciphertextBuffer) { + log.error('ciphertextBuffer', ciphertextBuffer); + } return false; } @@ -172,7 +186,7 @@ const lokiFetch = async (url, options = {}, targetNode = null) => { try { if (window.lokiFeatureFlags.useSnodeProxy && targetNode) { const result = await sendToProxy(fetchOptions, targetNode); - return result.json(); + return result ? result.json() : false; } if (url.match(/https:\/\//)) {