Merge pull request #1061 from neuroscr/patchopensnapps

snode communication refactor part 4
pull/1086/head
Ryan Tharp 6 years ago committed by GitHub
commit 167ae0e0bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -42,7 +42,7 @@ const sendToProxy = async (
) => {
if (!srvPubKey) {
log.error(
'loki_app_dot_net: sendToProxy called without a server public key'
'loki_app_dot_net:::sendToProxy - called without a server public key'
);
return {};
}
@ -145,7 +145,7 @@ const sendToProxy = async (
randSnode
);
log.warn(
`loki_app_dot_net: Marking random snode bad, internet address ${
`loki_app_dot_net:::sendToProxy - Marking random snode bad, internet address ${
randSnode.ip
}:${
randSnode.port
@ -161,7 +161,7 @@ const sendToProxy = async (
response = JSON.parse(txtResponse);
} catch (e) {
log.warn(
`loki_app_dot_net: sendToProxy Could not parse outer JSON [${txtResponse}]`,
`loki_app_dot_net:::sendToProxy - Could not parse outer JSON [${txtResponse}]`,
endpoint,
'on',
url
@ -185,7 +185,7 @@ const sendToProxy = async (
response = options.textResponse ? respStr : JSON.parse(respStr);
} catch (e) {
log.warn(
`loki_app_dot_net: sendToProxy Could not parse inner JSON [${respStr}]`,
`loki_app_dot_net:::sendToProxy - Could not parse inner JSON [${respStr}]`,
endpoint,
'on',
url
@ -193,7 +193,7 @@ const sendToProxy = async (
}
} else {
log.warn(
'loki_app_dot_net: file server secure_rpc gave an non-200 response: ',
'loki_app_dot_net:::sendToProxy - file server secure_rpc gave an non-200 response: ',
response,
` txtResponse[${txtResponse}]`,
endpoint
@ -239,7 +239,11 @@ const serverRequest = async (endpoint, options = {}) => {
fetchOptions.agent = snodeHttpsAgent;
}
} catch (e) {
log.info('serverRequest set up error:', e.code, e.message);
log.error(
'loki_app_dot_net:::serverRequest - set up error:',
e.code,
e.message
);
return {
err: e,
};
@ -257,10 +261,10 @@ const serverRequest = async (endpoint, options = {}) => {
FILESERVER_HOSTS.includes(host)
) {
mode = 'sendToProxy';
const search = url.search ? `?${url.search}` : '';
// url.search automatically includes the ? part
const search = url.search || '';
// strip first slash
const endpointWithQS = `${url.pathname}${search}`.replace(/^\//, '');
// log.info('endpointWithQS', endpointWithQS)
({ response, txtResponse, result } = await sendToProxy(
srvPubKey,
endpointWithQS,
@ -278,11 +282,16 @@ const serverRequest = async (endpoint, options = {}) => {
txtResponse = await result.text();
// cloudflare timeouts (504s) will be html...
response = options.textResponse ? txtResponse : JSON.parse(txtResponse);
// result.status will always be 200
// emulate the correct http code if available
if (response && response.meta && response.meta.code) {
result.status = response.meta.code;
}
}
} catch (e) {
if (txtResponse) {
log.info(
`serverRequest ${mode} error`,
log.error(
`loki_app_dot_net:::serverRequest - ${mode} error`,
e.code,
e.message,
`json: ${txtResponse}`,
@ -290,8 +299,8 @@ const serverRequest = async (endpoint, options = {}) => {
url
);
} else {
log.info(
`serverRequest ${mode} error`,
log.error(
`loki_app_dot_net:::serverRequest - ${mode} error`,
e.code,
e.message,
'attempting connection to',
@ -1609,12 +1618,12 @@ class LokiPublicChannelAPI {
if (res.err || !res.response) {
log.error(
'Could not get messages from',
`app_dot_net:::pollOnceForMessages - Could not get messages from`,
this.serverAPI.baseServerUrl,
this.baseChannelUrl
);
if (res.err) {
log.error('pollOnceForMessages receive error', res.err);
log.error(`app_dot_net:::pollOnceForMessages - receive error`, res.err);
}
this.messagesPollLock = false;
return;

@ -110,7 +110,7 @@ class LokiFileServerInstance {
return;
}
const validAuthorisations = authorisations.filter(
a => a && typeof auth === 'object'
a => a && typeof a === 'object'
);
await Promise.all(
validAuthorisations.map(async auth => {
@ -231,7 +231,7 @@ class LokiFileServerInstance {
Object.keys(newSlavePrimaryMap).forEach(slaveKey => {
if (newSlavePrimaryMap[slaveKey] === primaryPubKey) {
log.warn(
`removing unverifible ${slaveKey} to ${primaryPubKey} mapping`
`removing unverifiable ${slaveKey} to ${primaryPubKey} mapping`
);
delete newSlavePrimaryMap[slaveKey];
}

@ -4,15 +4,10 @@
const _ = require('lodash');
const { lokiRpc } = require('./loki_rpc');
const primitives = require('./loki_primitives');
const DEFAULT_CONNECTIONS = 3;
const MAX_ACCEPTABLE_FAILURES = 1;
function sleepFor(time) {
return new Promise(resolve => {
setTimeout(() => resolve(), time);
});
}
const MAX_ACCEPTABLE_FAILURES = 10;
const filterIncomingMessages = async messages => {
const incomingHashes = messages.map(m => m.hash);
@ -113,26 +108,10 @@ class LokiMessageAPI {
promises.push(connectionPromise);
}
// Taken from https://stackoverflow.com/questions/51160260/clean-way-to-wait-for-first-true-returned-by-promise
// The promise returned by this function will resolve true when the first promise
// in ps resolves true *or* it will resolve false when all of ps resolve false
const firstTrue = ps => {
const newPs = ps.map(
p =>
new Promise(
// eslint-disable-next-line more/no-then
(resolve, reject) => p.then(v => v && resolve(true), reject)
)
);
// eslint-disable-next-line more/no-then
newPs.push(Promise.all(ps).then(() => false));
return Promise.race(newPs);
};
let success;
let snode;
try {
// eslint-disable-next-line more/no-then
success = await firstTrue(promises);
snode = await primitives.firstTrue(promises);
} catch (e) {
if (e instanceof textsecure.WrongDifficultyError) {
// Force nonce recalculation
@ -145,33 +124,37 @@ class LokiMessageAPI {
}
throw e;
}
if (!success) {
if (!snode) {
throw new window.textsecure.EmptySwarmError(
pubKey,
'Ran out of swarm nodes to query'
);
}
log.info(
`loki_message:::sendMessage - Successfully stored message to ${pubKey}`
`loki_message:::sendMessage - Successfully stored message to ${pubKey} via ${
snode.ip
}:${snode.port}`
);
}
async refreshSendingSwarm(pubKey, timestamp) {
const freshNodes = await lokiSnodeAPI.getFreshSwarmNodes(pubKey);
await lokiSnodeAPI.updateSwarmNodes(pubKey, freshNodes);
const freshNodes = await lokiSnodeAPI.refreshSwarmNodesForPubKey(pubKey);
this.sendingData[timestamp].swarm = freshNodes;
this.sendingData[timestamp].hasFreshList = true;
return true;
}
async _openSendConnection(params) {
// timestamp is likely the current second...
while (!_.isEmpty(this.sendingData[params.timestamp].swarm)) {
const snode = this.sendingData[params.timestamp].swarm.shift();
// TODO: Revert back to using snode address instead of IP
const successfulSend = await this._sendToNode(snode, params);
if (successfulSend) {
return true;
return snode;
}
// should we mark snode as bad if it can't store our message?
}
if (!this.sendingData[params.timestamp].hasFreshList) {
@ -194,7 +177,13 @@ class LokiMessageAPI {
async _sendToNode(targetNode, params) {
let successiveFailures = 0;
while (successiveFailures < MAX_ACCEPTABLE_FAILURES) {
await sleepFor(successiveFailures * 500);
// the higher this is, the longer the user delay is
// we don't want to burn through all our retries quickly
// we need to give the node a chance to heal
// also, failing the user quickly just means they pound the retry faster
// this favors a lot more retries and lower delays
// but that may chew up the bandwidth...
await primitives.sleepFor(successiveFailures * 500);
try {
const result = await lokiRpc(
`https://${targetNode.ip}`,
@ -208,10 +197,11 @@ class LokiMessageAPI {
// do not return true if we get false here...
if (result === false) {
// this means the node we asked for is likely down
log.warn(
`loki_message:::_sendToNode - Got false from ${targetNode.ip}:${
targetNode.port
}`
`loki_message:::_sendToNode - Try #${successiveFailures}/${MAX_ACCEPTABLE_FAILURES} ${
targetNode.ip
}:${targetNode.port} failed`
);
successiveFailures += 1;
// eslint-disable-next-line no-continue
@ -273,7 +263,8 @@ class LokiMessageAPI {
return false;
}
async _openRetrieveConnection(stopPollingPromise, callback) {
async _openRetrieveConnection(pSwarmPool, stopPollingPromise, callback) {
const swarmPool = pSwarmPool; // lint
let stopPollingResult = false;
// When message_receiver restarts from onoffline/ononline events it closes
@ -285,10 +276,10 @@ class LokiMessageAPI {
stopPollingResult = result;
});
while (!stopPollingResult && !_.isEmpty(this.ourSwarmNodes)) {
const address = Object.keys(this.ourSwarmNodes)[0];
const nodeData = this.ourSwarmNodes[address];
delete this.ourSwarmNodes[address];
while (!stopPollingResult && !_.isEmpty(swarmPool)) {
const address = Object.keys(swarmPool)[0]; // X.snode hostname
const nodeData = swarmPool[address];
delete swarmPool[address];
let successiveFailures = 0;
while (
!stopPollingResult &&
@ -300,6 +291,7 @@ class LokiMessageAPI {
// so the user facing UI can report unhandled errors
// except in this case of living inside http-resource pollServer
// because it just restarts more connections...
let messages = await this._retrieveNextMessages(nodeData);
// this only tracks retrieval failures
// won't include parsing failures...
@ -328,11 +320,13 @@ class LokiMessageAPI {
if (e instanceof textsecure.WrongSwarmError) {
const { newSwarm } = e;
await lokiSnodeAPI.updateSwarmNodes(this.ourKey, newSwarm);
// FIXME: restart all openRetrieves when this happens...
// FIXME: lokiSnode should handle this
for (let i = 0; i < newSwarm.length; i += 1) {
const lastHash = await window.Signal.Data.getLastHashBySnode(
newSwarm[i]
);
this.ourSwarmNodes[newSwarm[i]] = {
swarmPool[newSwarm[i]] = {
lastHash,
};
}
@ -348,7 +342,7 @@ class LokiMessageAPI {
}
// Always wait a bit as we are no longer long-polling
await sleepFor(Math.max(successiveFailures, 2) * 1000);
await primitives.sleepFor(Math.max(successiveFailures, 2) * 1000);
}
if (successiveFailures >= MAX_ACCEPTABLE_FAILURES) {
const remainingSwarmSnodes = await lokiSnodeAPI.unreachableNode(
@ -359,15 +353,15 @@ class LokiMessageAPI {
`loki_message:::_openRetrieveConnection - too many successive failures, removing ${
nodeData.ip
}:${nodeData.port} from our swarm pool. We have ${
Object.keys(this.ourSwarmNodes).length
} usable swarm nodes left (${
Object.keys(swarmPool).length
} usable swarm nodes left for our connection (${
remainingSwarmSnodes.length
} in local db)`
);
}
}
// if not stopPollingResult
if (_.isEmpty(this.ourSwarmNodes)) {
if (_.isEmpty(swarmPool)) {
log.error(
'loki_message:::_openRetrieveConnection - We no longer have any swarm nodes available to try in pool, closing retrieve connection'
);
@ -402,7 +396,7 @@ class LokiMessageAPI {
if (result === false) {
// make a note of it because of caller doesn't care...
log.warn(
`loki_message:::_retrieveNextMessages - lokiRpc returned false to ${
`loki_message:::_retrieveNextMessages - lokiRpc could not talk to ${
nodeData.ip
}:${nodeData.port}`
);
@ -413,9 +407,10 @@ class LokiMessageAPI {
// we don't throw or catch here
async startLongPolling(numConnections, stopPolling, callback) {
this.ourSwarmNodes = {};
// load from local DB
let nodes = await lokiSnodeAPI.getSwarmNodesForPubKey(this.ourKey);
let nodes = await lokiSnodeAPI.getSwarmNodesForPubKey(this.ourKey, {
fetchHashes: true,
});
if (nodes.length < numConnections) {
log.warn(
'loki_message:::startLongPolling - Not enough SwarmNodes for our pubkey in local database, getting current list from blockchain'
@ -436,19 +431,21 @@ class LokiMessageAPI {
'for',
this.ourKey
);
Object.keys(nodes).forEach(j => {
const node = nodes[j];
log.info(`loki_message: ${j} ${node.ip}:${node.port}`);
});
for (let i = 0; i < nodes.length; i += 1) {
const lastHash = await window.Signal.Data.getLastHashBySnode(
nodes[i].address
);
this.ourSwarmNodes[nodes[i].address] = {
...nodes[i],
lastHash,
};
// ok now split up our swarm pool into numConnections number of pools
// one for each retrieve connection
// floor or ceil probably doesn't matter, since it's likely always uneven
const poolSize = Math.floor(nodes.length / numConnections);
const pools = [];
while (nodes.length) {
const poolList = nodes.splice(0, poolSize);
const byAddressObj = poolList.reduce((result, node) => {
// eslint-disable-next-line no-param-reassign
result[node.address] = node;
return result;
}, {});
pools.push(byAddressObj);
}
const promises = [];
@ -457,14 +454,15 @@ class LokiMessageAPI {
for (let i = 0; i < numConnections; i += 1) {
promises.push(
// eslint-disable-next-line more/no-then
this._openRetrieveConnection(stopPolling, callback).then(() => {
unresolved -= 1;
log.info(
'loki_message:::startLongPolling - There are',
unresolved,
'open retrieve connections left'
);
})
this._openRetrieveConnection(pools[i], stopPolling, callback).then(
stoppedPolling => {
unresolved -= 1;
log.info(
`loki_message:::startLongPolling - There are ${unresolved}`,
`open retrieve connections left. Stopped? ${stoppedPolling}`
);
}
)
);
}
@ -472,7 +470,7 @@ class LokiMessageAPI {
// less than numConnections being active is fine, only need to restart if none per Niels 20/02/13
// or if there is network issues (ENOUTFOUND due to lokinet)
await Promise.all(promises);
log.error(
log.warn(
'loki_message:::startLongPolling - All our long poll swarm connections have been removed'
);
// should we just call ourself again?

@ -0,0 +1,149 @@
/* global clearTimeout */
// was timeoutDelay
// Returns a promise that resolves (with no value) once `ms` milliseconds
// have elapsed; await it to pause an async function.
const sleepFor = ms =>
  new Promise(resolve => {
    setTimeout(resolve, ms);
  });
// Logger used by the helpers in this module; stays undefined until
// configure() is called with one.
let log;

// Stores the logger found on `options.log` (undefined when it is absent).
function configure(options = {}) {
  log = options.log;
}
// Adapted from https://stackoverflow.com/questions/51160260/clean-way-to-wait-for-first-true-returned-by-promise
// The promise returned by this function resolves with the first *truthy value*
// produced by an entry of ps (not just `true`, so callers can get back e.g. the
// node that succeeded), *or* with false once every entry has resolved falsy.
// It rejects if an entry rejects before any truthy value has arrived.
const firstTrue = ps => {
  // Promise.resolve hands native promises back untouched and wraps plain
  // values, so a non-promise entry no longer blows up on `.then`
  const settled = ps.map(p => Promise.resolve(p));
  const newPs = settled.map(
    p =>
      new Promise(
        // falsy results leave this wrapper pending on purpose; only the
        // Promise.all fallback below may report "nothing was truthy"
        // eslint-disable-next-line more/no-then
        (resolve, reject) => p.then(v => v && resolve(v), reject)
      )
  );
  // eslint-disable-next-line more/no-then
  newPs.push(Promise.all(settled).then(() => false));
  return Promise.race(newPs);
};
// one action resolves all
// In-flight operations keyed by lock name; an entry exists only while the
// operation it represents is still running (and has not timed out).
const snodeGlobalLocks = {};

/**
 * Runs `process` at most once at a time per `name`. Calls made while an
 * operation with the same name is in flight do not start a second one; they
 * wait for the running operation and share its outcome.
 *
 * @param {string} name - key identifying the operation
 * @param {Function} process - work to run; may return a value or a promise
 * @param {number} [timeout] - optional limit in milliseconds; when it
 *   elapses the lock is released and every waiter is rejected
 * @returns {Promise<*>} whatever `process` resolved with
 * @throws whatever `process` threw / rejected with, or an Error on timeout
 */
async function allowOnlyOneAtATime(name, process, timeout) {
  // if currently not in progress
  if (snodeGlobalLocks[name] === undefined) {
    const lock = new Promise((resolve, reject) => {
      let timeoutTimer = null;

      // Stops the timer and frees the slot. The identity check keeps a job
      // that finishes after its own timeout from deleting a newer lock that
      // was registered under the same name in the meantime.
      // (`lock` is only read from asynchronous callbacks, after assignment.)
      const releaseLock = () => {
        if (timeoutTimer !== null) {
          clearTimeout(timeoutTimer);
          timeoutTimer = null;
        }
        if (snodeGlobalLocks[name] === lock) {
          delete snodeGlobalLocks[name];
        }
      };

      // set up timeout feature
      if (timeout) {
        timeoutTimer = setTimeout(() => {
          timeoutTimer = null;
          log.warn(
            `loki_primitives:::allowOnlyOneAtATime - TIMEDOUT after ${timeout}ms`
          );
          releaseLock();
          // reject with a real Error so waiters can inspect .message
          reject(
            new Error(
              `allowOnlyOneAtATime - "${name}" timed out after ${timeout}ms`
            )
          );
        }, timeout);
      }

      // do actual work; the async wrapper still starts process() right away
      // but turns a synchronous throw into a rejection
      const work = (async () => process())();
      // eslint-disable-next-line more/no-then
      work.then(
        innerRetVal => {
          releaseLock();
          // release the kraken
          resolve(innerRetVal);
        },
        e => {
          log.error(
            `loki_primitives:::allowOnlyOneAtATime - error ${e && e.code} ${e &&
              e.message}`
          );
          releaseLock();
          // settle the shared promise so every waiter sees the failure
          reject(e);
        }
      );
    });
    // set lock
    snodeGlobalLocks[name] = lock;
  }

  try {
    return await snodeGlobalLocks[name];
  } catch (e) {
    // we will throw for each time allowOnlyOneAtATime has been called in parallel
    log.error(
      'loki_primitives:::allowOnlyOneAtATime - error',
      e && e.code,
      e && e.message
    );
    throw e;
  }
}
/**
 * Builds a stoppable walk over a private copy of `array`. Items are taken
 * from the END of the copy, so results come back in reverse input order.
 *
 * @param {Array} array - items to visit; the caller's array is not modified
 * @param {Function} iterator - called with each item; may return a promise
 * @returns {{start: Function, stop: Function}}
 *   start(serially): resolves with the accumulated results. When `serially`
 *     is truthy each call is awaited before the next one starts and the
 *     results are the awaited values (a rejection is logged and re-thrown);
 *     otherwise the raw return values (possibly un-awaited promises) are
 *     collected without waiting.
 *   stop(): prevents any further item from being started.
 */
function abortableIterator(array, iterator) {
  // flipped directly by stop() so it is seen before the very next item
  let abortIteration = false;

  const destructableList = [...array];
  const accum = [];

  return {
    start: async serially => {
      // loop on remaining length rather than item truthiness so that falsy
      // items (0, '', null, ...) are visited instead of ending the walk
      while (destructableList.length > 0 && !abortIteration) {
        const item = destructableList.pop();
        if (serially) {
          try {
            // eslint-disable-next-line no-await-in-loop
            accum.push(await iterator(item));
          } catch (e) {
            log.error(
              `loki_primitives:::abortableIterator - error ${e.code} ${
                e.message
              }`
            );
            throw e;
          }
        } else {
          accum.push(iterator(item));
        }
      }
      return accum;
    },
    stop: () => {
      abortIteration = true;
    },
  };
}
// Public surface of this module: logger injection (configure) plus the
// promise/timing helpers defined above.
module.exports = {
configure,
sleepFor,
allowOnlyOneAtATime,
abortableIterator,
firstTrue,
};

@ -72,7 +72,9 @@ class LokiPublicChatFactoryAPI extends EventEmitter {
log.warn(`Invalid server ${serverUrl}`);
return null;
}
log.info(`set token ${thisServer.token} for ${serverUrl}`);
if (window.isDev) {
log.info(`set token ${thisServer.token} for ${serverUrl}`);
}
this.servers.push(thisServer);
}

@ -3,6 +3,7 @@
const nodeFetch = require('node-fetch');
const https = require('https');
const primitives = require('./loki_primitives');
const snodeHttpsAgent = new https.Agent({
rejectUnauthorized: false,
@ -13,8 +14,6 @@ const endpointBase = '/storage_rpc/v1';
// Request index for debugging
let onionReqIdx = 0;
const timeoutDelay = ms => new Promise(resolve => setTimeout(resolve, ms));
const encryptForNode = async (node, payload) => {
const textEncoder = new TextEncoder();
const plaintext = textEncoder.encode(payload);
@ -195,7 +194,8 @@ const sendToProxy = async (options = {}, targetNode, retryNumber = 0) => {
let snodePool = await lokiSnodeAPI.getRandomSnodePool();
if (snodePool.length < 2) {
log.error(
// this is semi-normal to happen
log.info(
'lokiRpc::sendToProxy - Not enough service nodes for a proxy request, only have:',
snodePool.length,
'snode, attempting refresh'
@ -277,7 +277,31 @@ const sendToProxy = async (options = {}, targetNode, retryNumber = 0) => {
return sendToProxy(options, targetNode, retryNumber + 1);
}
// 504 is only present in 2.0.3 and after
// relay is fine but destination is not good
if (response.status === 504) {
const pRetryNumber = retryNumber + 1;
if (pRetryNumber > 3) {
log.warn(
`lokiRpc:::sendToProxy - snode ${randSnode.ip}:${randSnode.port}`,
`can not relay to target node ${targetNode.ip}:${targetNode.port}`,
`after 3 retries`
);
if (options.ourPubKey) {
lokiSnodeAPI.unreachableNode(options.ourPubKey, targetNode);
}
return false;
}
// we don't have to wait here
// because we're not marking the random snode bad
// grab a fresh random one
return sendToProxy(options, targetNode, pRetryNumber);
}
// detect SNode is not ready (not in swarm; not done syncing)
// 503 can be proxy target or destination in pre 2.0.3
// 2.0.3 and after means target
if (response.status === 503 || response.status === 500) {
// this doesn't mean the random node is bad, it could be the target node
// but we got a ton of randomPool nodes, let's just not worry about this one
@ -315,7 +339,7 @@ const sendToProxy = async (options = {}, targetNode, retryNumber = 0) => {
// 500 burns through a node too fast,
// let's slow the retry to give it more time to recover
if (response.status === 500) {
await timeoutDelay(5000);
await primitives.sleepFor(5000);
}
return sendToProxy(options, targetNode, pRetryNumber);
}
@ -387,10 +411,17 @@ const sendToProxy = async (options = {}, targetNode, retryNumber = 0) => {
// emulate nodeFetch response...
jsonRes.json = () => {
try {
if (jsonRes.body === 'Timestamp error: check your clock') {
log.error(
`lokiRpc:::sendToProxy - Timestamp error: check your clock`,
Date.now()
);
return false;
}
return JSON.parse(jsonRes.body);
} catch (e) {
log.error(
'lokiRpc:::sendToProxy - parse error',
'lokiRpc:::sendToProxy - (inner) parse error',
e.code,
e.message,
`from ${randSnode.ip}:${randSnode.port} json:`,
@ -400,7 +431,7 @@ const sendToProxy = async (options = {}, targetNode, retryNumber = 0) => {
return false;
};
if (retryNumber) {
log.info(
log.debug(
`lokiRpc:::sendToProxy - request succeeded,`,
`snode ${randSnode.ip}:${randSnode.port} to ${targetNode.ip}:${
targetNode.port
@ -411,7 +442,7 @@ const sendToProxy = async (options = {}, targetNode, retryNumber = 0) => {
return jsonRes;
} catch (e) {
log.error(
'lokiRpc:::sendToProxy - parse error',
'lokiRpc:::sendToProxy - (outer) parse error',
e.code,
e.message,
`from ${randSnode.ip}:${randSnode.port} json:`,
@ -475,12 +506,19 @@ const lokiFetch = async (url, options = {}, targetNode = null) => {
const result = await sendToProxy(fetchOptions, targetNode);
if (result === false) {
// should we retry?
log.warn(`lokiRpc:::lokiFetch - sendToProxy returned false`);
// even though we can't be sure our caller is going to log or handle the failure
// we do know that sendToProxy should be logging
// so I don't think we need or want a log item here...
// log.warn(`lokiRpc:::lokiFetch - sendToProxy failed`);
// one case is:
// snodePool didn't have enough
// even after a refresh
// likely a network disconnect?
// but not all cases...
// another is:
// failure to send to target node after 3 retries
// what else?
/*
log.warn(
'lokiRpc:::lokiFetch - useSnodeProxy failure, could not refresh randomPool, offline?'
@ -498,7 +536,7 @@ const lokiFetch = async (url, options = {}, targetNode = null) => {
fetchOptions.agent = snodeHttpsAgent;
process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0';
} else {
log.info('lokirpc:::lokiFetch - http communication', url);
log.debug('lokirpc:::lokiFetch - http communication', url);
}
const response = await nodeFetch(url, fetchOptions);
// restore TLS checking

@ -1,8 +1,11 @@
/* eslint-disable class-methods-use-this */
/* global window, textsecure, ConversationController, _, log, clearTimeout, process, Buffer, StringView, dcodeIO */
/* global window, textsecure, ConversationController, _, log, process, Buffer, StringView, dcodeIO */
const is = require('@sindresorhus/is');
const { lokiRpc } = require('./loki_rpc');
// not sure I like this name but it's better than util
const primitives = require('./loki_primitives');
const is = require('@sindresorhus/is');
const https = require('https');
const nodeFetch = require('node-fetch');
const semver = require('semver');
@ -13,8 +16,94 @@ const snodeHttpsAgent = new https.Agent({
const RANDOM_SNODES_TO_USE_FOR_PUBKEY_SWARM = 3;
const SEED_NODE_RETRIES = 3;
const SNODE_VERSION_RETRIES = 3;
// Two snode records refer to the same node when their ed25519 public keys
// are strictly equal.
const compareSnodes = (current, search) => {
  const currentKey = current.pubkey_ed25519;
  const searchKey = search.pubkey_ed25519;
  return currentKey === searchKey;
};
// just get the filtered list
// Makes a single attempt against one randomly chosen seed node. The chosen
// entry is REMOVED from `seedNodes`, so repeated calls with the same array
// move on to different seeds. Resolves with the service-node states whose
// public_ip is not 0.0.0.0; any failure (or an empty result) is logged and
// surfaces as a SeedNodeError.
async function tryGetSnodeListFromLokidSeednode(
  seedNodes = [...window.seedNodeList]
) {
  const seedFailure = () =>
    new window.textsecure.SeedNodeError('Failed to contact seed node');

  // Removed limit until there is a way to get snode info
  // for individual nodes (needed for guard nodes); this way
  // we get all active nodes
  const requestedFields = {
    public_ip: true,
    storage_port: true,
    pubkey_x25519: true,
    pubkey_ed25519: true,
  };
  const params = {
    active_only: true,
    fields: requestedFields,
  };

  // FIXME: use sample
  const pickedIdx = Math.floor(Math.random() * seedNodes.length);
  const [seedNode] = seedNodes.splice(pickedIdx, 1);

  let snodes = [];
  try {
    const seedUrl = `http://${seedNode.ip}`;
    const response = await lokiRpc(
      seedUrl,
      seedNode.port,
      'get_n_service_nodes',
      params,
      {}, // Options
      '/json_rpc' // Seed request endpoint
    );
    const reportedStates = response.result.service_node_states;
    // Filter 0.0.0.0 nodes which haven't submitted uptime proofs
    snodes = reportedStates.filter(
      ({ public_ip: publicIp }) => publicIp !== '0.0.0.0'
    );
    // throw before clearing the lock, so the retries can kick in
    if (snodes.length === 0) {
      // does this error message need to be exactly this?
      throw seedFailure();
    }
    return snodes;
  } catch (e) {
    log.warn(
      'loki_snodes:::tryGetSnodeListFromLokidSeednode - error',
      e.code,
      e.message
    );
    // nothing usable was collected: report the attempt as failed
    if (snodes.length === 0) {
      throw seedFailure();
    }
  }
  return [];
}
const timeoutDelay = ms => new Promise(resolve => setTimeout(resolve, ms));
/**
 * Fetches the snode list from the seed nodes, retrying after a failed
 * attempt with a quadratic back-off (retries^2 * 5s, so the first retry is
 * immediate).
 *
 * The retry is awaited and its result returned, so the caller receives the
 * list obtained by a later attempt (or the final failure) rather than an
 * empty array while a detached retry keeps running in the background.
 *
 * @param {Array} seedNodes - seed nodes still available to try; passed
 *   through to tryGetSnodeListFromLokidSeednode on every attempt
 * @param {number} retries - number of retries already performed
 * @returns {Promise<Array>} the snode list from the first successful attempt
 * @throws {window.textsecure.SeedNodeError} once an attempt fails with
 *   `retries` at or above SEED_NODE_RETRIES
 */
async function getSnodeListFromLokidSeednode(
  seedNodes = [...window.seedNodeList],
  retries = 0
) {
  try {
    return await tryGetSnodeListFromLokidSeednode(seedNodes);
  } catch (e) {
    log.warn(
      'loki_snodes:::getSnodeListFromLokidSeednode - error',
      e.code,
      e.message
    );
    if (retries >= SEED_NODE_RETRIES) {
      log.error('loki_snodes:::getSnodeListFromLokidSeednode - failing');
      throw new window.textsecure.SeedNodeError('Failed to contact seed node');
    }
  }
  // handle retries in case of temporary hiccups
  await primitives.sleepFor(retries * retries * 5000);
  log.info(
    'loki_snodes:::refreshRandomPoolPromise - Retrying initialising random snode pool, try #',
    retries
  );
  return getSnodeListFromLokidSeednode(seedNodes, retries + 1);
}
class LokiSnodeAPI {
constructor({ serverUrl, localUrl }) {
@ -25,10 +114,7 @@ class LokiSnodeAPI {
this.localUrl = localUrl; // localhost.loki
this.randomSnodePool = [];
this.swarmsPendingReplenish = {};
this.refreshRandomPoolPromise = false;
this.versionPools = {};
this.versionMap = {}; // reverse version look up
this.versionsRetrieved = false; // to mark when it's done getting versions
this.stopGetAllVersionPromiseControl = false;
this.onionPaths = [];
this.guardNodes = [];
@ -36,7 +122,12 @@ class LokiSnodeAPI {
async getRandomSnodePool() {
if (this.randomSnodePool.length === 0) {
await this.refreshRandomPool();
// allow exceptions to pass through upwards without the unhandled promise rejection
try {
await this.refreshRandomPool();
} catch (e) {
throw e;
}
}
return this.randomSnodePool;
}
@ -97,8 +188,8 @@ class LokiSnodeAPI {
async selectGuardNodes() {
const _ = window.Lodash;
// FIXME: handle rejections
let nodePool = await this.getRandomSnodePool();
if (nodePool.length === 0) {
log.error(`Could not select guarn nodes: node pool is empty`);
return [];
@ -127,7 +218,6 @@ class LokiSnodeAPI {
return [];
}
}
// The use of await inside while is intentional:
// we only want to repeat if the await fails
// eslint-disable-next-line-no-await-in-loop
@ -274,13 +364,17 @@ class LokiSnodeAPI {
}
async getRandomSnodeAddress() {
/* resolve random snode */
if (this.randomSnodePool.length === 0) {
// allow exceptions to pass through upwards
await this.refreshRandomPool();
}
// resolve random snode
if (this.randomSnodePool.length === 0) {
throw new window.textsecure.SeedNodeError('Invalid seed node response');
// allow exceptions to pass through upwards without the unhandled promise rejection
try {
await this.refreshRandomPool();
} catch (e) {
throw e;
}
if (this.randomSnodePool.length === 0) {
throw new window.textsecure.SeedNodeError('Invalid seed node response');
}
}
// FIXME: _.sample?
return this.randomSnodePool[
@ -288,43 +382,51 @@ class LokiSnodeAPI {
];
}
async getNodesMinVersion(minVersion) {
const _ = window.Lodash;
return _.flatten(
_.entries(this.versionPools)
.filter(v => semver.gte(v[0], minVersion))
.map(v => v[1])
// not cacheable because we write to this.randomSnodePool elsewhere
getNodesMinVersion(minVersion) {
return this.randomSnodePool.filter(
node => node.version && semver.gt(node.version, minVersion)
);
}
// use nodes that support more than 1mb
async getRandomProxySnodeAddress() {
/* resolve random snode */
// resolve random snode
if (this.randomSnodePool.length === 0) {
// allow exceptions to pass through upwards
await this.refreshRandomPool();
}
if (this.randomSnodePool.length === 0) {
throw new window.textsecure.SeedNodeError('Invalid seed node response');
// allow exceptions to pass through upwards without the unhandled promise rejection
try {
await this.refreshRandomPool();
} catch (e) {
log.error(
`loki_snode:::getRandomProxySnodeAddress - error ${e.code} ${
e.message
}`
);
throw e;
}
if (this.randomSnodePool.length === 0) {
throw new window.textsecure.SeedNodeError('Invalid seed node response');
}
}
const goodVersions = Object.keys(this.versionPools).filter(version =>
semver.gt(version, '2.0.1')
);
if (!goodVersions.length) {
const goodPool = this.getNodesMinVersion('2.0.1');
if (!goodPool.length) {
// FIXME: retry
log.warn(
`loki_snode:::getRandomProxySnodeAddress - no good versions yet`
);
return false;
}
// FIXME: _.sample?
const goodVersion =
goodVersions[Math.floor(Math.random() * goodVersions.length)];
const pool = this.versionPools[goodVersion];
// FIXME: _.sample?
return pool[Math.floor(Math.random() * pool.length)];
const goodRandomNode =
goodPool[Math.floor(Math.random() * goodPool.length)];
return goodRandomNode;
}
// WARNING: this leaks our IP to all snodes but with no other identifying information
// except that a client started up or ran out of random pool snodes
async getVersion(node) {
// except "that a client started up" or "ran out of random pool snodes"
// and the order of the list is randomized, so a snode can't tell if it just started or not
async _getVersion(node, options = {}) {
const retries = options.retries || 0;
try {
process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0';
const result = await nodeFetch(
@ -334,191 +436,159 @@ class LokiSnodeAPI {
process.env.NODE_TLS_REJECT_UNAUTHORIZED = '1';
const data = await result.json();
if (data.version) {
if (this.versionPools[data.version] === undefined) {
this.versionPools[data.version] = [node];
const foundNodeIdx = this.randomSnodePool.findIndex(n =>
compareSnodes(n, node)
);
if (foundNodeIdx !== -1) {
this.randomSnodePool[foundNodeIdx].version = data.version;
} else {
this.versionPools[data.version].push(node);
// maybe already marked bad...
log.debug(
`loki_snode:::_getVersion - can't find ${node.ip}:${
node.port
} in randomSnodePool`
);
}
// set up reverse mapping for removal lookup
this.versionMap[`${node.ip}:${node.port}`] = data.version;
}
return data.version;
} catch (e) {
// ECONNREFUSED likely means it's just offline...
// ECONNRESET seems to retry and fail as ECONNREFUSED (so likely a node going offline)
// ETIMEDOUT not sure what to do about these
// retry for now but maybe we should be marking bad...
if (e.code === 'ECONNREFUSED') {
this.markRandomNodeUnreachable(node, { versionPoolFailure: true });
this.markRandomNodeUnreachable(node);
const randomNodesLeft = this.getRandomPoolLength();
// clean up these error messages to be a little neater
log.warn(
`loki_snode:::getVersion - ${node.ip}:${
`loki_snode:::_getVersion - ${node.ip}:${
node.port
} is offline, removing, leaving ${randomNodesLeft} in the randomPool`
);
} else {
// mostly ECONNRESETs
// if not ECONNREFUSED, it's mostly ECONNRESETs
// ENOTFOUND could mean no internet or hiccup
} else if (retries < SNODE_VERSION_RETRIES) {
log.warn(
'loki_snode:::getVersion - Error',
'loki_snode:::_getVersion - Error',
e.code,
e.message,
`on ${node.ip}:${node.port} retrying in 1s`
);
await timeoutDelay(1000);
await this.getVersion(node);
await primitives.sleepFor(1000);
return this._getVersion(node, { ...options, retries: retries + 1 });
} else {
this.markRandomNodeUnreachable(node);
const randomNodesLeft = this.getRandomPoolLength();
log.warn(
`loki_snode:::_getVersion - failing to get version for ${node.ip}:${
node.port
}, removing, leaving ${randomNodesLeft} in the randomPool`
);
}
// maybe throw?
return false;
}
}
async refreshRandomPool(seedNodes = [...window.seedNodeList]) {
// if currently not in progress
if (this.refreshRandomPoolPromise === false) {
// set lock
this.refreshRandomPoolPromise = new Promise(async (resolve, reject) => {
let timeoutTimer = null;
// private retry container
const trySeedNode = async (consecutiveErrors = 0) => {
// Removed limit until there is a way to get snode info
// for individual nodes (needed for guard nodes); this way
// we get all active nodes
const params = {
active_only: true,
fields: {
public_ip: true,
storage_port: true,
pubkey_x25519: true,
pubkey_ed25519: true,
},
};
const seedNode = seedNodes.splice(
Math.floor(Math.random() * seedNodes.length),
1
)[0];
let snodes = [];
try {
log.info(
'loki_snodes:::refreshRandomPoolPromise - Refreshing random snode pool'
);
const response = await lokiRpc(
`http://${seedNode.ip}`,
seedNode.port,
'get_n_service_nodes',
params,
{}, // Options
'/json_rpc' // Seed request endpoint
);
// Filter 0.0.0.0 nodes which haven't submitted uptime proofs
snodes = response.result.service_node_states.filter(
snode => snode.public_ip !== '0.0.0.0'
);
// make sure order of the list is random, so we get version in a non-deterministic way
snodes = _.shuffle(snodes);
// commit changes to be live
// we'll update the version (in case they upgrade) every cycle
this.versionPools = {};
this.versionsRetrieved = false;
this.randomSnodePool = snodes.map(snode => ({
ip: snode.public_ip,
port: snode.storage_port,
pubkey_x25519: snode.pubkey_x25519,
pubkey_ed25519: snode.pubkey_ed25519,
}));
log.info(
'loki_snodes:::refreshRandomPoolPromise - Refreshed random snode pool with',
this.randomSnodePool.length,
'snodes'
);
// clear lock
this.refreshRandomPoolPromise = null;
if (timeoutTimer !== null) {
clearTimeout(timeoutTimer);
timeoutTimer = null;
}
// start polling versions
resolve();
// now get version for all snodes
// also acts an early online test/purge of bad nodes
let c = 0;
const verionStart = Date.now();
const t = this.randomSnodePool.length;
const noticeEvery = parseInt(t / 10, 10);
// eslint-disable-next-line no-restricted-syntax
for (const node of this.randomSnodePool) {
c += 1;
// eslint-disable-next-line no-await-in-loop
await this.getVersion(node);
if (c % noticeEvery === 0) {
// give stats
const diff = Date.now() - verionStart;
log.info(
`${c}/${t} pool version status update, has taken ${diff.toLocaleString()}ms`
);
Object.keys(this.versionPools).forEach(version => {
const nodes = this.versionPools[version].length;
log.info(
`version ${version} has ${nodes.toLocaleString()} snodes`
);
});
}
}
log.info('Versions retrieved from network!');
this.versionsRetrieved = true;
} catch (e) {
log.warn(
'loki_snodes:::refreshRandomPoolPromise - error',
e.code,
e.message
// now get version for all snodes
// also acts as an early online test/purge of bad nodes
async _getAllVerionsForRandomSnodePool() {
// let count = 0;
// const verionStart = Date.now();
// const total = this.randomSnodePool.length;
// const noticeEvery = parseInt(total / 10, 10);
const loop = primitives.abortableIterator(
this.randomSnodePool,
async node => {
// count += 1;
try {
await this._getVersion(node);
/*
if (count % noticeEvery === 0) {
// give stats
const diff = Date.now() - verionStart;
log.debug(
`loki_snode:::_getAllVerionsForRandomSnodePool - ${count}/${total} pool version status update, has taken ${diff.toLocaleString()}ms`
);
if (consecutiveErrors < SEED_NODE_RETRIES) {
// retry after a possible delay
setTimeout(() => {
log.info(
'loki_snodes:::refreshRandomPoolPromise - Retrying initialising random snode pool, try #',
consecutiveErrors
);
trySeedNode(consecutiveErrors + 1);
}, consecutiveErrors * consecutiveErrors * 5000);
} else {
log.error(
'loki_snodes:::refreshRandomPoolPromise - Giving up trying to contact seed node'
Object.keys(this.versionPools).forEach(version => {
const nodes = this.versionPools[version].length;
log.debug(
`loki_snode:::_getAllVerionsForRandomSnodePool - version ${version} has ${nodes.toLocaleString()} snodes`
);
if (snodes.length === 0) {
this.refreshRandomPoolPromise = null; // clear lock
if (timeoutTimer !== null) {
clearTimeout(timeoutTimer);
timeoutTimer = null;
}
reject();
}
}
});
}
};
const delay = (SEED_NODE_RETRIES + 1) * (SEED_NODE_RETRIES + 1) * 5000;
timeoutTimer = setTimeout(() => {
log.warn(
'loki_snodes:::refreshRandomPoolPromise - TIMEDOUT after',
delay,
's'
*/
} catch (e) {
log.error(
`loki_snode:::_getAllVerionsForRandomSnodePool - error`,
e.code,
e.message
);
reject();
}, delay);
trySeedNode();
});
}
try {
await this.refreshRandomPoolPromise;
} catch (e) {
// we will throw for each time initialiseRandomPool has been called in parallel
log.error(
'loki_snodes:::refreshRandomPoolPromise - error',
e.code,
e.message
);
throw new window.textsecure.SeedNodeError('Failed to contact seed node');
}
log.info('loki_snodes:::refreshRandomPoolPromise - RESOLVED');
delete this.refreshRandomPoolPromise; // clear any lock
throw e;
}
}
);
// make abortable accessible outside this scope
this.stopGetAllVersionPromiseControl = loop.stop;
await loop.start(true);
this.stopGetAllVersionPromiseControl = false; // clear lock
// an array of objects
const versions = this.randomSnodePool.reduce((curVal, node) => {
if (curVal.indexOf(node.version) === -1) {
curVal.push(node.version);
}
return curVal;
}, []);
log.debug(
`loki_snode:::_getAllVerionsForRandomSnodePool - ${
versions.length
} versions retrieved from network!:`,
versions.join(',')
);
}
async refreshRandomPool(seedNodes = [...window.seedNodeList]) {
return primitives.allowOnlyOneAtATime('refreshRandomPool', async () => {
// are we running any _getAllVerionsForRandomSnodePool
if (this.stopGetAllVersionPromiseControl !== false) {
// we are, stop them
this.stopGetAllVersionPromiseControl();
}
let snodes = [];
try {
snodes = await getSnodeListFromLokidSeednode(seedNodes);
// make sure order of the list is random, so we get version in a non-deterministic way
snodes = _.shuffle(snodes);
// commit changes to be live
// we'll update the version (in case they upgrade) every cycle
this.randomSnodePool = snodes.map(snode => ({
ip: snode.public_ip,
port: snode.storage_port,
pubkey_x25519: snode.pubkey_x25519,
pubkey_ed25519: snode.pubkey_ed25519,
}));
log.info(
'loki_snodes:::refreshRandomPool - Refreshed random snode pool with',
this.randomSnodePool.length,
'snodes'
);
// start polling versions but no need to await it
this._getAllVerionsForRandomSnodePool();
} catch (e) {
log.warn('loki_snodes:::refreshRandomPool - error', e.code, e.message);
/*
log.error(
'loki_snodes:::refreshRandomPoolPromise - Giving up trying to contact seed node'
);
*/
if (snodes.length === 0) {
throw new window.textsecure.SeedNodeError(
'Failed to contact seed node'
);
}
}
return this.randomSnodePool;
});
}
// unreachableNode.url is like 9hrje1bymy7hu6nmtjme9idyu3rm8gr3mkstakjyuw1997t7w4ny.snode
@ -536,8 +606,7 @@ class LokiSnodeAPI {
// keep all but thisNode
const thisNode =
node.address === unreachableNode.address &&
node.ip === unreachableNode.ip &&
node.port === unreachableNode.port;
compareSnodes(unreachableNode, node);
if (thisNode) {
found = true;
}
@ -550,59 +619,57 @@ class LokiSnodeAPI {
} has already been marked as bad`
);
}
await conversation.updateSwarmNodes(filteredNodes);
try {
await conversation.updateSwarmNodes(filteredNodes);
} catch (e) {
log.error(`loki_snodes:::unreachableNode - error ${e.code} ${e.message}`);
throw e;
}
return filteredNodes;
}
markRandomNodeUnreachable(snode, options = {}) {
// avoid retries when we can't get the version because they're offline
if (!options.versionPoolFailure) {
const snodeVersion = this.versionMap[`${snode.ip}:${snode.port}`];
if (this.versionPools[snodeVersion]) {
this.versionPools[snodeVersion] = _.without(
this.versionPools[snodeVersion],
snode
);
} else {
if (snodeVersion) {
// reverse map (versionMap) is out of sync with versionPools
log.error(
'loki_snode:::markRandomNodeUnreachable - No snodes for version',
snodeVersion,
'retrying in 10s'
);
} else {
// we don't know our version yet
// and if we're offline, we'll likely not get it until it restarts if it does...
log.warn(
'loki_snode:::markRandomNodeUnreachable - No version for snode',
`${snode.ip}:${snode.port}`,
'retrying in 10s'
);
}
// make sure we don't retry past 15 mins (10s * 100 ~ 1000s)
const retries = options.retries || 0;
if (retries < 100) {
setTimeout(() => {
this.markRandomNodeUnreachable(snode, {
...options,
retries: retries + 1,
});
}, 10000);
}
}
}
markRandomNodeUnreachable(snode) {
this.randomSnodePool = _.without(this.randomSnodePool, snode);
}
async updateLastHash(snode, hash, expiresAt) {
await window.Signal.Data.updateLastHash({ snode, hash, expiresAt });
async updateLastHash(snodeAddress, hash, expiresAt) {
// FIXME: handle rejections
await window.Signal.Data.updateLastHash({
snode: snodeAddress,
hash,
expiresAt,
});
}
getSwarmNodesForPubKey(pubKey) {
// called by loki_message:::sendMessage & loki_message:::startLongPolling
async getSwarmNodesForPubKey(pubKey, options = {}) {
const { fetchHashes } = options;
try {
const conversation = ConversationController.get(pubKey);
const swarmNodes = [...conversation.get('swarmNodes')];
// always? include lastHash
if (fetchHashes) {
await Promise.all(
Object.keys(swarmNodes).map(async j => {
const node = swarmNodes[j];
// FIXME make a batch function call
const lastHash = await window.Signal.Data.getLastHashBySnode(
node.address
);
log.debug(
`loki_snode:::getSwarmNodesForPubKey - ${j} ${node.ip}:${
node.port
}`
);
swarmNodes[j] = {
...node,
lastHash,
};
})
);
}
return swarmNodes;
} catch (e) {
throw new window.textsecure.ReplayableError({
@ -618,39 +685,40 @@ class LokiSnodeAPI {
await conversation.updateSwarmNodes(filteredNodes);
return filteredNodes;
} catch (e) {
log.error(
`loki_snodes:::updateSwarmNodes - error ${e.code} ${e.message}`
);
throw new window.textsecure.ReplayableError({
message: 'Could not get conversation',
});
}
}
// FIXME: in its own PR, reorder functions: put _getFreshSwarmNodes and its callee
// only loki_message::startLongPolling calls this...
async refreshSwarmNodesForPubKey(pubKey) {
const newNodes = await this.getFreshSwarmNodes(pubKey);
// FIXME: handle rejections
const newNodes = await this._getFreshSwarmNodes(pubKey);
const filteredNodes = this.updateSwarmNodes(pubKey, newNodes);
return filteredNodes;
}
async getFreshSwarmNodes(pubKey) {
if (!(pubKey in this.swarmsPendingReplenish)) {
this.swarmsPendingReplenish[pubKey] = new Promise(async resolve => {
let newSwarmNodes;
try {
newSwarmNodes = await this.getSwarmNodes(pubKey);
} catch (e) {
log.error(
'loki_snodes:::getFreshSwarmNodes - error',
e.code,
e.message
);
// TODO: Handle these errors sensibly
newSwarmNodes = [];
}
resolve(newSwarmNodes);
});
}
const newSwarmNodes = await this.swarmsPendingReplenish[pubKey];
delete this.swarmsPendingReplenish[pubKey];
return newSwarmNodes;
async _getFreshSwarmNodes(pubKey) {
return primitives.allowOnlyOneAtATime(`swarmRefresh${pubKey}`, async () => {
let newSwarmNodes = [];
try {
newSwarmNodes = await this._getSwarmNodes(pubKey);
} catch (e) {
log.error(
'loki_snodes:::_getFreshSwarmNodes - error',
e.code,
e.message
);
// TODO: Handle these errors sensibly
newSwarmNodes = [];
}
return newSwarmNodes;
});
}
// helper function
@ -687,8 +755,9 @@ class LokiSnodeAPI {
const nameHash = dcodeIO.ByteBuffer.wrap(output).toString('base64');
// Get nodes capable of doing LNS
let lnsNodes = await this.getNodesMinVersion('2.0.3');
lnsNodes = _.shuffle(lnsNodes);
const lnsNodes = this.getNodesMinVersion('2.0.3');
// randomPool should already be shuffled
// lnsNodes = _.shuffle(lnsNodes);
// Loop until 3 confirmations
@ -745,8 +814,11 @@ class LokiSnodeAPI {
return pubkey;
}
async getSnodesForPubkey(snode, pubKey) {
// get snodes for pubkey from random snode
async _getSnodesForPubkey(pubKey) {
let snode = {};
try {
snode = await this.getRandomSnodeAddress();
const result = await lokiRpc(
`https://${snode.ip}`,
snode.port,
@ -760,7 +832,7 @@ class LokiSnodeAPI {
);
if (!result) {
log.warn(
`loki_snode:::getSnodesForPubkey - lokiRpc on ${snode.ip}:${
`loki_snode:::_getSnodesForPubkey - lokiRpc on ${snode.ip}:${
snode.port
} returned falsish value`,
result
@ -770,7 +842,7 @@ class LokiSnodeAPI {
if (!result.snodes) {
// we hit this when snode gives 500s
log.warn(
`loki_snode:::getSnodesForPubkey - lokiRpc on ${snode.ip}:${
`loki_snode:::_getSnodesForPubkey - lokiRpc on ${snode.ip}:${
snode.port
} returned falsish value for snodes`,
result
@ -783,7 +855,7 @@ class LokiSnodeAPI {
this.markRandomNodeUnreachable(snode);
const randomPoolRemainingCount = this.getRandomPoolLength();
log.error(
'loki_snodes:::getSnodesForPubkey - error',
'loki_snodes:::_getSnodesForPubkey - error',
e.code,
e.message,
`for ${snode.ip}:${
@ -794,19 +866,17 @@ class LokiSnodeAPI {
}
}
async getSwarmNodes(pubKey) {
async _getSwarmNodes(pubKey) {
const snodes = [];
// creates a range: [0, 1, 2]
const questions = [...Array(RANDOM_SNODES_TO_USE_FOR_PUBKEY_SWARM).keys()];
// FIXME: handle rejections
await Promise.all(
questions.map(async () => {
// allow exceptions to pass through upwards
const rSnode = await this.getRandomSnodeAddress();
const resList = await this.getSnodesForPubkey(rSnode, pubKey);
// should we only activate entries that are in all results?
const resList = await this._getSnodesForPubkey(pubKey);
resList.map(item => {
const hasItem = snodes.some(
hItem => item.ip === hItem.ip && item.port === hItem.port
);
const hasItem = snodes.some(n => compareSnodes(n, item));
if (!hasItem) {
snodes.push(item);
}
@ -814,6 +884,7 @@ class LokiSnodeAPI {
});
})
);
// should we only activate entries that are in all results?
return snodes;
}
}

Loading…
Cancel
Save