Merge pull request #868 from neuroscr/tls-fix

Snode communication retries and logging improvement
pull/933/head
Ryan Tharp 5 years ago committed by GitHub
commit 0c43c511d7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -217,7 +217,7 @@ class LokiMessageAPI {
} }
return true; return true;
} catch (e) { } catch (e) {
log.warn('Loki send message:', e); log.warn('Loki send message error:', e.code, e.message, `from ${address}`);
if (e instanceof textsecure.WrongSwarmError) { if (e instanceof textsecure.WrongSwarmError) {
const { newSwarm } = e; const { newSwarm } = e;
await lokiSnodeAPI.updateSwarmNodes(params.pubKey, newSwarm); await lokiSnodeAPI.updateSwarmNodes(params.pubKey, newSwarm);
@ -272,6 +272,8 @@ class LokiMessageAPI {
try { try {
// TODO: Revert back to using snode address instead of IP // TODO: Revert back to using snode address instead of IP
let messages = await this.retrieveNextMessages(nodeData.ip, nodeData); let messages = await this.retrieveNextMessages(nodeData.ip, nodeData);
// this only tracks retrieval failures
// won't include parsing failures...
successiveFailures = 0; successiveFailures = 0;
if (messages.length) { if (messages.length) {
const lastMessage = _.last(messages); const lastMessage = _.last(messages);
@ -288,7 +290,12 @@ class LokiMessageAPI {
// Execute callback even with empty array to signal online status // Execute callback even with empty array to signal online status
callback(messages); callback(messages);
} catch (e) { } catch (e) {
log.warn('Loki retrieve messages:', e.code, e.message); log.warn(
'Loki retrieve messages error:',
e.code,
e.message,
`on ${nodeData.ip}:${nodeData.port}`
);
if (e instanceof textsecure.WrongSwarmError) { if (e instanceof textsecure.WrongSwarmError) {
const { newSwarm } = e; const { newSwarm } = e;
await lokiSnodeAPI.updateSwarmNodes(this.ourKey, newSwarm); await lokiSnodeAPI.updateSwarmNodes(this.ourKey, newSwarm);
@ -312,9 +319,24 @@ class LokiMessageAPI {
} }
} }
if (successiveFailures >= MAX_ACCEPTABLE_FAILURES) { if (successiveFailures >= MAX_ACCEPTABLE_FAILURES) {
log.warn(
`removing ${nodeData.ip}:${
nodeData.port
} from our swarm pool. We have ${
Object.keys(this.ourSwarmNodes).length
} usable swarm nodes left`
);
await lokiSnodeAPI.unreachableNode(this.ourKey, address); await lokiSnodeAPI.unreachableNode(this.ourKey, address);
} }
} }
// if not stopPollingResult
if (_.isEmpty(this.ourSwarmNodes)) {
log.error(
'We no longer have any swarm nodes available to try in pool, closing retrieve connection'
);
return false;
}
return true;
} }
async retrieveNextMessages(nodeUrl, nodeData) { async retrieveNextMessages(nodeUrl, nodeData) {
@ -342,12 +364,31 @@ class LokiMessageAPI {
} }
async startLongPolling(numConnections, stopPolling, callback) { async startLongPolling(numConnections, stopPolling, callback) {
log.info('startLongPolling for', numConnections, 'connections');
this.ourSwarmNodes = {}; this.ourSwarmNodes = {};
let nodes = await lokiSnodeAPI.getSwarmNodesForPubKey(this.ourKey); let nodes = await lokiSnodeAPI.getSwarmNodesForPubKey(this.ourKey);
log.info('swarmNodes', nodes.length, 'for', this.ourKey);
Object.keys(nodes).forEach(j => {
const node = nodes[j];
log.info(`${j} ${node.ip}:${node.port}`);
});
if (nodes.length < numConnections) { if (nodes.length < numConnections) {
await lokiSnodeAPI.refreshSwarmNodesForPubKey(this.ourKey); log.warn(
nodes = await lokiSnodeAPI.getSwarmNodesForPubKey(this.ourKey); 'Not enough SwarmNodes for our pubkey in local database, getting current list from blockchain'
);
nodes = await lokiSnodeAPI.refreshSwarmNodesForPubKey(this.ourKey);
if (nodes.length < numConnections) {
log.error(
'Could not get enough SwarmNodes for our pubkey from blockchain'
);
}
} }
log.info(
`There are currently ${
nodes.length
} swarmNodes for pubKey in our local database`
);
for (let i = 0; i < nodes.length; i += 1) { for (let i = 0; i < nodes.length; i += 1) {
const lastHash = await window.Signal.Data.getLastHashBySnode( const lastHash = await window.Signal.Data.getLastHashBySnode(
nodes[i].address nodes[i].address
@ -364,9 +405,13 @@ class LokiMessageAPI {
promises.push(this.openRetrieveConnection(stopPolling, callback)); promises.push(this.openRetrieveConnection(stopPolling, callback));
} }
// blocks until all snodes in our swarms have been removed from the list // blocks until numConnections snodes in our swarms have been removed from the list
// less than numConnections being active is fine, only need to restart if none per Niels 20/02/13
// or if there is network issues (ENOUTFOUND due to lokinet) // or if there is network issues (ENOUTFOUND due to lokinet)
await Promise.all(promises); await Promise.all(promises);
log.error('All our long poll swarm connections have been removed');
// should we just call ourself again?
// no, our caller already handles this...
} }
} }

@ -29,10 +29,10 @@ const decryptResponse = async (response, address) => {
return {}; return {};
}; };
// TODO: Don't allow arbitrary URLs, only snodes and loki servers
const sendToProxy = async (options = {}, targetNode) => { const sendToProxy = async (options = {}, targetNode) => {
const randSnode = await lokiSnodeAPI.getRandomSnodeAddress(); const randSnode = await lokiSnodeAPI.getRandomSnodeAddress();
// Don't allow arbitrary URLs, only snodes and loki servers
const url = `https://${randSnode.ip}:${randSnode.port}/proxy`; const url = `https://${randSnode.ip}:${randSnode.port}/proxy`;
const snPubkeyHex = StringView.hexToArrayBuffer(targetNode.pubkey_x25519); const snPubkeyHex = StringView.hexToArrayBuffer(targetNode.pubkey_x25519);
@ -67,20 +67,56 @@ const sendToProxy = async (options = {}, targetNode) => {
const response = await nodeFetch(url, firstHopOptions); const response = await nodeFetch(url, firstHopOptions);
process.env.NODE_TLS_REJECT_UNAUTHORIZED = 1; process.env.NODE_TLS_REJECT_UNAUTHORIZED = 1;
const ciphertext = await response.text(); // detect SNode is not ready (not in swarm; not done syncing)
if (response.status === 503) {
const ciphertext = await response.text();
log.error(`lokiRpc sendToProxy snode ${randSnode.ip}:${randSnode.port} error`, ciphertext);
// mark as bad for this round (should give it some time and improve success rates)
lokiSnodeAPI.markRandomNodeUnreachable(randSnode);
// retry for a new working snode
return sendToProxy(options, targetNode);
}
const ciphertextBuffer = dcodeIO.ByteBuffer.wrap( // FIXME: handle nodeFetch errors/exceptions...
ciphertext, if (response.status !== 200) {
'base64' // let us know we need to create handlers for new unhandled codes
).toArrayBuffer(); log.warn('lokiRpc sendToProxy fetch non-200 statusCode', response.status);
}
const plaintextBuffer = await window.libloki.crypto.DHDecrypt( const ciphertext = await response.text();
symmetricKey, if (!ciphertext) {
ciphertextBuffer // avoid base64 decode failure
); log.warn('Server did not return any data for', options);
}
const textDecoder = new TextDecoder(); let plaintext;
const plaintext = textDecoder.decode(plaintextBuffer); let ciphertextBuffer;
try {
ciphertextBuffer = dcodeIO.ByteBuffer.wrap(
ciphertext,
'base64'
).toArrayBuffer();
const plaintextBuffer = await window.libloki.crypto.DHDecrypt(
symmetricKey,
ciphertextBuffer
);
const textDecoder = new TextDecoder();
plaintext = textDecoder.decode(plaintextBuffer);
} catch(e) {
log.error(
'lokiRpc sendToProxy decode error',
e.code,
e.message,
`from ${randSnode.ip}:${randSnode.port} ciphertext:`,
ciphertext
);
if (ciphertextBuffer) {
log.error('ciphertextBuffer', ciphertextBuffer);
}
return false;
}
try { try {
const jsonRes = JSON.parse(plaintext); const jsonRes = JSON.parse(plaintext);
@ -90,10 +126,10 @@ const sendToProxy = async (options = {}, targetNode) => {
return JSON.parse(jsonRes.body); return JSON.parse(jsonRes.body);
} catch (e) { } catch (e) {
log.error( log.error(
'lokiRpc sendToProxy error', 'lokiRpc sendToProxy parse error',
e.code, e.code,
e.message, e.message,
'json', `from ${randSnode.ip}:${randSnode.port} json:`,
jsonRes.body jsonRes.body
); );
} }
@ -102,10 +138,10 @@ const sendToProxy = async (options = {}, targetNode) => {
return jsonRes; return jsonRes;
} catch (e) { } catch (e) {
log.error( log.error(
'lokiRpc sendToProxy error', 'lokiRpc sendToProxy parse error',
e.code, e.code,
e.message, e.message,
'json', `from ${randSnode.ip}:${randSnode.port} json:`,
plaintext plaintext
); );
} }
@ -150,7 +186,7 @@ const lokiFetch = async (url, options = {}, targetNode = null) => {
try { try {
if (window.lokiFeatureFlags.useSnodeProxy && targetNode) { if (window.lokiFeatureFlags.useSnodeProxy && targetNode) {
const result = await sendToProxy(fetchOptions, targetNode); const result = await sendToProxy(fetchOptions, targetNode);
return result.json(); return result ? result.json() : false;
} }
if (url.match(/https:\/\//)) { if (url.match(/https:\/\//)) {

@ -4,6 +4,8 @@
const is = require('@sindresorhus/is'); const is = require('@sindresorhus/is');
const { lokiRpc } = require('./loki_rpc'); const { lokiRpc } = require('./loki_rpc');
const RANDOM_SNODES_TO_USE = 3;
class LokiSnodeAPI { class LokiSnodeAPI {
constructor({ serverUrl, localUrl }) { constructor({ serverUrl, localUrl }) {
if (!is.string(serverUrl)) { if (!is.string(serverUrl)) {
@ -18,6 +20,7 @@ class LokiSnodeAPI {
async getRandomSnodeAddress() { async getRandomSnodeAddress() {
/* resolve random snode */ /* resolve random snode */
if (this.randomSnodePool.length === 0) { if (this.randomSnodePool.length === 0) {
// allow exceptions to pass through upwards
await this.initialiseRandomPool(); await this.initialiseRandomPool();
} }
if (this.randomSnodePool.length === 0) { if (this.randomSnodePool.length === 0) {
@ -28,7 +31,7 @@ class LokiSnodeAPI {
]; ];
} }
async initialiseRandomPool(seedNodes = [...window.seedNodeList]) { async initialiseRandomPool(seedNodes = [...window.seedNodeList], consecutiveErrors = 0) {
const params = { const params = {
limit: 20, limit: 20,
active_only: true, active_only: true,
@ -43,8 +46,9 @@ class LokiSnodeAPI {
Math.floor(Math.random() * seedNodes.length), Math.floor(Math.random() * seedNodes.length),
1 1
)[0]; )[0];
let snodes = [];
try { try {
const result = await lokiRpc( const response = await lokiRpc(
`http://${seedNode.ip}`, `http://${seedNode.ip}`,
seedNode.port, seedNode.port,
'get_n_service_nodes', 'get_n_service_nodes',
@ -53,7 +57,7 @@ class LokiSnodeAPI {
'/json_rpc' // Seed request endpoint '/json_rpc' // Seed request endpoint
); );
// Filter 0.0.0.0 nodes which haven't submitted uptime proofs // Filter 0.0.0.0 nodes which haven't submitted uptime proofs
const snodes = result.result.service_node_states.filter( snodes = response.result.service_node_states.filter(
snode => snode.public_ip !== '0.0.0.0' snode => snode.public_ip !== '0.0.0.0'
); );
this.randomSnodePool = snodes.map(snode => ({ this.randomSnodePool = snodes.map(snode => ({
@ -64,12 +68,20 @@ class LokiSnodeAPI {
})); }));
} catch (e) { } catch (e) {
log.warn('initialiseRandomPool error', e.code, e.message); log.warn('initialiseRandomPool error', e.code, e.message);
if (seedNodes.length === 0) { if (consecutiveErrors < 3) {
throw new window.textsecure.SeedNodeError( // retry after a possible delay
'Failed to contact seed node' setTimeout(() => {
); log.info('Retrying initialising random snode pool, try #', consecutiveErrors);
this.initialiseRandomPool(seedNodes, consecutiveErrors + 1);
}, consecutiveErrors * consecutiveErrors * 5000);
} else {
log.error('Giving up trying to contact seed node');
if (snodes.length === 0) {
throw new window.textsecure.SeedNodeError(
'Failed to contact seed node'
);
}
} }
this.initialiseRandomPool(seedNodes);
} }
} }
@ -111,6 +123,7 @@ class LokiSnodeAPI {
const filteredNodes = newNodes.filter(snode => snode.ip !== '0.0.0.0'); const filteredNodes = newNodes.filter(snode => snode.ip !== '0.0.0.0');
const conversation = ConversationController.get(pubKey); const conversation = ConversationController.get(pubKey);
await conversation.updateSwarmNodes(filteredNodes); await conversation.updateSwarmNodes(filteredNodes);
return filteredNodes;
} catch (e) { } catch (e) {
throw new window.textsecure.ReplayableError({ throw new window.textsecure.ReplayableError({
message: 'Could not get conversation', message: 'Could not get conversation',
@ -120,7 +133,8 @@ class LokiSnodeAPI {
async refreshSwarmNodesForPubKey(pubKey) { async refreshSwarmNodesForPubKey(pubKey) {
const newNodes = await this.getFreshSwarmNodes(pubKey); const newNodes = await this.getFreshSwarmNodes(pubKey);
this.updateSwarmNodes(pubKey, newNodes); const filteredNodes = this.updateSwarmNodes(pubKey, newNodes);
return filteredNodes;
} }
async getFreshSwarmNodes(pubKey) { async getFreshSwarmNodes(pubKey) {
@ -130,6 +144,7 @@ class LokiSnodeAPI {
try { try {
newSwarmNodes = await this.getSwarmNodes(pubKey); newSwarmNodes = await this.getSwarmNodes(pubKey);
} catch (e) { } catch (e) {
log.error('getFreshSwarmNodes error', e.code, e.message);
// TODO: Handle these errors sensibly // TODO: Handle these errors sensibly
newSwarmNodes = []; newSwarmNodes = [];
} }
@ -141,9 +156,7 @@ class LokiSnodeAPI {
return newSwarmNodes; return newSwarmNodes;
} }
async getSwarmNodes(pubKey) { async getSnodesForPubkey(snode, pubKey) {
// TODO: Hit multiple random nodes and merge lists?
const snode = await this.getRandomSnodeAddress();
try { try {
const result = await lokiRpc( const result = await lokiRpc(
`https://${snode.ip}`, `https://${snode.ip}`,
@ -158,7 +171,7 @@ class LokiSnodeAPI {
); );
if (!result) { if (!result) {
log.warn( log.warn(
`getSwarmNodes lokiRpc on ${snode.ip}:${ `getSnodesForPubkey lokiRpc on ${snode.ip}:${
snode.port snode.port
} returned falsish value`, } returned falsish value`,
result result
@ -168,11 +181,32 @@ class LokiSnodeAPI {
const snodes = result.snodes.filter(tSnode => tSnode.ip !== '0.0.0.0'); const snodes = result.snodes.filter(tSnode => tSnode.ip !== '0.0.0.0');
return snodes; return snodes;
} catch (e) { } catch (e) {
log.error('getSwarmNodes error', e.code, e.message); log.error('getSnodesForPubkey error', e.code, e.message, `for ${snode.ip}:${snode.port}`);
this.markRandomNodeUnreachable(snode); this.markRandomNodeUnreachable(snode);
return this.getSwarmNodes(pubKey); return [];
} }
} }
async getSwarmNodes(pubKey) {
const snodes = [];
const questions = [...Array(RANDOM_SNODES_TO_USE).keys()];
await Promise.all(
questions.map(async () => {
// allow exceptions to pass through upwards
const rSnode = await this.getRandomSnodeAddress();
const resList = await this.getSnodesForPubkey(rSnode, pubKey);
// should we only activate entries that are in all results?
resList.map(item => {
const hasItem = snodes.some(hItem => item.ip === hItem.ip && item.port === hItem.port);
if (!hasItem) {
snodes.push(item);
}
return true;
});
})
);
return snodes;
}
} }
module.exports = LokiSnodeAPI; module.exports = LokiSnodeAPI;

@ -100,6 +100,7 @@
); );
} catch (e) { } catch (e) {
// we'll try again anyway // we'll try again anyway
window.log.error('http-resource pollServer error', e.code, e.message);
} }
if (this.calledStop) { if (this.calledStop) {
@ -109,6 +110,10 @@
connected = false; connected = false;
// Exhausted all our snodes urls, trying again later from scratch // Exhausted all our snodes urls, trying again later from scratch
setTimeout(() => { setTimeout(() => {
window.log.info(
`Exhausted all our snodes urls, trying again in ${EXHAUSTED_SNODES_RETRY_DELAY /
1000}s from scratch`
);
this.pollServer(); this.pollServer();
}, EXHAUSTED_SNODES_RETRY_DELAY); }, EXHAUSTED_SNODES_RETRY_DELAY);
}; };

Loading…
Cancel
Save