add retries for requests that are not already retried

pull/1624/head
Audric Ackermann 5 years ago
parent a777b09165
commit a2ea02960e
No known key found for this signature in database
GPG Key ID: 999F434D76324AD4

@ -35,12 +35,12 @@ window.Signal = {
window.Signal.Logs = require('./js/modules/logs'); window.Signal.Logs = require('./js/modules/logs');
window.resetDatabase = () => { window.resetDatabase = () => {
window?.log?.info('reset database'); window.log.info('reset database');
ipcRenderer.send('resetDatabase'); ipcRenderer.send('resetDatabase');
}; };
window.restart = () => { window.restart = () => {
window?.log?.info('restart'); window.log.info('restart');
ipc.send('restart'); ipc.send('restart');
}; };

@ -72,7 +72,7 @@ window.isBeforeVersion = (toCheck, baseVersion) => {
try { try {
return semver.lt(toCheck, baseVersion); return semver.lt(toCheck, baseVersion);
} catch (error) { } catch (error) {
window?.log?.error( window.log.error(
`isBeforeVersion error: toCheck: ${toCheck}, baseVersion: ${baseVersion}`, `isBeforeVersion error: toCheck: ${toCheck}, baseVersion: ${baseVersion}`,
error && error.stack ? error.stack : error error && error.stack ? error.stack : error
); );
@ -164,11 +164,11 @@ window.open = () => null;
window.eval = global.eval = () => null; window.eval = global.eval = () => null;
window.drawAttention = () => { window.drawAttention = () => {
// window?.log?.debug('draw attention'); // window.log.debug('draw attention');
ipc.send('draw-attention'); ipc.send('draw-attention');
}; };
window.showWindow = () => { window.showWindow = () => {
window?.log?.info('show window'); window.log.info('show window');
ipc.send('show-window'); ipc.send('show-window');
}; };
@ -177,12 +177,12 @@ window.setAutoHideMenuBar = autoHide => ipc.send('set-auto-hide-menu-bar', autoH
window.setMenuBarVisibility = visibility => ipc.send('set-menu-bar-visibility', visibility); window.setMenuBarVisibility = visibility => ipc.send('set-menu-bar-visibility', visibility);
window.restart = () => { window.restart = () => {
window?.log?.info('restart'); window.log.info('restart');
ipc.send('restart'); ipc.send('restart');
}; };
window.resetDatabase = () => { window.resetDatabase = () => {
window?.log?.info('reset database'); window.log.info('reset database');
ipc.send('resetDatabase'); ipc.send('resetDatabase');
}; };
@ -198,7 +198,7 @@ window.onUnblockNumber = async number => {
const conversation = window.getConversationController().get(number); const conversation = window.getConversationController().get(number);
await conversation.unblock(); await conversation.unblock();
} catch (e) { } catch (e) {
window?.log?.info('IPC on unblock: failed to fetch conversation for number: ', number); window.log.info('IPC on unblock: failed to fetch conversation for number: ', number);
} }
} }
}; };
@ -278,7 +278,7 @@ window.setAutoUpdateEnabled = value => ipc.send('set-auto-update-setting', !!val
ipc.on('get-ready-for-shutdown', async () => { ipc.on('get-ready-for-shutdown', async () => {
const { shutdown } = window.Events || {}; const { shutdown } = window.Events || {};
if (!shutdown) { if (!shutdown) {
window?.log?.error('preload shutdown handler: shutdown method not found'); window.log.error('preload shutdown handler: shutdown method not found');
ipc.send('now-ready-for-shutdown'); ipc.send('now-ready-for-shutdown');
return; return;
} }
@ -299,7 +299,7 @@ window.removeSetupMenuItems = () => ipc.send('remove-setup-menu-items');
require('./js/logging'); require('./js/logging');
if (config.proxyUrl) { if (config.proxyUrl) {
window?.log?.info('Using provided proxy url'); window.log.info('Using provided proxy url');
} }
window.nodeSetImmediate = setImmediate; window.nodeSetImmediate = setImmediate;

@ -164,7 +164,7 @@ export async function incrementBadPathCountOrDrop(guardNodeEd25519: string) {
// a guard node is dropped when the path is dropped completely (in dropPathStartingWithGuardNode) // a guard node is dropped when the path is dropped completely (in dropPathStartingWithGuardNode)
for (let index = 1; index < pathWithIssues.length; index++) { for (let index = 1; index < pathWithIssues.length; index++) {
const snode = pathWithIssues[index]; const snode = pathWithIssues[index];
await incrementBadSnodeCountOrDrop(snode.pubkey_ed25519); await incrementBadSnodeCountOrDrop({ snodeEd25519: snode.pubkey_ed25519 });
} }
if (newPathFailureCount >= pathFailureThreshold) { if (newPathFailureCount >= pathFailureThreshold) {

@ -1,20 +1,10 @@
import _ from 'lodash'; import _ from 'lodash';
import { SendParams, storeOnNode } from '../snode_api/SNodeAPI'; import { storeOnNode } from '../snode_api/SNodeAPI';
import { getSwarmFor, Snode } from '../snode_api/snodePool'; import { getSwarmFor } from '../snode_api/snodePool';
import { firstTrue } from '../utils/Promise'; import { firstTrue } from '../utils/Promise';
const DEFAULT_CONNECTIONS = 3; const DEFAULT_CONNECTIONS = 3;
async function openSendConnection(snode: Snode, params: SendParams) {
// TODO: Revert back to using snode address instead of IP
const successfulSend = await storeOnNode(snode, params);
if (successfulSend) {
return snode;
}
// should we mark snode as bad if it can't store our message?
return undefined;
}
/** /**
* Refactor note: We should really clean this up ... it's very messy * Refactor note: We should really clean this up ... it's very messy
* *
@ -57,7 +47,18 @@ export async function sendMessage(
const usedNodes = _.slice(swarm, 0, DEFAULT_CONNECTIONS); const usedNodes = _.slice(swarm, 0, DEFAULT_CONNECTIONS);
const promises = usedNodes.map(snodeConnection => openSendConnection(snodeConnection, params)); const promises = usedNodes.map(async usedNode => {
// TODO: Revert back to using snode address instead of IP
// No pRetry here: if this is a bad path, it will be handled and retried in lokiOnionFetch.
// The only case where we could care about a retry is when the usedNode is not correct,
// but since we trigger this request on a few snodes in parallel, this should be fine.
const successfulSend = await storeOnNode(usedNode, params);
if (successfulSend) {
return usedNode;
}
// should we mark snode as bad if it can't store our message?
return undefined;
});
let snode; let snode;
try { try {

@ -15,6 +15,7 @@ import { getRandomSnode, getRandomSnodePool, requiredSnodesForAgreement, Snode }
import { Constants } from '..'; import { Constants } from '..';
import { sha256 } from '../crypto'; import { sha256 } from '../crypto';
import _ from 'lodash'; import _ from 'lodash';
import pRetry from 'p-retry';
const getSslAgentForSeedNode = (seedNodeHost: string, isSsl = false) => { const getSslAgentForSeedNode = (seedNodeHost: string, isSsl = false) => {
let filePrefix = ''; let filePrefix = '';
@ -177,50 +178,68 @@ export type SendParams = {
}; };
// get snodes for pubkey from random snode. Uses an existing snode // get snodes for pubkey from random snode. Uses an existing snode
export async function requestSnodesForPubkey(pubKey: string): Promise<Array<Snode>> {
let targetNode; async function requestSnodesForPubkeyRetryable(
try { pubKey: string,
targetNode = await getRandomSnode(); targetNode: Snode
const result = await snodeRpc( ): Promise<Array<Snode>> {
'get_snodes_for_pubkey', const params = {
{ pubKey,
pubKey, };
}, const result = await snodeRpc('get_snodes_for_pubkey', params, targetNode, pubKey);
targetNode,
pubKey if (!result) {
window?.log?.warn(
`LokiSnodeAPI::requestSnodesForPubkeyRetryable - lokiRpc on ${targetNode.ip}:${targetNode.port} returned falsish value`,
result
); );
throw new Error('requestSnodesForPubkeyRetryable: Invalid result');
}
if (!result) { if (result.status !== 200) {
window?.log?.warn('Status is not 200 for get_snodes_for_pubkey');
throw new Error('requestSnodesForPubkeyRetryable: Invalid status code');
}
try {
const json = JSON.parse(result.body);
if (!json.snodes) {
// we hit this when snode gives 500s
window?.log?.warn( window?.log?.warn(
`LokiSnodeAPI::requestSnodesForPubkey - lokiRpc on ${targetNode.ip}:${targetNode.port} returned falsish value`, `LokiSnodeAPI::requestSnodesForPubkeyRetryable - lokiRpc on ${targetNode.ip}:${targetNode.port} returned falsish value for snodes`,
result result
); );
return []; throw new Error('Invalid json (empty)');
} }
if (result.status !== 200) { const snodes = json.snodes.filter((tSnode: any) => tSnode.ip !== '0.0.0.0');
window?.log?.warn('Status is not 200 for get_snodes_for_pubkey'); return snodes;
return []; } catch (e) {
} throw new Error('Invalid json');
}
}
try { export async function requestSnodesForPubkey(pubKey: string): Promise<Array<Snode>> {
const json = JSON.parse(result.body); try {
const targetNode = await getRandomSnode();
if (!json.snodes) { return await pRetry(
// we hit this when snode gives 500s async () => {
window?.log?.warn( return requestSnodesForPubkeyRetryable(pubKey, targetNode);
`LokiSnodeAPI::requestSnodesForPubkey - lokiRpc on ${targetNode.ip}:${targetNode.port} returned falsish value for snodes`, },
result {
); retries: 10, // each path can fail 3 times before being dropped, we have 3 paths at most
return []; factor: 2,
minTimeout: 200,
maxTimeout: 4000,
onFailedAttempt: e => {
window?.log?.warn(
`requestSnodesForPubkey attempt #${e.attemptNumber} failed. ${e.retriesLeft} retries left...`
);
},
} }
);
const snodes = json.snodes.filter((tSnode: any) => tSnode.ip !== '0.0.0.0');
return snodes;
} catch (e) {
window?.log?.warn('Invalid json');
return [];
}
} catch (e) { } catch (e) {
window?.log?.error('LokiSnodeAPI::requestSnodesForPubkey - error', e); window?.log?.error('LokiSnodeAPI::requestSnodesForPubkey - error', e);
@ -301,8 +320,7 @@ export async function getSnodePoolFromSnode(targetNode: Snode): Promise<Array<Sn
}, },
}, },
}; };
const method = 'oxend_request'; const result = await snodeRpc('oxend_request', params, targetNode);
const result = await snodeRpc(method, params, targetNode);
if (!result || result.status !== 200) { if (!result || result.status !== 200) {
throw new Error('Invalid result'); throw new Error('Invalid result');
} }
@ -339,6 +357,8 @@ export async function getSnodePoolFromSnode(targetNode: Snode): Promise<Array<Sn
export async function storeOnNode(targetNode: Snode, params: SendParams): Promise<boolean> { export async function storeOnNode(targetNode: Snode, params: SendParams): Promise<boolean> {
try { try {
// No retry here. If the issue is with the path, it is handled in lokiOnionFetch.
// If the issue is with the targetNode, this request is already sent to a few snodes in parallel, so it's handled.
const result = await snodeRpc('store', params, targetNode, params.pubKey); const result = await snodeRpc('store', params, targetNode, params.pubKey);
if (!result || result.status !== 200) { if (!result || result.status !== 200) {
@ -356,6 +376,7 @@ export async function storeOnNode(targetNode: Snode, params: SendParams): Promis
return false; return false;
} }
/** */
export async function retrieveNextMessages( export async function retrieveNextMessages(
targetNode: Snode, targetNode: Snode,
lastHash: string, lastHash: string,
@ -367,26 +388,32 @@ export async function retrieveNextMessages(
}; };
// let exceptions bubble up // let exceptions bubble up
const result = await snodeRpc('retrieve', params, targetNode, pubkey); try {
// no retry for this one as this is a call we do every few seconds while polling for messages
const result = await snodeRpc('retrieve', params, targetNode, pubkey);
if (!result) { if (!result) {
window?.log?.warn( window?.log?.warn(
`loki_message:::_retrieveNextMessages - lokiRpc could not talk to ${targetNode.ip}:${targetNode.port}` `loki_message:::_retrieveNextMessages - lokiRpc could not talk to ${targetNode.ip}:${targetNode.port}`
); );
return []; return [];
} }
if (result.status !== 200) { if (result.status !== 200) {
window.log('retrieve result is not 200'); window.log('retrieve result is not 200');
return []; return [];
} }
try { try {
const json = JSON.parse(result.body); const json = JSON.parse(result.body);
return json.messages || []; return json.messages || [];
} catch (e) { } catch (e) {
window?.log?.warn('exception while parsing json of nextMessage:', e); window?.log?.warn('exception while parsing json of nextMessage:', e);
return [];
}
} catch (e) {
window?.log?.warn('Got an error while retrieving next messages:', e);
return []; return [];
} }
} }

@ -63,8 +63,13 @@ async function lokiFetch(
} }
} }
// Wrapper for a JSON RPC request /**
// Annoyngly, this is used for Lokid requests too * This function will throw for a few reasons.
* The loki-important ones are
* -> if we try to make a request to a path which fails too many times => the user will need to retry themselves
* -> if the targetNode gets too many errors => we will need to try to do this request again with another target node
*/
export async function snodeRpc( export async function snodeRpc(
method: string, method: string,
params: any, params: any,

@ -30,6 +30,8 @@ export interface SnodeResponse {
status: number; status: number;
} }
const NEXT_NODE_NOT_FOUND_PREFIX = 'Next node not found: ';
// Returns the actual ciphertext, symmetric key that will be used // Returns the actual ciphertext, symmetric key that will be used
// for decryption, and an ephemeral_key to send to the next hop // for decryption, and an ephemeral_key to send to the next hop
async function encryptForPubKey(pubKeyX25519hex: string, reqObj: any): Promise<DestinationContext> { async function encryptForPubKey(pubKeyX25519hex: string, reqObj: any): Promise<DestinationContext> {
@ -232,10 +234,10 @@ async function processOnionRequestErrorAtDestination({
process406Error(statusCode); process406Error(statusCode);
await process421Error(statusCode, body, associatedWith, destinationEd25519); await process421Error(statusCode, body, associatedWith, destinationEd25519);
if (destinationEd25519) { if (destinationEd25519) {
await processAnyOtherErrorAtDestination(statusCode, destinationEd25519, associatedWith); await processAnyOtherErrorAtDestination(statusCode, body, destinationEd25519, associatedWith);
} else { } else {
console.warn( console.warn(
'processOnionRequestErrorAtDestination: destinationEd25519 unset. was it a group call?', 'processOnionRequestErrorAtDestination: destinationEd25519 unset. was it an open group call?',
statusCode statusCode
); );
} }
@ -257,16 +259,21 @@ async function processAnyOtherErrorOnPath(
) { ) {
window?.log?.warn(`[path] Got status: ${status}`); window?.log?.warn(`[path] Got status: ${status}`);
// //
const prefix = 'Next node not found: ';
let nodeNotFound; let nodeNotFound;
if (ciphertext?.startsWith(prefix)) { if (ciphertext?.startsWith(NEXT_NODE_NOT_FOUND_PREFIX)) {
nodeNotFound = ciphertext.substr(prefix.length); nodeNotFound = ciphertext.substr(NEXT_NODE_NOT_FOUND_PREFIX.length);
} }
// If we have a specific node in fault we can exclude just this node. // If we have a specific node in fault we can exclude just this node.
// Otherwise we increment the whole path failure count // Otherwise we increment the whole path failure count
if (nodeNotFound) { if (nodeNotFound) {
await incrementBadSnodeCountOrDrop(nodeNotFound, associatedWith); await incrementBadSnodeCountOrDrop({
snodeEd25519: nodeNotFound,
associatedWith,
isNodeNotFound: true,
});
// we are checking errors on the path, a nodeNotFound on the path should trigger a rebuild
} else { } else {
await incrementBadPathCountOrDrop(guardNodeEd25519); await incrementBadPathCountOrDrop(guardNodeEd25519);
} }
@ -276,6 +283,7 @@ async function processAnyOtherErrorOnPath(
async function processAnyOtherErrorAtDestination( async function processAnyOtherErrorAtDestination(
status: number, status: number,
body: string,
destinationEd25519: string, destinationEd25519: string,
associatedWith?: string associatedWith?: string
) { ) {
@ -289,7 +297,25 @@ async function processAnyOtherErrorAtDestination(
) { ) {
window?.log?.warn(`[path] Got status at destination: ${status}`); window?.log?.warn(`[path] Got status at destination: ${status}`);
await incrementBadSnodeCountOrDrop(destinationEd25519, associatedWith); let nodeNotFound;
if (body?.startsWith(NEXT_NODE_NOT_FOUND_PREFIX)) {
nodeNotFound = body.substr(NEXT_NODE_NOT_FOUND_PREFIX.length);
if (nodeNotFound) {
await incrementBadSnodeCountOrDrop({ snodeEd25519: destinationEd25519, associatedWith });
// If we get a nodeNotFound at the destination, it means the targetNode to which we made the request is not found.
// We have to retry with another targetNode, so it's not just rebuilding the path. We have to go one level higher (lokiOnionFetch).
// status is 502 for a node not found
throw new pRetry.AbortError(
`Bad Path handled. Retry this request with another targetNode. Status: ${status}`
);
}
}
// If we have a specific node in fault we can exclude just this node.
// Otherwise we increment the whole path failure count
// if (nodeNotFound) {
await incrementBadSnodeCountOrDrop({ snodeEd25519: destinationEd25519, associatedWith });
throw new Error(`Bad Path handled. Retry this request. Status: ${status}`); throw new Error(`Bad Path handled. Retry this request. Status: ${status}`);
} }
@ -496,14 +522,28 @@ async function handle421InvalidSwarm(snodeEd25519: string, body: string, associa
* *
* @param snodeEd25519 the snode ed25519 which cause issues * @param snodeEd25519 the snode ed25519 which cause issues
* @param associatedWith if set, we will drop this snode from the swarm of the pubkey too * @param associatedWith if set, we will drop this snode from the swarm of the pubkey too
* @param isNodeNotFound if set, we will drop this snode right now as this is an invalid node for the network.
*/ */
export async function incrementBadSnodeCountOrDrop(snodeEd25519: string, associatedWith?: string) { export async function incrementBadSnodeCountOrDrop({
snodeEd25519,
associatedWith,
isNodeNotFound,
}: {
snodeEd25519: string;
associatedWith?: string;
isNodeNotFound?: boolean;
}) {
const oldFailureCount = snodeFailureCount[snodeEd25519] || 0; const oldFailureCount = snodeFailureCount[snodeEd25519] || 0;
const newFailureCount = oldFailureCount + 1; const newFailureCount = oldFailureCount + 1;
snodeFailureCount[snodeEd25519] = newFailureCount; snodeFailureCount[snodeEd25519] = newFailureCount;
if (newFailureCount >= snodeFailureThreshold) { if (newFailureCount >= snodeFailureThreshold || isNodeNotFound) {
window?.log?.warn(`Failure threshold reached for: ${snodeEd25519}; dropping it.`); if (isNodeNotFound) {
window?.log?.warn(`Node not found reported for: ${snodeEd25519}; dropping it.`);
} else {
window?.log?.warn(`Failure threshold reached for: ${snodeEd25519}; dropping it.`);
}
if (associatedWith) { if (associatedWith) {
window?.log?.info(`Dropping ${snodeEd25519} from swarm of ${associatedWith}`); window?.log?.info(`Dropping ${snodeEd25519} from swarm of ${associatedWith}`);
await dropSnodeFromSwarmIfNeeded(associatedWith, snodeEd25519); await dropSnodeFromSwarmIfNeeded(associatedWith, snodeEd25519);
@ -753,7 +793,8 @@ export async function lokiOnionFetch(
{ {
retries: 10, retries: 10,
factor: 1, factor: 1,
minTimeout: 1000, minTimeout: 200,
maxTimeout: 2000,
onFailedAttempt: e => { onFailedAttempt: e => {
window?.log?.warn( window?.log?.warn(
`onionFetchRetryable attempt #${e.attemptNumber} failed. ${e.retriesLeft} retries left...` `onionFetchRetryable attempt #${e.attemptNumber} failed. ${e.retriesLeft} retries left...`
@ -766,6 +807,6 @@ export async function lokiOnionFetch(
} catch (e) { } catch (e) {
window?.log?.warn('onionFetchRetryable failed ', e); window?.log?.warn('onionFetchRetryable failed ', e);
console.warn('error to show to user'); console.warn('error to show to user');
return undefined; throw e;
} }
} }

Loading…
Cancel
Save