remove separate version pool vars, loki_primitive refactor, make sure _getAllVerionsForRandomSnodePool can't stack, exception clean up, findMatchingSnode refactor, getSwarmNodesForPubKey() now loads the lasthash, getNodesMinVersion is no longer async, _getSnodesForPubkey no longer takes a snode

pull/1061/head
Ryan Tharp 6 years ago
parent 9927eab539
commit 3f5a667989

@ -1,8 +1,11 @@
/* eslint-disable class-methods-use-this */
/* global window, textsecure, ConversationController, _, log, clearTimeout, process, Buffer, StringView, dcodeIO */
/* global window, textsecure, ConversationController, _, log, process, Buffer, StringView, dcodeIO */
const is = require('@sindresorhus/is');
const { lokiRpc } = require('./loki_rpc');
// not sure I like this name but it's been than util
const primitives = require('./loki_primitives');
const is = require('@sindresorhus/is');
const https = require('https');
const nodeFetch = require('node-fetch');
const semver = require('semver');
@ -13,8 +16,11 @@ const snodeHttpsAgent = new https.Agent({
const RANDOM_SNODES_TO_USE_FOR_PUBKEY_SWARM = 3;
const SEED_NODE_RETRIES = 3;
const SNODE_VERSION_RETRIES = 3;
const timeoutDelay = ms => new Promise(resolve => setTimeout(resolve, ms));
// findMatchingSnode(search)(current)
const findMatchingSnode = search => current => current.ip === search.ip && current.port === search.port;
// just get the filtered list
async function tryGetSnodeListFromLokidSeednode(
@ -99,61 +105,16 @@ async function getSnodeListFromLokidSeednode(
return snodes;
}
// FIXME: move out to more generic adv promise library
const snodeGlobalLocks = {};
async function allowOnlyOneAtATime(name, process, timeout) {
// if currently not in progress
if (snodeGlobalLocks[name] === undefined) {
// set lock
snodeGlobalLocks[name] = new Promise(async (resolve, reject) => {
// set up timeout feature
let timeoutTimer = null;
if (timeout) {
timeoutTimer = setTimeout(() => {
log.warn(
`loki_snodes:::allowOnlyOneAtATime - TIMEDOUT after ${timeout}s`
);
delete snodeGlobalLocks[name]; // clear lock
reject();
}, timeout);
}
// do actual work
await process();
// clear timeout timer
if (timeout) {
if (timeoutTimer !== null) {
clearTimeout(timeoutTimer);
timeoutTimer = null;
}
}
delete snodeGlobalLocks[name]; // clear lock
// release the kraken
resolve();
});
}
try {
await snodeGlobalLocks[name];
} catch (e) {
// we will throw for each time initialiseRandomPool has been called in parallel
log.error('loki_snodes:::allowOnlyOneAtATime - error', e.code, e.message);
throw e;
}
log.info('loki_snodes:::allowOnlyOneAtATime - RESOLVED');
}
class LokiSnodeAPI {
constructor({ serverUrl, localUrl }) {
if (!is.string(serverUrl)) {
throw new Error('WebAPI.initialize: Invalid server url');
throw new Error('LokiSnodeAPI.initialize: Invalid server url');
}
this.serverUrl = serverUrl; // random.snode
this.localUrl = localUrl; // localhost.loki
this.randomSnodePool = [];
this.swarmsPendingReplenish = {};
this.refreshRandomPoolPromise = undefined;
this.versionPools = {};
this.versionMap = {}; // reverse version look up
this.versionsRetrieved = false; // to mark when it's done getting versions
this.stopGetAllVersionPromiseControl = false;
this.onionPaths = [];
this.guardNodes = [];
@ -227,14 +188,14 @@ class LokiSnodeAPI {
async selectGuardNodes() {
const _ = window.Lodash;
const nodePool = await this.getRandomSnodePool();
// FIXME: handle rejections
let nodePool = await this.getRandomSnodePool();
if (nodePool.length === 0) {
log.error(`Could not select guarn nodes: node pool is empty`);
return [];
}
const shuffled = _.shuffle(nodePool);
let shuffled = _.shuffle(nodePool);
let guardNodes = [];
@ -421,13 +382,10 @@ class LokiSnodeAPI {
];
}
async getNodesMinVersion(minVersion) {
const _ = window.Lodash;
return _.flatten(
_.entries(this.versionPools)
.filter(v => semver.gte(v[0], minVersion))
.map(v => v[1])
// not cacheable because we write to this.randomSnodePool elsewhere
getNodesMinVersion(minVersion) {
return this.randomSnodePool.filter(node =>
node.version && semver.gt(node.version, minVersion)
);
}
@ -439,31 +397,30 @@ class LokiSnodeAPI {
try {
await this.refreshRandomPool();
} catch (e) {
log.error(`loki_snode:::getRandomProxySnodeAddress - error ${e.code} ${e.message}`);
throw e;
}
await this.refreshRandomPool();
if (this.randomSnodePool.length === 0) {
throw new window.textsecure.SeedNodeError('Invalid seed node response');
}
}
const goodVersions = Object.keys(this.versionPools).filter(version =>
semver.gt(version, '2.0.1')
);
if (!goodVersions.length) {
const goodPool = this.getNodesMinVersion('2.0.1');
if (!goodPool.length) {
// FIXME: retry
log.warn(`loki_snode:::getRandomProxySnodeAddress - no good versions yet`);
return false;
}
// FIXME: _.sample?
const goodVersion =
goodVersions[Math.floor(Math.random() * goodVersions.length)];
const pool = this.versionPools[goodVersion];
// FIXME: _.sample?
return pool[Math.floor(Math.random() * pool.length)];
const goodRandomNode =
goodPool[Math.floor(Math.random() * goodPool.length)];
return goodRandomNode;
}
// WARNING: this leaks our IP to all snodes but with no other identifying information
// except that a client started up or ran out of random pool snodes
// and the order of the list is randomized, so a snode can't tell if it just started or not
async getVersion(node) {
async _getVersion(node, options = {}) {
const retries = options.retries || 0;
try {
process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0';
const result = await nodeFetch(
@ -473,75 +430,97 @@ class LokiSnodeAPI {
process.env.NODE_TLS_REJECT_UNAUTHORIZED = '1';
const data = await result.json();
if (data.version) {
if (this.versionPools[data.version] === undefined) {
this.versionPools[data.version] = [node];
const foundNodeIdx = this.randomSnodePool.findIndex(findMatchingSnode(node));
if (foundNodeIdx !== -1) {
this.randomSnodePool[foundNodeIdx].version = data.version;
} else {
this.versionPools[data.version].push(node);
// maybe already marked bad...
log.warn(`loki_snode:::_getVersion - can't find ${node.ip}:${
node.port
} in randomSnodePool`);
}
// set up reverse mapping for removal lookup
this.versionMap[`${node.ip}:${node.port}`] = data.version;
}
return data.version;
} catch (e) {
// ECONNREFUSED likely means it's just offline...
// ECONNRESET seems to retry and fail as ECONNREFUSED (so likely a node going offline)
// ETIMEDOUT not sure what to do about these
// retry for now but maybe we should be marking bad...
if (e.code === 'ECONNREFUSED') {
this.markRandomNodeUnreachable(node, { versionPoolFailure: true });
this.markRandomNodeUnreachable(node);
const randomNodesLeft = this.getRandomPoolLength();
// clean up these error messages to be a little neater
log.warn(
`loki_snode:::getVersion - ${node.ip}:${
`loki_snode:::_getVersion - ${node.ip}:${
node.port
} is offline, removing, leaving ${randomNodesLeft} in the randomPool`
);
} else {
// mostly ECONNRESETs
// if not ECONNREFUSED, it's mostly ECONNRESETs
// ENOTFOUND could mean no internet or hiccup
} else if (retries < SNODE_VERSION_RETRIES) {
log.warn(
'loki_snode:::getVersion - Error',
'loki_snode:::_getVersion - Error',
e.code,
e.message,
`on ${node.ip}:${node.port} retrying in 1s`
);
await timeoutDelay(1000);
await this.getVersion(node);
await primitives.sleepFor(1000);
await this._getVersion(node, {...options, retries: retries + 1 });
} else {
this.markRandomNodeUnreachable(node);
const randomNodesLeft = this.getRandomPoolLength();
log.warn(`loki_snode:::_getVersion - failing to get version for ${node.ip}:${node.port}, removing, leaving ${randomNodesLeft} in the randomPool`)
}
// maybe throw?
return false;
}
}
async getAllVerionsForRandomSnodePool() {
// now get version for all snodes
// also acts an early online test/purge of bad nodes
let count = 0;
const verionStart = Date.now();
const total = this.randomSnodePool.length;
const noticeEvery = parseInt(total / 10, 10);
// eslint-disable-next-line no-restricted-syntax
for (const node of this.randomSnodePool) {
count += 1;
// eslint-disable-next-line no-await-in-loop
await this.getVersion(node);
async _getAllVerionsForRandomSnodePool() {
// let count = 0;
// const verionStart = Date.now();
// const total = this.randomSnodePool.length;
// const noticeEvery = parseInt(total / 10, 10);
const loop = primitives.abortableIterator(this.randomSnodePool, async (node) => {
// count += 1;
try {
await this._getVersion(node);
/*
if (count % noticeEvery === 0) {
// give stats
const diff = Date.now() - verionStart;
log.info(
`loki_snode:::getAllVerionsForRandomSnodePool - ${count}/${total} pool version status update, has taken ${diff.toLocaleString()}ms`
log.debug(
`loki_snode:::_getAllVerionsForRandomSnodePool - ${count}/${total} pool version status update, has taken ${diff.toLocaleString()}ms`
);
Object.keys(this.versionPools).forEach(version => {
const nodes = this.versionPools[version].length;
log.info(
`loki_snode:::getAllVerionsForRandomSnodePool - version ${version} has ${nodes.toLocaleString()} snodes`
log.debug(
`loki_snode:::_getAllVerionsForRandomSnodePool - version ${version} has ${nodes.toLocaleString()} snodes`
);
});
}
*/
} catch (e) {
log.error('loki_snode:::_getAllVerionsForRandomSnodePool - error', e.code, e.message);
throw(e);
}
log.info('Versions retrieved from network!');
this.versionsRetrieved = true;
});
// make abortable accessible outside this scope
this.stopGetAllVersionPromiseControl = loop.stop;
await loop.start(true);
this.stopGetAllVersionPromiseControl = false; // clear lock
log.debug('Versions retrieved from network!');
}
async refreshRandomPool(seedNodes = [...window.seedNodeList]) {
await allowOnlyOneAtATime('refreshRandomPool', async () => {
return primitives.allowOnlyOneAtATime('refreshRandomPool', async () => {
// are we running any _getAllVerionsForRandomSnodePool
if (this.stopGetAllVersionPromiseControl !== false) {
// we are, stop them
this.stopGetAllVersionPromiseControl();
}
let snodes = [];
try {
snodes = await getSnodeListFromLokidSeednode(seedNodes);
@ -549,8 +528,6 @@ class LokiSnodeAPI {
snodes = _.shuffle(snodes);
// commit changes to be live
// we'll update the version (in case they upgrade) every cycle
this.versionPools = {};
this.versionsRetrieved = false;
this.randomSnodePool = snodes.map(snode => ({
ip: snode.public_ip,
port: snode.storage_port,
@ -558,15 +535,15 @@ class LokiSnodeAPI {
pubkey_ed25519: snode.pubkey_ed25519,
}));
log.info(
'loki_snodes:::refreshRandomPoolPromise - Refreshed random snode pool with',
'loki_snodes:::refreshRandomPool - Refreshed random snode pool with',
this.randomSnodePool.length,
'snodes'
);
// start polling versions but no need to await it
this.getAllVerionsForRandomSnodePool();
this._getAllVerionsForRandomSnodePool();
} catch (e) {
log.warn(
'loki_snodes:::refreshRandomPoolPromise - error',
'loki_snodes:::refreshRandomPool - error',
e.code,
e.message
);
@ -581,108 +558,10 @@ class LokiSnodeAPI {
);
}
}
return this.randomSnodePool;
});
}
async refreshRandomPool2(seedNodes = [...window.seedNodeList]) {
// if currently not in progress
if (this.refreshRandomPoolPromise) {
// set lock
this.refreshRandomPoolPromise = new Promise(async (resolve, reject) => {
let timeoutTimer = null;
// private retry container
const trySeedNode = async (consecutiveErrors = 0) => {
let snodes = [];
try {
log.info(
'loki_snodes:::refreshRandomPoolPromise - Refreshing random snode pool'
);
snodes = await getSnodeListFromLokidSeednode(seedNodes);
// make sure order of the list is random, so we get version in a non-deterministic way
snodes = _.shuffle(snodes);
// commit changes to be live
// we'll update the version (in case they upgrade) every cycle
this.versionPools = {};
this.versionsRetrieved = false;
this.randomSnodePool = snodes.map(snode => ({
ip: snode.public_ip,
port: snode.storage_port,
pubkey_x25519: snode.pubkey_x25519,
pubkey_ed25519: snode.pubkey_ed25519,
}));
log.info(
'loki_snodes:::refreshRandomPoolPromise - Refreshed random snode pool with',
this.randomSnodePool.length,
'snodes'
);
// clear lock
delete this.refreshRandomPoolPromise;
if (timeoutTimer !== null) {
clearTimeout(timeoutTimer);
timeoutTimer = null;
}
// start polling versions but no need to await it
this.getAllVerionsForRandomSnodePool();
resolve();
} catch (e) {
log.warn(
'loki_snodes:::refreshRandomPoolPromise - error',
e.code,
e.message
);
// handle retries in case of temporary hiccups
if (consecutiveErrors < SEED_NODE_RETRIES) {
// retry after a possible delay
setTimeout(() => {
log.info(
'loki_snodes:::refreshRandomPoolPromise - Retrying initialising random snode pool, try #',
consecutiveErrors
);
trySeedNode(consecutiveErrors + 1);
}, consecutiveErrors * consecutiveErrors * 5000);
} else {
log.error(
'loki_snodes:::refreshRandomPoolPromise - Giving up trying to contact seed node'
);
if (snodes.length === 0) {
this.refreshRandomPoolPromise = null; // clear lock
if (timeoutTimer !== null) {
clearTimeout(timeoutTimer);
timeoutTimer = null;
}
throw new window.textsecure.SeedNodeError(
'Failed to contact seed node'
);
}
}
}
};
const delay = (SEED_NODE_RETRIES + 1) * (SEED_NODE_RETRIES + 1) * 5000;
timeoutTimer = setTimeout(() => {
log.warn(
'loki_snodes:::refreshRandomPoolPromise - TIMEDOUT after',
delay,
's'
);
reject();
}, delay);
trySeedNode();
});
}
try {
await this.refreshRandomPoolPromise;
} catch (e) {
// we will throw for each time initialiseRandomPool has been called in parallel
log.error(
'loki_snodes:::refreshRandomPoolPromise - error',
e.code,
e.message
);
throw e;
}
log.info('loki_snodes:::refreshRandomPoolPromise - RESOLVED');
}
// unreachableNode.url is like 9hrje1bymy7hu6nmtjme9idyu3rm8gr3mkstakjyuw1997t7w4ny.snode
async unreachableNode(pubKey, unreachableNode) {
const conversation = ConversationController.get(pubKey);
@ -698,8 +577,7 @@ class LokiSnodeAPI {
// keep all but thisNode
const thisNode =
node.address === unreachableNode.address &&
node.ip === unreachableNode.ip &&
node.port === unreachableNode.port;
findMatchingSnode(unreachableNode)(node);
if (thisNode) {
found = true;
}
@ -712,59 +590,44 @@ class LokiSnodeAPI {
} has already been marked as bad`
);
}
try {
await conversation.updateSwarmNodes(filteredNodes);
} catch(e) {
log.error(`loki_snodes:::unreachableNode - error ${e.code} ${e.message}`);
throw(e);
}
return filteredNodes;
}
markRandomNodeUnreachable(snode, options = {}) {
// avoid retries when we can't get the version because they're offline
if (!options.versionPoolFailure) {
const snodeVersion = this.versionMap[`${snode.ip}:${snode.port}`];
if (this.versionPools[snodeVersion]) {
this.versionPools[snodeVersion] = _.without(
this.versionPools[snodeVersion],
snode
);
} else {
const retries = options.retries || 0;
if (snodeVersion) {
// reverse map (versionMap) is out of sync with versionPools
log.error(
'loki_snode:::markRandomNodeUnreachable - No snodes for version',
snodeVersion,
`try #${retries} retrying in 10s`
);
} else {
// we don't know our version yet
// and if we're offline, we'll likely not get it until it restarts if it does...
log.warn(
'loki_snode:::markRandomNodeUnreachable - No version for snode',
`${snode.ip}:${snode.port}`,
`try #${retries} retrying in 10s`
);
}
// make sure we don't retry past 15 mins (10s * 100 ~ 1000s)
if (retries < 100) {
setTimeout(() => {
this.markRandomNodeUnreachable(snode, {
...options,
retries: retries + 1,
});
}, 10 * 1000);
}
}
}
markRandomNodeUnreachable(snode) {
this.randomSnodePool = _.without(this.randomSnodePool, snode);
}
async updateLastHash(snode, hash, expiresAt) {
await window.Signal.Data.updateLastHash({ snode, hash, expiresAt });
async updateLastHash(snodeAddress, hash, expiresAt) {
// FIXME: handle rejections
await window.Signal.Data.updateLastHash({ snode: snodeAddress, hash, expiresAt });
}
getSwarmNodesForPubKey(pubKey) {
// called by loki_message:::sendMessage & loki_message:::startLongPolling
async getSwarmNodesForPubKey(pubKey) {
try {
const conversation = ConversationController.get(pubKey);
const swarmNodes = [...conversation.get('swarmNodes')];
// always? include lashHash
await Promise.all(Object.keys(swarmNodes).map(async j => {
const node = swarmNodes[j];
// FIXME make a batch function call
const lastHash = await window.Signal.Data.getLastHashBySnode(
node.address
);
log.debug(`loki_snode:::getSwarmNodesForPubKey - ${j} ${node.ip}:${node.port} hash ${lastHash} for ${node.address}`);
swarmNodes[j] = {
...node,
lastHash,
};
}));
return swarmNodes;
} catch (e) {
throw new window.textsecure.ReplayableError({
@ -780,39 +643,38 @@ class LokiSnodeAPI {
await conversation.updateSwarmNodes(filteredNodes);
return filteredNodes;
} catch (e) {
log.error(`loki_snodes:::updateSwarmNodes - error ${e.code} ${e.message}`);
throw new window.textsecure.ReplayableError({
message: 'Could not get conversation',
});
}
}
// FIXME: in it's own PR, reorder functions: put _getFreshSwarmNodes and it's callee
// only loki_message::startLongPolling calls this...
async refreshSwarmNodesForPubKey(pubKey) {
const newNodes = await this.getFreshSwarmNodes(pubKey);
// FIXME: handle rejections
const newNodes = await this._getFreshSwarmNodes(pubKey);
const filteredNodes = this.updateSwarmNodes(pubKey, newNodes);
return filteredNodes;
}
async getFreshSwarmNodes(pubKey) {
if (!(pubKey in this.swarmsPendingReplenish)) {
this.swarmsPendingReplenish[pubKey] = new Promise(async resolve => {
let newSwarmNodes;
async _getFreshSwarmNodes(pubKey) {
return primitives.allowOnlyOneAtATime(`swarmRefresh${pubKey}`, async () => {
let newSwarmNodes = [];
try {
newSwarmNodes = await this.getSwarmNodes(pubKey);
newSwarmNodes = await this._getSwarmNodes(pubKey);
} catch (e) {
log.error(
'loki_snodes:::getFreshSwarmNodes - error',
'loki_snodes:::_getFreshSwarmNodes - error',
e.code,
e.message
);
// TODO: Handle these errors sensibly
newSwarmNodes = [];
}
resolve(newSwarmNodes);
});
}
const newSwarmNodes = await this.swarmsPendingReplenish[pubKey];
delete this.swarmsPendingReplenish[pubKey];
return newSwarmNodes;
});
}
// helper function
@ -849,8 +711,9 @@ class LokiSnodeAPI {
const nameHash = dcodeIO.ByteBuffer.wrap(output).toString('base64');
// Get nodes capable of doing LNS
let lnsNodes = await this.getNodesMinVersion('2.0.3');
lnsNodes = _.shuffle(lnsNodes);
const lnsNodes = this.getNodesMinVersion('2.0.3');
// randomPool should already be shuffled
// lnsNodes = _.shuffle(lnsNodes);
// Loop until 3 confirmations
@ -907,8 +770,11 @@ class LokiSnodeAPI {
return pubkey;
}
async getSnodesForPubkey(snode, pubKey) {
// get snodes for pubkey from random snode
async _getSnodesForPubkey(pubKey) {
let snode = {};
try {
snode = await this.getRandomSnodeAddress();
const result = await lokiRpc(
`https://${snode.ip}`,
snode.port,
@ -922,7 +788,7 @@ class LokiSnodeAPI {
);
if (!result) {
log.warn(
`loki_snode:::getSnodesForPubkey - lokiRpc on ${snode.ip}:${
`loki_snode:::_getSnodesForPubkey - lokiRpc on ${snode.ip}:${
snode.port
} returned falsish value`,
result
@ -932,7 +798,7 @@ class LokiSnodeAPI {
if (!result.snodes) {
// we hit this when snode gives 500s
log.warn(
`loki_snode:::getSnodesForPubkey - lokiRpc on ${snode.ip}:${
`loki_snode:::_getSnodesForPubkey - lokiRpc on ${snode.ip}:${
snode.port
} returned falsish value for snodes`,
result
@ -945,7 +811,7 @@ class LokiSnodeAPI {
this.markRandomNodeUnreachable(snode);
const randomPoolRemainingCount = this.getRandomPoolLength();
log.error(
'loki_snodes:::getSnodesForPubkey - error',
'loki_snodes:::_getSnodesForPubkey - error',
e.code,
e.message,
`for ${snode.ip}:${
@ -956,19 +822,17 @@ class LokiSnodeAPI {
}
}
async getSwarmNodes(pubKey) {
async _getSwarmNodes(pubKey) {
const snodes = [];
// creates a range: [0, 1, 2]
const questions = [...Array(RANDOM_SNODES_TO_USE_FOR_PUBKEY_SWARM).keys()];
// FIXME: handle rejections
await Promise.all(
questions.map(async () => {
// allow exceptions to pass through upwards
const rSnode = await this.getRandomSnodeAddress();
const resList = await this.getSnodesForPubkey(rSnode, pubKey);
// should we only activate entries that are in all results?
const resList = await this._getSnodesForPubkey(pubKey);
resList.map(item => {
const hasItem = snodes.some(
hItem => item.ip === hItem.ip && item.port === hItem.port
);
const hasItem = snodes.some(findMatchingSnode(item));
if (!hasItem) {
snodes.push(item);
}
@ -976,6 +840,7 @@ class LokiSnodeAPI {
});
})
);
// should we only activate entries that are in all results?
return snodes;
}
}

Loading…
Cancel
Save