diff --git a/js/modules/loki_app_dot_net_api.js b/js/modules/loki_app_dot_net_api.js index ecc8e9d3c..4d4a1c1b7 100644 --- a/js/modules/loki_app_dot_net_api.js +++ b/js/modules/loki_app_dot_net_api.js @@ -42,7 +42,7 @@ const sendToProxy = async ( ) => { if (!srvPubKey) { log.error( - 'loki_app_dot_net: sendToProxy called without a server public key' + 'loki_app_dot_net:::sendToProxy - called without a server public key' ); return {}; } @@ -145,7 +145,7 @@ const sendToProxy = async ( randSnode ); log.warn( - `loki_app_dot_net: Marking random snode bad, internet address ${ + `loki_app_dot_net:::sendToProxy - Marking random snode bad, internet address ${ randSnode.ip }:${ randSnode.port @@ -161,7 +161,7 @@ const sendToProxy = async ( response = JSON.parse(txtResponse); } catch (e) { log.warn( - `loki_app_dot_net: sendToProxy Could not parse outer JSON [${txtResponse}]`, + `loki_app_dot_net:::sendToProxy - Could not parse outer JSON [${txtResponse}]`, endpoint, 'on', url @@ -185,7 +185,7 @@ const sendToProxy = async ( response = options.textResponse ? respStr : JSON.parse(respStr); } catch (e) { log.warn( - `loki_app_dot_net: sendToProxy Could not parse inner JSON [${respStr}]`, + `loki_app_dot_net:::sendToProxy - Could not parse inner JSON [${respStr}]`, endpoint, 'on', url @@ -193,7 +193,7 @@ const sendToProxy = async ( } } else { log.warn( - 'loki_app_dot_net: file server secure_rpc gave an non-200 response: ', + 'loki_app_dot_net:::sendToProxy - file server secure_rpc gave an non-200 response: ', response, ` txtResponse[${txtResponse}]`, endpoint @@ -239,7 +239,11 @@ const serverRequest = async (endpoint, options = {}) => { fetchOptions.agent = snodeHttpsAgent; } } catch (e) { - log.info('serverRequest set up error:', e.code, e.message); + log.error( + 'loki_app_dot_net:::serverRequest - set up error:', + e.code, + e.message + ); return { err: e, }; @@ -257,10 +261,10 @@ const serverRequest = async (endpoint, options = {}) => { FILESERVER_HOSTS.includes(host) ) { mode = 'sendToProxy'; - const search = url.search ? `?${url.search}` : ''; + // url.search automatically includes the ? part + const search = url.search || ''; // strip first slash const endpointWithQS = `${url.pathname}${search}`.replace(/^\//, ''); - // log.info('endpointWithQS', endpointWithQS) ({ response, txtResponse, result } = await sendToProxy( srvPubKey, endpointWithQS, @@ -278,11 +282,16 @@ const serverRequest = async (endpoint, options = {}) => { txtResponse = await result.text(); // cloudflare timeouts (504s) will be html... response = options.textResponse ? txtResponse : JSON.parse(txtResponse); + // result.status will always be 200 + // emulate the correct http code if available + if (response && response.meta && response.meta.code) { + result.status = response.meta.code; + } } } catch (e) { if (txtResponse) { - log.info( - `serverRequest ${mode} error`, + log.error( + `loki_app_dot_net:::serverRequest - ${mode} error`, e.code, e.message, `json: ${txtResponse}`, @@ -290,8 +299,8 @@ const serverRequest = async (endpoint, options = {}) => { url ); } else { - log.info( - `serverRequest ${mode} error`, + log.error( + `loki_app_dot_net:::serverRequest - ${mode} error`, e.code, e.message, 'attempting connection to', @@ -1609,12 +1618,12 @@ class LokiPublicChannelAPI { if (res.err || !res.response) { log.error( - 'Could not get messages from', + `app_dot_net:::pollOnceForMessages - Could not get messages from`, this.serverAPI.baseServerUrl, this.baseChannelUrl ); if (res.err) { - log.error('pollOnceForMessages receive error', res.err); + log.error(`app_dot_net:::pollOnceForMessages - receive error`, res.err); } this.messagesPollLock = false; return; diff --git a/js/modules/loki_file_server_api.js b/js/modules/loki_file_server_api.js index b6050a6be..13ca54800 100644 --- a/js/modules/loki_file_server_api.js +++ b/js/modules/loki_file_server_api.js @@ -110,7 +110,7 @@ class LokiFileServerInstance { return; } const validAuthorisations = authorisations.filter( - a => a && typeof auth === 'object' + a => a && typeof a === 'object' ); await Promise.all( validAuthorisations.map(async auth => { @@ -231,7 +231,7 @@ class LokiFileServerInstance { Object.keys(newSlavePrimaryMap).forEach(slaveKey => { if (newSlavePrimaryMap[slaveKey] === primaryPubKey) { log.warn( - `removing unverifible ${slaveKey} to ${primaryPubKey} mapping` + `removing unverifiable ${slaveKey} to ${primaryPubKey} mapping` ); delete newSlavePrimaryMap[slaveKey]; } diff --git a/js/modules/loki_message_api.js b/js/modules/loki_message_api.js index fc9352ea7..22208663d 100644 --- a/js/modules/loki_message_api.js +++ b/js/modules/loki_message_api.js @@ -4,15 +4,10 @@ const _ = require('lodash'); const { lokiRpc } = require('./loki_rpc'); +const primitives = require('./loki_primitives'); const DEFAULT_CONNECTIONS = 3; -const MAX_ACCEPTABLE_FAILURES = 1; - -function sleepFor(time) { - return new Promise(resolve => { - setTimeout(() => resolve(), time); - }); -} +const MAX_ACCEPTABLE_FAILURES = 10; const filterIncomingMessages = async messages => { const incomingHashes = messages.map(m => m.hash); @@ -113,26 +108,10 @@ class LokiMessageAPI { promises.push(connectionPromise); } - // Taken from https://stackoverflow.com/questions/51160260/clean-way-to-wait-for-first-true-returned-by-promise - // The promise returned by this function will resolve true when the first promise - // in ps resolves true *or* it will resolve false when all of ps resolve false - const firstTrue = ps => { - const newPs = ps.map( - p => - new Promise( - // eslint-disable-next-line more/no-then - (resolve, reject) => p.then(v => v && resolve(true), reject) - ) - ); - // eslint-disable-next-line more/no-then - newPs.push(Promise.all(ps).then(() => false)); - return Promise.race(newPs); - }; - - let success; + let snode; try { // eslint-disable-next-line more/no-then - success = await firstTrue(promises); + snode = await primitives.firstTrue(promises); } catch (e) { if (e instanceof textsecure.WrongDifficultyError) { // Force nonce recalculation @@ -145,33 +124,37 @@ class LokiMessageAPI { } throw e; } - if (!success) { + if (!snode) { throw new window.textsecure.EmptySwarmError( pubKey, 'Ran out of swarm nodes to query' ); } log.info( - `loki_message:::sendMessage - Successfully stored message to ${pubKey}` + `loki_message:::sendMessage - Successfully stored message to ${pubKey} via ${ + snode.ip + }:${snode.port}` ); } async refreshSendingSwarm(pubKey, timestamp) { - const freshNodes = await lokiSnodeAPI.getFreshSwarmNodes(pubKey); - await lokiSnodeAPI.updateSwarmNodes(pubKey, freshNodes); + const freshNodes = await lokiSnodeAPI.refreshSwarmNodesForPubKey(pubKey); this.sendingData[timestamp].swarm = freshNodes; this.sendingData[timestamp].hasFreshList = true; return true; } async _openSendConnection(params) { + // timestamp is likely the current second... + while (!_.isEmpty(this.sendingData[params.timestamp].swarm)) { const snode = this.sendingData[params.timestamp].swarm.shift(); // TODO: Revert back to using snode address instead of IP const successfulSend = await this._sendToNode(snode, params); if (successfulSend) { - return true; + return snode; } + // should we mark snode as bad if it can't store our message? } if (!this.sendingData[params.timestamp].hasFreshList) { @@ -194,7 +177,13 @@ class LokiMessageAPI { async _sendToNode(targetNode, params) { let successiveFailures = 0; while (successiveFailures < MAX_ACCEPTABLE_FAILURES) { - await sleepFor(successiveFailures * 500); + // the higher this is, the longer the user delay is + // we don't want to burn through all our retries quickly + // we need to give the node a chance to heal + // also failed the user quickly, just means they pound the retry faster + // this favors a lot more retries and lower delays + // but that may chew up the bandwidth... + await primitives.sleepFor(successiveFailures * 500); try { const result = await lokiRpc( `https://${targetNode.ip}`, @@ -208,10 +197,11 @@ class LokiMessageAPI { // do not return true if we get false here... if (result === false) { + // this means the node we asked for is likely down log.warn( - `loki_message:::_sendToNode - Got false from ${targetNode.ip}:${ - targetNode.port - }` + `loki_message:::_sendToNode - Try #${successiveFailures}/${MAX_ACCEPTABLE_FAILURES} ${ + targetNode.ip + }:${targetNode.port} failed` ); successiveFailures += 1; // eslint-disable-next-line no-continue @@ -273,7 +263,8 @@ class LokiMessageAPI { return false; } - async _openRetrieveConnection(stopPollingPromise, callback) { + async _openRetrieveConnection(pSwarmPool, stopPollingPromise, callback) { + const swarmPool = pSwarmPool; // lint let stopPollingResult = false; // When message_receiver restarts from onoffline/ononline events it closes @@ -285,10 +276,10 @@ class LokiMessageAPI { stopPollingResult = result; }); - while (!stopPollingResult && !_.isEmpty(this.ourSwarmNodes)) { - const address = Object.keys(this.ourSwarmNodes)[0]; - const nodeData = this.ourSwarmNodes[address]; - delete this.ourSwarmNodes[address]; + while (!stopPollingResult && !_.isEmpty(swarmPool)) { + const address = Object.keys(swarmPool)[0]; // X.snode hostname + const nodeData = swarmPool[address]; + delete swarmPool[address]; let successiveFailures = 0; while ( !stopPollingResult && @@ -300,6 +291,7 @@ class LokiMessageAPI { // so the user facing UI can report unhandled errors // except in this case of living inside http-resource pollServer // because it just restarts more connections... + let messages = await this._retrieveNextMessages(nodeData); // this only tracks retrieval failures // won't include parsing failures... @@ -328,11 +320,13 @@ class LokiMessageAPI { if (e instanceof textsecure.WrongSwarmError) { const { newSwarm } = e; await lokiSnodeAPI.updateSwarmNodes(this.ourKey, newSwarm); + // FIXME: restart all openRetrieves when this happens... + // FIXME: lokiSnode should handle this for (let i = 0; i < newSwarm.length; i += 1) { const lastHash = await window.Signal.Data.getLastHashBySnode( newSwarm[i] ); - this.ourSwarmNodes[newSwarm[i]] = { + swarmPool[newSwarm[i]] = { lastHash, }; } @@ -348,7 +342,7 @@ class LokiMessageAPI { } // Always wait a bit as we are no longer long-polling - await sleepFor(Math.max(successiveFailures, 2) * 1000); + await primitives.sleepFor(Math.max(successiveFailures, 2) * 1000); } if (successiveFailures >= MAX_ACCEPTABLE_FAILURES) { const remainingSwarmSnodes = await lokiSnodeAPI.unreachableNode( @@ -359,15 +353,15 @@ class LokiMessageAPI { `loki_message:::_openRetrieveConnection - too many successive failures, removing ${ nodeData.ip }:${nodeData.port} from our swarm pool. We have ${ - Object.keys(this.ourSwarmNodes).length - } usable swarm nodes left (${ + Object.keys(swarmPool).length + } usable swarm nodes left for our connection (${ remainingSwarmSnodes.length } in local db)` ); } } // if not stopPollingResult - if (_.isEmpty(this.ourSwarmNodes)) { + if (_.isEmpty(swarmPool)) { log.error( 'loki_message:::_openRetrieveConnection - We no longer have any swarm nodes available to try in pool, closing retrieve connection' ); @@ -402,7 +396,7 @@ class LokiMessageAPI { if (result === false) { // make a note of it because of caller doesn't care... log.warn( - `loki_message:::_retrieveNextMessages - lokiRpc returned false to ${ + `loki_message:::_retrieveNextMessages - lokiRpc could not talk to ${ nodeData.ip }:${nodeData.port}` ); @@ -413,9 +407,10 @@ class LokiMessageAPI { // we don't throw or catch here async startLongPolling(numConnections, stopPolling, callback) { - this.ourSwarmNodes = {}; // load from local DB - let nodes = await lokiSnodeAPI.getSwarmNodesForPubKey(this.ourKey); + let nodes = await lokiSnodeAPI.getSwarmNodesForPubKey(this.ourKey, { + fetchHashes: true, + }); if (nodes.length < numConnections) { log.warn( 'loki_message:::startLongPolling - Not enough SwarmNodes for our pubkey in local database, getting current list from blockchain' @@ -436,19 +431,21 @@ class LokiMessageAPI { 'for', this.ourKey ); - Object.keys(nodes).forEach(j => { - const node = nodes[j]; - log.info(`loki_message: ${j} ${node.ip}:${node.port}`); - }); - for (let i = 0; i < nodes.length; i += 1) { - const lastHash = await window.Signal.Data.getLastHashBySnode( - nodes[i].address - ); - this.ourSwarmNodes[nodes[i].address] = { - ...nodes[i], - lastHash, - }; + // ok now split up our swarm pool into numConnections number of pools + // one for each retrieve connection + + // floor or ceil probably doesn't matter, since it's likely always uneven + const poolSize = Math.floor(nodes.length / numConnections); + const pools = []; + while (nodes.length) { + const poolList = nodes.splice(0, poolSize); + const byAddressObj = poolList.reduce((result, node) => { + // eslint-disable-next-line no-param-reassign + result[node.address] = node; + return result; + }, {}); + pools.push(byAddressObj); } const promises = []; @@ -457,14 +454,15 @@ class LokiMessageAPI { for (let i = 0; i < numConnections; i += 1) { promises.push( // eslint-disable-next-line more/no-then - this._openRetrieveConnection(stopPolling, callback).then(() => { - unresolved -= 1; - log.info( - 'loki_message:::startLongPolling - There are', - unresolved, - 'open retrieve connections left' - ); - }) + this._openRetrieveConnection(pools[i], stopPolling, callback).then( + stoppedPolling => { + unresolved -= 1; + log.info( + `loki_message:::startLongPolling - There are ${unresolved}`, + `open retrieve connections left. Stopped? ${stoppedPolling}` + ); + } + ) ); } @@ -472,7 +470,7 @@ class LokiMessageAPI { // less than numConnections being active is fine, only need to restart if none per Niels 20/02/13 // or if there is network issues (ENOUTFOUND due to lokinet) await Promise.all(promises); - log.error( + log.warn( 'loki_message:::startLongPolling - All our long poll swarm connections have been removed' ); // should we just call ourself again? diff --git a/js/modules/loki_primitives.js b/js/modules/loki_primitives.js new file mode 100644 index 000000000..48c1c0568 --- /dev/null +++ b/js/modules/loki_primitives.js @@ -0,0 +1,149 @@ +/* global clearTimeout */ +// was timeoutDelay +const sleepFor = ms => new Promise(resolve => setTimeout(resolve, ms)); + +let log; +function configure(options = {}) { + ({ log } = options); +} + +// Taken from https://stackoverflow.com/questions/51160260/clean-way-to-wait-for-first-true-returned-by-promise +// The promise returned by this function will resolve true when the first promise +// in ps resolves true *or* it will resolve false when all of ps resolve false +const firstTrue = ps => { + const newPs = ps.map( + p => + new Promise( + // eslint-disable-next-line more/no-then + (resolve, reject) => p.then(v => v && resolve(v), reject) + ) + ); + // eslint-disable-next-line more/no-then + newPs.push(Promise.all(ps).then(() => false)); + return Promise.race(newPs); +}; + +// one action resolves all +const snodeGlobalLocks = {}; +async function allowOnlyOneAtATime(name, process, timeout) { + // if currently not in progress + if (snodeGlobalLocks[name] === undefined) { + // set lock + snodeGlobalLocks[name] = new Promise(async (resolve, reject) => { + // set up timeout feature + let timeoutTimer = null; + if (timeout) { + timeoutTimer = setTimeout(() => { + log.warn( + `loki_primitives:::allowOnlyOneAtATime - TIMEDOUT after ${timeout}s` + ); + delete snodeGlobalLocks[name]; // clear lock + reject(); + }, timeout); + } + // do actual work + let innerRetVal; + try { + innerRetVal = await process(); + } catch (e) { + log.error( + `loki_primitives:::allowOnlyOneAtATime - error ${e.code} ${e.message}` + ); + // clear timeout timer + if (timeout) { + if (timeoutTimer !== null) { + clearTimeout(timeoutTimer); + timeoutTimer = null; + } + } + delete snodeGlobalLocks[name]; // clear lock + throw e; + } + // clear timeout timer + if (timeout) { + if (timeoutTimer !== null) { + clearTimeout(timeoutTimer); + timeoutTimer = null; + } + } + delete snodeGlobalLocks[name]; // clear lock + // release the kraken + resolve(innerRetVal); + }); + } + let outerRetval; + // handle any timeouts + try { + outerRetval = await snodeGlobalLocks[name]; + } catch (e) { + // we will throw for each time allowOnlyOneAtATime has been called in parallel + log.error( + 'loki_primitives:::allowOnlyOneAtATime - error', + e.code, + e.message + ); + throw e; + } + return outerRetval; +} + +function abortableIterator(array, iterator) { + let abortIteration = false; + + // for the control promise + let controlResolveFunctor; + const stopPolling = new Promise(res => { + // store resolve functor + controlResolveFunctor = res; + }); + + // eslint-disable-next-line more/no-then + stopPolling.then(() => { + abortIteration = true; + }); + + const destructableList = [...array]; + const accum = []; + + return { + start: async serially => { + let item = destructableList.pop(); + while (item && !abortIteration) { + // console.log('iterating on item', item); + if (serially) { + try { + // eslint-disable-next-line no-await-in-loop + accum.push(await iterator(item)); + } catch (e) { + log.error( + `loki_primitives:::abortableIterator - error ${e.code} ${ + e.message + }` + ); + throw e; + } + } else { + accum.push(iterator(item)); + } + item = destructableList.pop(); + } + return accum; + }, + stop: () => { + /* + log.debug('loki_primitives:::abortableIterator - Stopping', + destructableList.length, '+', accum.length, '=', array.length, + 'aborted?', abortIteration); + */ + controlResolveFunctor(); + }, + }; +} + +module.exports = { + configure, + sleepFor, + allowOnlyOneAtATime, + abortableIterator, + firstTrue, +}; diff --git a/js/modules/loki_public_chat_api.js b/js/modules/loki_public_chat_api.js index aa89828a4..7d9222876 100644 --- a/js/modules/loki_public_chat_api.js +++ b/js/modules/loki_public_chat_api.js @@ -72,7 +72,9 @@ class LokiPublicChatFactoryAPI extends EventEmitter { log.warn(`Invalid server ${serverUrl}`); return null; } - log.info(`set token ${thisServer.token} for ${serverUrl}`); + if (window.isDev) { + log.info(`set token ${thisServer.token} for ${serverUrl}`); + } this.servers.push(thisServer); } diff --git a/js/modules/loki_rpc.js b/js/modules/loki_rpc.js index 5ee2794bd..561e01db3 100644 --- a/js/modules/loki_rpc.js +++ b/js/modules/loki_rpc.js @@ -3,6 +3,7 @@ const nodeFetch = require('node-fetch'); const https = require('https'); +const primitives = require('./loki_primitives'); const snodeHttpsAgent = new https.Agent({ rejectUnauthorized: false, @@ -13,8 +14,6 @@ const endpointBase = '/storage_rpc/v1'; // Request index for debugging let onionReqIdx = 0; -const timeoutDelay = ms => new Promise(resolve => setTimeout(resolve, ms)); - const encryptForNode = async (node, payload) => { const textEncoder = new TextEncoder(); const plaintext = textEncoder.encode(payload); @@ -195,7 +194,8 @@ const sendToProxy = async (options = {}, targetNode, retryNumber = 0) => { let snodePool = await lokiSnodeAPI.getRandomSnodePool(); if (snodePool.length < 2) { - log.error( + // this is semi-normal to happen + log.info( 'lokiRpc::sendToProxy - Not enough service nodes for a proxy request, only have:', snodePool.length, 'snode, attempting refresh' @@ -277,7 +277,31 @@ const sendToProxy = async (options = {}, targetNode, retryNumber = 0) => { return sendToProxy(options, targetNode, retryNumber + 1); } + // 504 is only present in 2.0.3 and after + // relay is fine but destination is not good + if (response.status === 504) { + const pRetryNumber = retryNumber + 1; + if (pRetryNumber > 3) { + log.warn( + `lokiRpc:::sendToProxy - snode ${randSnode.ip}:${randSnode.port}`, + `can not relay to target node ${targetNode.ip}:${targetNode.port}`, + `after 3 retries` + ); + if (options.ourPubKey) { + lokiSnodeAPI.unreachableNode(options.ourPubKey, targetNode); + } + return false; + } + // we don't have to wait here + // because we're not marking the random snode bad + + // grab a fresh random one + return sendToProxy(options, targetNode, pRetryNumber); + } + // detect SNode is not ready (not in swarm; not done syncing) + // 503 can be proxy target or destination in pre 2.0.3 + // 2.0.3 and after means target if (response.status === 503 || response.status === 500) { // this doesn't mean the random node is bad, it could be the target node // but we got a ton of randomPool nodes, let's just not worry about this one @@ -315,7 +339,7 @@ const sendToProxy = async (options = {}, targetNode, retryNumber = 0) => { // 500 burns through a node too fast, // let's slow the retry to give it more time to recover if (response.status === 500) { - await timeoutDelay(5000); + await primitives.sleepFor(5000); } return sendToProxy(options, targetNode, pRetryNumber); } @@ -387,10 +411,17 @@ const sendToProxy = async (options = {}, targetNode, retryNumber = 0) => { // emulate nodeFetch response... jsonRes.json = () => { try { + if (jsonRes.body === 'Timestamp error: check your clock') { + log.error( + `lokiRpc:::sendToProxy - Timestamp error: check your clock`, + Date.now() + ); + return false; + } return JSON.parse(jsonRes.body); } catch (e) { log.error( - 'lokiRpc:::sendToProxy - parse error', + 'lokiRpc:::sendToProxy - (inner) parse error', e.code, e.message, `from ${randSnode.ip}:${randSnode.port} json:`, @@ -400,7 +431,7 @@ const sendToProxy = async (options = {}, targetNode, retryNumber = 0) => { return false; }; if (retryNumber) { - log.info( + log.debug( `lokiRpc:::sendToProxy - request succeeded,`, `snode ${randSnode.ip}:${randSnode.port} to ${targetNode.ip}:${ targetNode.port @@ -411,7 +442,7 @@ const sendToProxy = async (options = {}, targetNode, retryNumber = 0) => { return jsonRes; } catch (e) { log.error( - 'lokiRpc:::sendToProxy - parse error', + 'lokiRpc:::sendToProxy - (outer) parse error', e.code, e.message, `from ${randSnode.ip}:${randSnode.port} json:`, @@ -475,12 +506,19 @@ const lokiFetch = async (url, options = {}, targetNode = null) => { const result = await sendToProxy(fetchOptions, targetNode); if (result === false) { // should we retry? - log.warn(`lokiRpc:::lokiFetch - sendToProxy returned false`); + + // even though we can't be sure our caller is going to log or handle the failure + // we do know that sendToProxy should be logging + // so I don't think we need or want a log item here... + // log.warn(`lokiRpc:::lokiFetch - sendToProxy failed`); + // one case is: // snodePool didn't have enough // even after a refresh // likely a network disconnect? - // but not all cases... + // another is: + // failure to send to target node after 3 retries + // what else? /* log.warn( 'lokiRpc:::lokiFetch - useSnodeProxy failure, could not refresh randomPool, offline?' @@ -498,7 +536,7 @@ const lokiFetch = async (url, options = {}, targetNode = null) => { fetchOptions.agent = snodeHttpsAgent; process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0'; } else { - log.info('lokirpc:::lokiFetch - http communication', url); + log.debug('lokirpc:::lokiFetch - http communication', url); } const response = await nodeFetch(url, fetchOptions); // restore TLS checking diff --git a/js/modules/loki_snode_api.js b/js/modules/loki_snode_api.js index fe7457037..dfb9d94fc 100644 --- a/js/modules/loki_snode_api.js +++ b/js/modules/loki_snode_api.js @@ -1,8 +1,11 @@ /* eslint-disable class-methods-use-this */ -/* global window, textsecure, ConversationController, _, log, clearTimeout, process, Buffer, StringView, dcodeIO */ +/* global window, textsecure, ConversationController, _, log, process, Buffer, StringView, dcodeIO */ -const is = require('@sindresorhus/is'); const { lokiRpc } = require('./loki_rpc'); +// not sure I like this name but it's been than util +const primitives = require('./loki_primitives'); + +const is = require('@sindresorhus/is'); const https = require('https'); const nodeFetch = require('node-fetch'); const semver = require('semver'); @@ -13,8 +16,94 @@ const snodeHttpsAgent = new https.Agent({ const RANDOM_SNODES_TO_USE_FOR_PUBKEY_SWARM = 3; const SEED_NODE_RETRIES = 3; +const SNODE_VERSION_RETRIES = 3; + +const compareSnodes = (current, search) => + current.pubkey_ed25519 === search.pubkey_ed25519; + +// just get the filtered list +async function tryGetSnodeListFromLokidSeednode( + seedNodes = [...window.seedNodeList] +) { + // Removed limit until there is a way to get snode info + // for individual nodes (needed for guard nodes); this way + // we get all active nodes + const params = { + active_only: true, + fields: { + public_ip: true, + storage_port: true, + pubkey_x25519: true, + pubkey_ed25519: true, + }, + }; + // FIXME: use sample + const seedNode = seedNodes.splice( + Math.floor(Math.random() * seedNodes.length), + 1 + )[0]; + let snodes = []; + try { + const response = await lokiRpc( + `http://${seedNode.ip}`, + seedNode.port, + 'get_n_service_nodes', + params, + {}, // Options + '/json_rpc' // Seed request endpoint + ); + // Filter 0.0.0.0 nodes which haven't submitted uptime proofs + snodes = response.result.service_node_states.filter( + snode => snode.public_ip !== '0.0.0.0' + ); + // throw before clearing the lock, so the retries can kick in + if (snodes.length === 0) { + // does this error message need to be exactly this? + throw new window.textsecure.SeedNodeError('Failed to contact seed node'); + } + return snodes; + } catch (e) { + log.warn( + 'loki_snodes:::tryGetSnodeListFromLokidSeednode - error', + e.code, + e.message + ); + if (snodes.length === 0) { + throw new window.textsecure.SeedNodeError('Failed to contact seed node'); + } + } + return []; +} -const timeoutDelay = ms => new Promise(resolve => setTimeout(resolve, ms)); +async function getSnodeListFromLokidSeednode( + seedNodes = [...window.seedNodeList], + retries = 0 +) { + let snodes = []; + try { + snodes = await tryGetSnodeListFromLokidSeednode(seedNodes); + } catch (e) { + log.warn( + 'loki_snodes:::getSnodeListFromLokidSeednode - error', + e.code, + e.message + ); + // handle retries in case of temporary hiccups + if (retries < SEED_NODE_RETRIES) { + setTimeout(() => { + log.info( + 'loki_snodes:::refreshRandomPoolPromise - Retrying initialising random snode pool, try #', + retries + ); + getSnodeListFromLokidSeednode(seedNodes, retries + 1); + }, retries * retries * 5000); + } else { + log.error('loki_snodes:::getSnodeListFromLokidSeednode - failing'); + throw new window.textsecure.SeedNodeError('Failed to contact seed node'); + } + } + return snodes; +} class LokiSnodeAPI { constructor({ serverUrl, localUrl }) { @@ -25,10 +114,7 @@ class LokiSnodeAPI { this.localUrl = localUrl; // localhost.loki this.randomSnodePool = []; this.swarmsPendingReplenish = {}; - this.refreshRandomPoolPromise = false; - this.versionPools = {}; - this.versionMap = {}; // reverse version look up - this.versionsRetrieved = false; // to mark when it's done getting versions + this.stopGetAllVersionPromiseControl = false; this.onionPaths = []; this.guardNodes = []; @@ -36,7 +122,12 @@ class LokiSnodeAPI { async getRandomSnodePool() { if (this.randomSnodePool.length === 0) { - await this.refreshRandomPool(); + // allow exceptions to pass through upwards without the unhandled promise rejection + try { + await this.refreshRandomPool(); + } catch (e) { + throw e; + } } return this.randomSnodePool; } @@ -97,8 +188,8 @@ class LokiSnodeAPI { async selectGuardNodes() { const _ = window.Lodash; + // FIXME: handle rejections let nodePool = await this.getRandomSnodePool(); - if (nodePool.length === 0) { log.error(`Could not select guarn nodes: node pool is empty`); return []; @@ -127,7 +218,6 @@ class LokiSnodeAPI { return []; } } - // The use of await inside while is intentional: // we only want to repeat if the await fails // eslint-disable-next-line-no-await-in-loop @@ -274,13 +364,17 @@ class LokiSnodeAPI { } async getRandomSnodeAddress() { - /* resolve random snode */ - if (this.randomSnodePool.length === 0) { - // allow exceptions to pass through upwards - await this.refreshRandomPool(); - } + // resolve random snode if (this.randomSnodePool.length === 0) { - throw new window.textsecure.SeedNodeError('Invalid seed node response'); + // allow exceptions to pass through upwards without the unhandled promise rejection + try { + await this.refreshRandomPool(); + } catch (e) { + throw e; + } + if (this.randomSnodePool.length === 0) { + throw new window.textsecure.SeedNodeError('Invalid seed node response'); + } } // FIXME: _.sample? return this.randomSnodePool[ @@ -288,43 +382,51 @@ class LokiSnodeAPI { ]; } - async getNodesMinVersion(minVersion) { - const _ = window.Lodash; - - return _.flatten( - _.entries(this.versionPools) - .filter(v => semver.gte(v[0], minVersion)) - .map(v => v[1]) + // not cacheable because we write to this.randomSnodePool elsewhere + getNodesMinVersion(minVersion) { + return this.randomSnodePool.filter( + node => node.version && semver.gt(node.version, minVersion) ); } // use nodes that support more than 1mb async getRandomProxySnodeAddress() { - /* resolve random snode */ + // resolve random snode if (this.randomSnodePool.length === 0) { - // allow exceptions to pass through upwards - await this.refreshRandomPool(); - } - if (this.randomSnodePool.length === 0) { - throw new window.textsecure.SeedNodeError('Invalid seed node response'); + // allow exceptions to pass through upwards without the unhandled promise rejection + try { + await this.refreshRandomPool(); + } catch (e) { + log.error( + `loki_snode:::getRandomProxySnodeAddress - error ${e.code} ${ + e.message + }` + ); + throw e; + } + if (this.randomSnodePool.length === 0) { + throw new window.textsecure.SeedNodeError('Invalid seed node response'); + } } - const goodVersions = Object.keys(this.versionPools).filter(version => - semver.gt(version, '2.0.1') - ); - if (!goodVersions.length) { + const goodPool = this.getNodesMinVersion('2.0.1'); + if (!goodPool.length) { + // FIXME: retry + log.warn( + `loki_snode:::getRandomProxySnodeAddress - no good versions yet` + ); return false; } // FIXME: _.sample? - const goodVersion = - goodVersions[Math.floor(Math.random() * goodVersions.length)]; - const pool = this.versionPools[goodVersion]; - // FIXME: _.sample? - return pool[Math.floor(Math.random() * pool.length)]; + const goodRandomNode = + goodPool[Math.floor(Math.random() * goodPool.length)]; + return goodRandomNode; } // WARNING: this leaks our IP to all snodes but with no other identifying information - // except that a client started up or ran out of random pool snodes - async getVersion(node) { + // except "that a client started up" or "ran out of random pool snodes" + // and the order of the list is randomized, so a snode can't tell if it just started or not + async _getVersion(node, options = {}) { + const retries = options.retries || 0; try { process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0'; const result = await nodeFetch( @@ -334,191 +436,159 @@ class LokiSnodeAPI { process.env.NODE_TLS_REJECT_UNAUTHORIZED = '1'; const data = await result.json(); if (data.version) { - if (this.versionPools[data.version] === undefined) { - this.versionPools[data.version] = [node]; + const foundNodeIdx = this.randomSnodePool.findIndex(n => + compareSnodes(n, node) + ); + if (foundNodeIdx !== -1) { + this.randomSnodePool[foundNodeIdx].version = data.version; } else { - this.versionPools[data.version].push(node); + // maybe already marked bad... + log.debug( + `loki_snode:::_getVersion - can't find ${node.ip}:${ + node.port + } in randomSnodePool` + ); } - // set up reverse mapping for removal lookup - this.versionMap[`${node.ip}:${node.port}`] = data.version; } + return data.version; } catch (e) { // ECONNREFUSED likely means it's just offline... // ECONNRESET seems to retry and fail as ECONNREFUSED (so likely a node going offline) // ETIMEDOUT not sure what to do about these // retry for now but maybe we should be marking bad... if (e.code === 'ECONNREFUSED') { - this.markRandomNodeUnreachable(node, { versionPoolFailure: true }); + this.markRandomNodeUnreachable(node); const randomNodesLeft = this.getRandomPoolLength(); // clean up these error messages to be a little neater log.warn( - `loki_snode:::getVersion - ${node.ip}:${ + `loki_snode:::_getVersion - ${node.ip}:${ node.port } is offline, removing, leaving ${randomNodesLeft} in the randomPool` ); - } else { - // mostly ECONNRESETs + // if not ECONNREFUSED, it's mostly ECONNRESETs // ENOTFOUND could mean no internet or hiccup + } else if (retries < SNODE_VERSION_RETRIES) { log.warn( - 'loki_snode:::getVersion - Error', + 'loki_snode:::_getVersion - Error', e.code, e.message, `on ${node.ip}:${node.port} retrying in 1s` ); - await timeoutDelay(1000); - await this.getVersion(node); + await primitives.sleepFor(1000); + return this._getVersion(node, { ...options, retries: retries + 1 }); + } else { + this.markRandomNodeUnreachable(node); + const randomNodesLeft = this.getRandomPoolLength(); + log.warn( + `loki_snode:::_getVersion - failing to get version for ${node.ip}:${ + node.port + }, removing, leaving ${randomNodesLeft} in the randomPool` + ); } + // maybe throw? + return false; } } - async refreshRandomPool(seedNodes = [...window.seedNodeList]) { - // if currently not in progress - if (this.refreshRandomPoolPromise === false) { - // set lock - this.refreshRandomPoolPromise = new Promise(async (resolve, reject) => { - let timeoutTimer = null; - // private retry container - const trySeedNode = async (consecutiveErrors = 0) => { - // Removed limit until there is a way to get snode info - // for individual nodes (needed for guard nodes); this way - // we get all active nodes - const params = { - active_only: true, - fields: { - public_ip: true, - storage_port: true, - pubkey_x25519: true, - pubkey_ed25519: true, - }, - }; - const seedNode = seedNodes.splice( - Math.floor(Math.random() * seedNodes.length), - 1 - )[0]; - let snodes = []; - try { - log.info( - 'loki_snodes:::refreshRandomPoolPromise - Refreshing random snode pool' - ); - const response = await lokiRpc( - `http://${seedNode.ip}`, - seedNode.port, - 'get_n_service_nodes', - params, - {}, // Options - '/json_rpc' // Seed request endpoint - ); - // Filter 0.0.0.0 nodes which haven't submitted uptime proofs - snodes = response.result.service_node_states.filter( - snode => snode.public_ip !== '0.0.0.0' - ); - // make sure order of the list is random, so we get version in a non-deterministic way - snodes = _.shuffle(snodes); - // commit changes to be live - // we'll update the version (in case they upgrade) every cycle - this.versionPools = {}; - this.versionsRetrieved = false; - this.randomSnodePool = snodes.map(snode => ({ - ip: snode.public_ip, - port: snode.storage_port, - pubkey_x25519: snode.pubkey_x25519, - pubkey_ed25519: snode.pubkey_ed25519, - })); - log.info( - 'loki_snodes:::refreshRandomPoolPromise - Refreshed random snode pool with', - this.randomSnodePool.length, - 'snodes' - ); - // clear lock - this.refreshRandomPoolPromise = null; - if (timeoutTimer !== null) { - clearTimeout(timeoutTimer); - timeoutTimer = null; - } - // start polling versions - resolve(); - // now get version for all snodes - // also acts an early online test/purge of bad nodes - let c = 0; - const verionStart = Date.now(); - const t = this.randomSnodePool.length; - const noticeEvery = parseInt(t / 10, 10); - // eslint-disable-next-line no-restricted-syntax - for (const node of this.randomSnodePool) { - c += 1; - // eslint-disable-next-line no-await-in-loop - await this.getVersion(node); - if (c % noticeEvery === 0) { - // give stats - const diff = Date.now() - verionStart; - log.info( - `${c}/${t} pool version status update, has taken ${diff.toLocaleString()}ms` - ); - Object.keys(this.versionPools).forEach(version => { - const nodes = this.versionPools[version].length; - log.info( - `version ${version} has ${nodes.toLocaleString()} snodes` - ); - }); - } - } - log.info('Versions retrieved from network!'); - this.versionsRetrieved = true; - } catch (e) { - log.warn( - 'loki_snodes:::refreshRandomPoolPromise - error', - e.code, - e.message + // now get version for all snodes + // also acts an early online test/purge of bad nodes + async _getAllVerionsForRandomSnodePool() { + // let count = 0; + // const verionStart = Date.now(); + // const total = this.randomSnodePool.length; + // const noticeEvery = parseInt(total / 10, 10); + const loop = primitives.abortableIterator( + this.randomSnodePool, + async node => { + // count += 1; + try { + await this._getVersion(node); + /* + if (count % noticeEvery === 0) { + // give stats + const diff = Date.now() - verionStart; + log.debug( + `loki_snode:::_getAllVerionsForRandomSnodePool - ${count}/${total} pool version status update, has taken ${diff.toLocaleString()}ms` ); - if (consecutiveErrors < SEED_NODE_RETRIES) { - // retry after a possible delay - setTimeout(() => { - log.info( - 'loki_snodes:::refreshRandomPoolPromise - Retrying initialising random snode pool, try #', - consecutiveErrors - ); - trySeedNode(consecutiveErrors + 1); - }, consecutiveErrors * consecutiveErrors * 5000); - } else { - log.error( - 'loki_snodes:::refreshRandomPoolPromise - Giving up trying to contact seed node' + Object.keys(this.versionPools).forEach(version => { + const nodes = this.versionPools[version].length; + log.debug( + `loki_snode:::_getAllVerionsForRandomSnodePool - version ${version} has ${nodes.toLocaleString()} snodes` ); - if (snodes.length === 0) { - this.refreshRandomPoolPromise = null; // clear lock - if (timeoutTimer !== null) { - clearTimeout(timeoutTimer); - timeoutTimer = null; - } - reject(); - } - } + }); } - }; - const delay = (SEED_NODE_RETRIES + 1) * (SEED_NODE_RETRIES + 1) * 5000; - timeoutTimer = setTimeout(() => { - log.warn( - 'loki_snodes:::refreshRandomPoolPromise - TIMEDOUT after', - delay, - 's' + */ + } catch (e) { + log.error( + `loki_snode:::_getAllVerionsForRandomSnodePool - error`, + e.code, + e.message ); - reject(); - }, delay); - trySeedNode(); - }); - } - try { - await this.refreshRandomPoolPromise; - } catch (e) { - // we will throw for each time initialiseRandomPool has been called in parallel - log.error( - 'loki_snodes:::refreshRandomPoolPromise - error', - e.code, - e.message - ); - throw new window.textsecure.SeedNodeError('Failed to contact seed node'); - } - log.info('loki_snodes:::refreshRandomPoolPromise - RESOLVED'); - delete this.refreshRandomPoolPromise; // clear any lock + throw e; + } + } + ); + // make abortable accessible outside this scope + this.stopGetAllVersionPromiseControl = loop.stop; + await loop.start(true); + this.stopGetAllVersionPromiseControl = false; // clear lock + // an array of objects + const versions = this.randomSnodePool.reduce((curVal, node) => { + if (curVal.indexOf(node.version) === -1) { + curVal.push(node.version); + } + return curVal; + }, []); + log.debug( + `loki_snode:::_getAllVerionsForRandomSnodePool - ${ + versions.length + } versions retrieved from network!:`, + versions.join(',') + ); + } + + async refreshRandomPool(seedNodes = [...window.seedNodeList]) { + return primitives.allowOnlyOneAtATime('refreshRandomPool', async () => { + // are we running any _getAllVerionsForRandomSnodePool + if (this.stopGetAllVersionPromiseControl !== false) { + // we are, stop them + this.stopGetAllVersionPromiseControl(); + } + let snodes = []; + try { + snodes = await getSnodeListFromLokidSeednode(seedNodes); + // make sure order of the list is random, so we get version in a non-deterministic way + snodes = _.shuffle(snodes); + // commit changes to be live + // we'll update the version (in case they upgrade) every cycle + this.randomSnodePool = snodes.map(snode => ({ + ip: snode.public_ip, + port: snode.storage_port, + pubkey_x25519: snode.pubkey_x25519, + pubkey_ed25519: snode.pubkey_ed25519, + })); + log.info( + 'loki_snodes:::refreshRandomPool - Refreshed random snode pool with', + this.randomSnodePool.length, + 'snodes' + ); + // start polling versions but no need to await it + this._getAllVerionsForRandomSnodePool(); + } catch (e) { + log.warn('loki_snodes:::refreshRandomPool - error', e.code, e.message); + /* + log.error( + 'loki_snodes:::refreshRandomPoolPromise - Giving up trying to contact seed node' + ); + */ + if (snodes.length === 0) { + throw new window.textsecure.SeedNodeError( + 'Failed to contact seed node' + ); + } + } + return this.randomSnodePool; + }); } // unreachableNode.url is like 9hrje1bymy7hu6nmtjme9idyu3rm8gr3mkstakjyuw1997t7w4ny.snode @@ -536,8 +606,7 @@ class LokiSnodeAPI { // keep all but thisNode const thisNode = node.address === unreachableNode.address && - node.ip === unreachableNode.ip && - node.port === unreachableNode.port; + compareSnodes(unreachableNode, node); if (thisNode) { found = true; } @@ -550,59 +619,57 @@ class LokiSnodeAPI { } has already been marked as bad` ); } - await conversation.updateSwarmNodes(filteredNodes); + try { + await conversation.updateSwarmNodes(filteredNodes); + } catch (e) { + log.error(`loki_snodes:::unreachableNode - error ${e.code} ${e.message}`); + throw e; + } return filteredNodes; } - markRandomNodeUnreachable(snode, options = {}) { - // avoid retries when we can't get the version because they're offline - if (!options.versionPoolFailure) { - const snodeVersion = this.versionMap[`${snode.ip}:${snode.port}`]; - if (this.versionPools[snodeVersion]) { - this.versionPools[snodeVersion] = _.without( - this.versionPools[snodeVersion], - snode - ); - } else { - if (snodeVersion) { - // reverse map (versionMap) is out of sync with versionPools - log.error( - 'loki_snode:::markRandomNodeUnreachable - No snodes for version', - snodeVersion, - 'retrying in 10s' - ); - } else { - // we don't know our version yet - // and if we're offline, we'll likely not get it until it restarts if it does... - log.warn( - 'loki_snode:::markRandomNodeUnreachable - No version for snode', - `${snode.ip}:${snode.port}`, - 'retrying in 10s' - ); - } - // make sure we don't retry past 15 mins (10s * 100 ~ 1000s) - const retries = options.retries || 0; - if (retries < 100) { - setTimeout(() => { - this.markRandomNodeUnreachable(snode, { - ...options, - retries: retries + 1, - }); - }, 10000); - } - } - } + markRandomNodeUnreachable(snode) { this.randomSnodePool = _.without(this.randomSnodePool, snode); } - async updateLastHash(snode, hash, expiresAt) { - await window.Signal.Data.updateLastHash({ snode, hash, expiresAt }); + async updateLastHash(snodeAddress, hash, expiresAt) { + // FIXME: handle rejections + await window.Signal.Data.updateLastHash({ + snode: snodeAddress, + hash, + expiresAt, + }); } - getSwarmNodesForPubKey(pubKey) { + // called by loki_message:::sendMessage & loki_message:::startLongPolling + async getSwarmNodesForPubKey(pubKey, options = {}) { + const { fetchHashes } = options; try { const conversation = ConversationController.get(pubKey); const swarmNodes = [...conversation.get('swarmNodes')]; + + // always? include lashHash + if (fetchHashes) { + await Promise.all( + Object.keys(swarmNodes).map(async j => { + const node = swarmNodes[j]; + // FIXME make a batch function call + const lastHash = await window.Signal.Data.getLastHashBySnode( + node.address + ); + log.debug( + `loki_snode:::getSwarmNodesForPubKey - ${j} ${node.ip}:${ + node.port + }` + ); + swarmNodes[j] = { + ...node, + lastHash, + }; + }) + ); + } + return swarmNodes; } catch (e) { throw new window.textsecure.ReplayableError({ @@ -618,39 +685,40 @@ class LokiSnodeAPI { await conversation.updateSwarmNodes(filteredNodes); return filteredNodes; } catch (e) { + log.error( + `loki_snodes:::updateSwarmNodes - error ${e.code} ${e.message}` + ); throw new window.textsecure.ReplayableError({ message: 'Could not get conversation', }); } } + // FIXME: in it's own PR, reorder functions: put _getFreshSwarmNodes and it's callee + // only loki_message::startLongPolling calls this... async refreshSwarmNodesForPubKey(pubKey) { - const newNodes = await this.getFreshSwarmNodes(pubKey); + // FIXME: handle rejections + const newNodes = await this._getFreshSwarmNodes(pubKey); const filteredNodes = this.updateSwarmNodes(pubKey, newNodes); return filteredNodes; } - async getFreshSwarmNodes(pubKey) { - if (!(pubKey in this.swarmsPendingReplenish)) { - this.swarmsPendingReplenish[pubKey] = new Promise(async resolve => { - let newSwarmNodes; - try { - newSwarmNodes = await this.getSwarmNodes(pubKey); - } catch (e) { - log.error( - 'loki_snodes:::getFreshSwarmNodes - error', - e.code, - e.message - ); - // TODO: Handle these errors sensibly - newSwarmNodes = []; - } - resolve(newSwarmNodes); - }); - } - const newSwarmNodes = await this.swarmsPendingReplenish[pubKey]; - delete this.swarmsPendingReplenish[pubKey]; - return newSwarmNodes; + async _getFreshSwarmNodes(pubKey) { + return primitives.allowOnlyOneAtATime(`swarmRefresh${pubKey}`, async () => { + let newSwarmNodes = []; + try { + newSwarmNodes = await this._getSwarmNodes(pubKey); + } catch (e) { + log.error( + 'loki_snodes:::_getFreshSwarmNodes - error', + e.code, + e.message + ); + // TODO: Handle these errors sensibly + newSwarmNodes = []; + } + return newSwarmNodes; + }); } // helper function @@ -687,8 +755,9 @@ class LokiSnodeAPI { const nameHash = dcodeIO.ByteBuffer.wrap(output).toString('base64'); // Get nodes capable of doing LNS - let lnsNodes = await this.getNodesMinVersion('2.0.3'); - lnsNodes = _.shuffle(lnsNodes); + const lnsNodes = this.getNodesMinVersion('2.0.3'); + // randomPool should already be shuffled + // lnsNodes = _.shuffle(lnsNodes); // Loop until 3 confirmations @@ -745,8 +814,11 @@ class LokiSnodeAPI { return pubkey; } - async getSnodesForPubkey(snode, pubKey) { + // get snodes for pubkey from random snode + async _getSnodesForPubkey(pubKey) { + let snode = {}; try { + snode = await this.getRandomSnodeAddress(); const result = await lokiRpc( `https://${snode.ip}`, snode.port, @@ -760,7 +832,7 @@ class LokiSnodeAPI { ); if (!result) { log.warn( - `loki_snode:::getSnodesForPubkey - lokiRpc on ${snode.ip}:${ + `loki_snode:::_getSnodesForPubkey - lokiRpc on ${snode.ip}:${ snode.port } returned falsish value`, result @@ -770,7 +842,7 @@ class LokiSnodeAPI { if (!result.snodes) { // we hit this when snode gives 500s log.warn( - `loki_snode:::getSnodesForPubkey - lokiRpc on ${snode.ip}:${ + `loki_snode:::_getSnodesForPubkey - lokiRpc on ${snode.ip}:${ snode.port } returned falsish value for snodes`, result @@ -783,7 +855,7 @@ class LokiSnodeAPI { this.markRandomNodeUnreachable(snode); const randomPoolRemainingCount = this.getRandomPoolLength(); log.error( - 'loki_snodes:::getSnodesForPubkey - error', + 'loki_snodes:::_getSnodesForPubkey - error', e.code, e.message, `for ${snode.ip}:${ @@ -794,19 +866,17 @@ class LokiSnodeAPI { } } - async getSwarmNodes(pubKey) { + async _getSwarmNodes(pubKey) { const snodes = []; + // creates a range: [0, 1, 2] const questions = [...Array(RANDOM_SNODES_TO_USE_FOR_PUBKEY_SWARM).keys()]; + // FIXME: handle rejections await Promise.all( questions.map(async () => { // allow exceptions to pass through upwards - const rSnode = await this.getRandomSnodeAddress(); - const resList = await this.getSnodesForPubkey(rSnode, pubKey); - // should we only activate entries that are in all results? + const resList = await this._getSnodesForPubkey(pubKey); resList.map(item => { - const hasItem = snodes.some( - hItem => item.ip === hItem.ip && item.port === hItem.port - ); + const hasItem = snodes.some(n => compareSnodes(n, item)); if (!hasItem) { snodes.push(item); } @@ -814,6 +884,7 @@ class LokiSnodeAPI { }); }) ); + // should we only activate entries that are in all results? return snodes; } }