diff --git a/js/modules/loki_message_api.js b/js/modules/loki_message_api.js index 9ff394f41..98411fd85 100644 --- a/js/modules/loki_message_api.js +++ b/js/modules/loki_message_api.js @@ -217,7 +217,7 @@ class LokiMessageAPI { } return true; } catch (e) { - log.warn('Loki send message:', e); + log.warn('Loki send message error:', e.code, e.message, `from ${address}`); if (e instanceof textsecure.WrongSwarmError) { const { newSwarm } = e; await lokiSnodeAPI.updateSwarmNodes(params.pubKey, newSwarm); @@ -272,6 +272,8 @@ class LokiMessageAPI { try { // TODO: Revert back to using snode address instead of IP let messages = await this.retrieveNextMessages(nodeData.ip, nodeData); + // this only tracks retrieval failures + // won't include parsing failures... successiveFailures = 0; if (messages.length) { const lastMessage = _.last(messages); @@ -288,7 +290,12 @@ class LokiMessageAPI { // Execute callback even with empty array to signal online status callback(messages); } catch (e) { - log.warn('Loki retrieve messages:', e.code, e.message); + log.warn( + 'Loki retrieve messages error:', + e.code, + e.message, + `on ${nodeData.ip}:${nodeData.port}` + ); if (e instanceof textsecure.WrongSwarmError) { const { newSwarm } = e; await lokiSnodeAPI.updateSwarmNodes(this.ourKey, newSwarm); @@ -312,9 +319,24 @@ class LokiMessageAPI { } } if (successiveFailures >= MAX_ACCEPTABLE_FAILURES) { + log.warn( + `removing ${nodeData.ip}:${ + nodeData.port + } from our swarm pool. We have ${ + Object.keys(this.ourSwarmNodes).length + } usable swarm nodes left` + ); await lokiSnodeAPI.unreachableNode(this.ourKey, address); } } + // if not stopPollingResult + if (_.isEmpty(this.ourSwarmNodes)) { + log.error( + 'We no longer have any swarm nodes available to try in pool, closing retrieve connection' + ); + return false; + } + return true; } async retrieveNextMessages(nodeUrl, nodeData) { @@ -342,12 +364,31 @@ class LokiMessageAPI { } async startLongPolling(numConnections, stopPolling, callback) { + log.info('startLongPolling for', numConnections, 'connections'); this.ourSwarmNodes = {}; let nodes = await lokiSnodeAPI.getSwarmNodesForPubKey(this.ourKey); + log.info('swarmNodes', nodes.length, 'for', this.ourKey); + Object.keys(nodes).forEach(j => { + const node = nodes[j]; + log.info(`${j} ${node.ip}:${node.port}`); + }); if (nodes.length < numConnections) { - await lokiSnodeAPI.refreshSwarmNodesForPubKey(this.ourKey); - nodes = await lokiSnodeAPI.getSwarmNodesForPubKey(this.ourKey); + log.warn( + 'Not enough SwarmNodes for our pubkey in local database, getting current list from blockchain' + ); + nodes = await lokiSnodeAPI.refreshSwarmNodesForPubKey(this.ourKey); + if (nodes.length < numConnections) { + log.error( + 'Could not get enough SwarmNodes for our pubkey from blockchain' + ); + } } + log.info( + `There are currently ${ + nodes.length + } swarmNodes for pubKey in our local database` + ); + for (let i = 0; i < nodes.length; i += 1) { const lastHash = await window.Signal.Data.getLastHashBySnode( nodes[i].address @@ -364,9 +405,13 @@ class LokiMessageAPI { promises.push(this.openRetrieveConnection(stopPolling, callback)); } - // blocks until all snodes in our swarms have been removed from the list + // blocks until numConnections snodes in our swarms have been removed from the list + // less than numConnections being active is fine, only need to restart if none per Niels 20/02/13 // or if there is network issues (ENOUTFOUND due to lokinet) await Promise.all(promises); + log.error('All our long poll swarm connections have been removed'); + // should we just call ourself again? + // no, our caller already handles this... } } diff --git a/js/modules/loki_rpc.js b/js/modules/loki_rpc.js index 2579d27f0..61fb83926 100644 --- a/js/modules/loki_rpc.js +++ b/js/modules/loki_rpc.js @@ -29,10 +29,10 @@ const decryptResponse = async (response, address) => { return {}; }; -// TODO: Don't allow arbitrary URLs, only snodes and loki servers const sendToProxy = async (options = {}, targetNode) => { const randSnode = await lokiSnodeAPI.getRandomSnodeAddress(); + // Don't allow arbitrary URLs, only snodes and loki servers const url = `https://${randSnode.ip}:${randSnode.port}/proxy`; const snPubkeyHex = StringView.hexToArrayBuffer(targetNode.pubkey_x25519); @@ -67,20 +67,56 @@ const sendToProxy = async (options = {}, targetNode) => { const response = await nodeFetch(url, firstHopOptions); process.env.NODE_TLS_REJECT_UNAUTHORIZED = 1; - const ciphertext = await response.text(); + // detect SNode is not ready (not in swarm; not done syncing) + if (response.status === 503) { + const ciphertext = await response.text(); + log.error(`lokiRpc sendToProxy snode ${randSnode.ip}:${randSnode.port} error`, ciphertext); + // mark as bad for this round (should give it some time and improve success rates) + lokiSnodeAPI.markRandomNodeUnreachable(randSnode); + // retry for a new working snode + return sendToProxy(options, targetNode); + } - const ciphertextBuffer = dcodeIO.ByteBuffer.wrap( - ciphertext, - 'base64' - ).toArrayBuffer(); + // FIXME: handle nodeFetch errors/exceptions... + if (response.status !== 200) { + // let us know we need to create handlers for new unhandled codes + log.warn('lokiRpc sendToProxy fetch non-200 statusCode', response.status); + } - const plaintextBuffer = await window.libloki.crypto.DHDecrypt( - symmetricKey, - ciphertextBuffer - ); + const ciphertext = await response.text(); + if (!ciphertext) { + // avoid base64 decode failure + log.warn('Server did not return any data for', options); + } - const textDecoder = new TextDecoder(); - const plaintext = textDecoder.decode(plaintextBuffer); + let plaintext; + let ciphertextBuffer; + try { + ciphertextBuffer = dcodeIO.ByteBuffer.wrap( + ciphertext, + 'base64' + ).toArrayBuffer(); + + const plaintextBuffer = await window.libloki.crypto.DHDecrypt( + symmetricKey, + ciphertextBuffer + ); + + const textDecoder = new TextDecoder(); + plaintext = textDecoder.decode(plaintextBuffer); + } catch(e) { + log.error( + 'lokiRpc sendToProxy decode error', + e.code, + e.message, + `from ${randSnode.ip}:${randSnode.port} ciphertext:`, + ciphertext + ); + if (ciphertextBuffer) { + log.error('ciphertextBuffer', ciphertextBuffer); + } + return false; + } try { const jsonRes = JSON.parse(plaintext); @@ -90,10 +126,10 @@ const sendToProxy = async (options = {}, targetNode) => { return JSON.parse(jsonRes.body); } catch (e) { log.error( - 'lokiRpc sendToProxy error', + 'lokiRpc sendToProxy parse error', e.code, e.message, - 'json', + `from ${randSnode.ip}:${randSnode.port} json:`, jsonRes.body ); } @@ -102,10 +138,10 @@ const sendToProxy = async (options = {}, targetNode) => { return jsonRes; } catch (e) { log.error( - 'lokiRpc sendToProxy error', + 'lokiRpc sendToProxy parse error', e.code, e.message, - 'json', + `from ${randSnode.ip}:${randSnode.port} json:`, plaintext ); } @@ -150,7 +186,7 @@ const lokiFetch = async (url, options = {}, targetNode = null) => { try { if (window.lokiFeatureFlags.useSnodeProxy && targetNode) { const result = await sendToProxy(fetchOptions, targetNode); - return result.json(); + return result ? result.json() : false; } if (url.match(/https:\/\//)) { diff --git a/js/modules/loki_snode_api.js b/js/modules/loki_snode_api.js index 3418c827f..7bc2944ed 100644 --- a/js/modules/loki_snode_api.js +++ b/js/modules/loki_snode_api.js @@ -4,6 +4,8 @@ const is = require('@sindresorhus/is'); const { lokiRpc } = require('./loki_rpc'); +const RANDOM_SNODES_TO_USE = 3; + class LokiSnodeAPI { constructor({ serverUrl, localUrl }) { if (!is.string(serverUrl)) { @@ -18,6 +20,7 @@ class LokiSnodeAPI { async getRandomSnodeAddress() { /* resolve random snode */ if (this.randomSnodePool.length === 0) { + // allow exceptions to pass through upwards await this.initialiseRandomPool(); } if (this.randomSnodePool.length === 0) { @@ -28,7 +31,7 @@ class LokiSnodeAPI { ]; } - async initialiseRandomPool(seedNodes = [...window.seedNodeList]) { + async initialiseRandomPool(seedNodes = [...window.seedNodeList], consecutiveErrors = 0) { const params = { limit: 20, active_only: true, @@ -43,8 +46,9 @@ class LokiSnodeAPI { Math.floor(Math.random() * seedNodes.length), 1 )[0]; + let snodes = []; try { - const result = await lokiRpc( + const response = await lokiRpc( `http://${seedNode.ip}`, seedNode.port, 'get_n_service_nodes', @@ -53,7 +57,7 @@ class LokiSnodeAPI { '/json_rpc' // Seed request endpoint ); // Filter 0.0.0.0 nodes which haven't submitted uptime proofs - const snodes = result.result.service_node_states.filter( + snodes = response.result.service_node_states.filter( snode => snode.public_ip !== '0.0.0.0' ); this.randomSnodePool = snodes.map(snode => ({ @@ -64,12 +68,20 @@ class LokiSnodeAPI { })); } catch (e) { log.warn('initialiseRandomPool error', e.code, e.message); - if (seedNodes.length === 0) { - throw new window.textsecure.SeedNodeError( - 'Failed to contact seed node' - ); + if (consecutiveErrors < 3) { + // retry after a possible delay + setTimeout(() => { + log.info('Retrying initialising random snode pool, try #', consecutiveErrors); + this.initialiseRandomPool(seedNodes, consecutiveErrors + 1); + }, consecutiveErrors * consecutiveErrors * 5000); + } else { + log.error('Giving up trying to contact seed node'); + if (snodes.length === 0) { + throw new window.textsecure.SeedNodeError( + 'Failed to contact seed node' + ); + } } - this.initialiseRandomPool(seedNodes); } } @@ -111,6 +123,7 @@ class LokiSnodeAPI { const filteredNodes = newNodes.filter(snode => snode.ip !== '0.0.0.0'); const conversation = ConversationController.get(pubKey); await conversation.updateSwarmNodes(filteredNodes); + return filteredNodes; } catch (e) { throw new window.textsecure.ReplayableError({ message: 'Could not get conversation', @@ -120,7 +133,8 @@ class LokiSnodeAPI { async refreshSwarmNodesForPubKey(pubKey) { const newNodes = await this.getFreshSwarmNodes(pubKey); - this.updateSwarmNodes(pubKey, newNodes); + const filteredNodes = this.updateSwarmNodes(pubKey, newNodes); + return filteredNodes; } async getFreshSwarmNodes(pubKey) { @@ -130,6 +144,7 @@ class LokiSnodeAPI { try { newSwarmNodes = await this.getSwarmNodes(pubKey); } catch (e) { + log.error('getFreshSwarmNodes error', e.code, e.message); // TODO: Handle these errors sensibly newSwarmNodes = []; } @@ -141,9 +156,7 @@ class LokiSnodeAPI { return newSwarmNodes; } - async getSwarmNodes(pubKey) { - // TODO: Hit multiple random nodes and merge lists? - const snode = await this.getRandomSnodeAddress(); + async getSnodesForPubkey(snode, pubKey) { try { const result = await lokiRpc( `https://${snode.ip}`, @@ -158,7 +171,7 @@ class LokiSnodeAPI { ); if (!result) { log.warn( - `getSwarmNodes lokiRpc on ${snode.ip}:${ + `getSnodesForPubkey lokiRpc on ${snode.ip}:${ snode.port } returned falsish value`, result @@ -168,11 +181,32 @@ class LokiSnodeAPI { const snodes = result.snodes.filter(tSnode => tSnode.ip !== '0.0.0.0'); return snodes; } catch (e) { - log.error('getSwarmNodes error', e.code, e.message); + log.error('getSnodesForPubkey error', e.code, e.message, `for ${snode.ip}:${snode.port}`); this.markRandomNodeUnreachable(snode); - return this.getSwarmNodes(pubKey); + return []; } } + + async getSwarmNodes(pubKey) { + const snodes = []; + const questions = [...Array(RANDOM_SNODES_TO_USE).keys()]; + await Promise.all( + questions.map(async () => { + // allow exceptions to pass through upwards + const rSnode = await this.getRandomSnodeAddress(); + const resList = await this.getSnodesForPubkey(rSnode, pubKey); + // should we only activate entries that are in all results? + resList.map(item => { + const hasItem = snodes.some(hItem => item.ip === hItem.ip && item.port === hItem.port); + if (!hasItem) { + snodes.push(item); + } + return true; + }); + }) + ); + return snodes; + } } module.exports = LokiSnodeAPI; diff --git a/libtextsecure/http-resources.js b/libtextsecure/http-resources.js index 2e2b42a19..04de289cd 100644 --- a/libtextsecure/http-resources.js +++ b/libtextsecure/http-resources.js @@ -100,6 +100,7 @@ ); } catch (e) { // we'll try again anyway + window.log.error('http-resource pollServer error', e.code, e.message); } if (this.calledStop) { @@ -109,6 +110,10 @@ connected = false; // Exhausted all our snodes urls, trying again later from scratch setTimeout(() => { + window.log.info( + `Exhausted all our snodes urls, trying again in ${EXHAUSTED_SNODES_RETRY_DELAY / + 1000}s from scratch` + ); this.pollServer(); }, EXHAUSTED_SNODES_RETRY_DELAY); };