change unreachableNode 2nd param to object, put lock around getRandomSnodeAddress so they can't stack, markRandomNodeUnreachable returns remaining count, adjust logging

pull/988/head
Ryan Tharp 5 years ago
parent 1c78e1a701
commit e3545fa338

@ -15,6 +15,7 @@ class LokiSnodeAPI {
this.localUrl = localUrl; // localhost.loki
this.randomSnodePool = [];
this.swarmsPendingReplenish = {};
this.initialiseRandomPoolPromise = false;
}
async getRandomSnodeAddress() {
@ -35,70 +36,99 @@ class LokiSnodeAPI {
seedNodes = [...window.seedNodeList],
consecutiveErrors = 0
) {
const params = {
limit: 20,
active_only: true,
fields: {
public_ip: true,
storage_port: true,
pubkey_x25519: true,
pubkey_ed25519: true,
},
};
const seedNode = seedNodes.splice(
Math.floor(Math.random() * seedNodes.length),
1
)[0];
let snodes = [];
try {
const response = await lokiRpc(
`http://${seedNode.ip}`,
seedNode.port,
'get_n_service_nodes',
params,
{}, // Options
'/json_rpc' // Seed request endpoint
);
// Filter 0.0.0.0 nodes which haven't submitted uptime proofs
snodes = response.result.service_node_states.filter(
snode => snode.public_ip !== '0.0.0.0'
);
this.randomSnodePool = snodes.map(snode => ({
ip: snode.public_ip,
port: snode.storage_port,
pubkey_x25519: snode.pubkey_x25519,
pubkey_ed25519: snode.pubkey_ed25519,
}));
} catch (e) {
log.warn('initialiseRandomPool error', e.code, e.message);
if (consecutiveErrors < 3) {
// retry after a possible delay
setTimeout(() => {
log.info(
'Retrying initialising random snode pool, try #',
consecutiveErrors
// if currently not in progress
if (this.initialiseRandomPoolPromise === false) {
// FIXME: add timeout
// set lock
this.initialiseRandomPoolPromise = new Promise(async resolve => {
const params = {
limit: 1024,
active_only: true,
fields: {
public_ip: true,
storage_port: true,
pubkey_x25519: true,
pubkey_ed25519: true,
},
};
const seedNode = seedNodes.splice(
Math.floor(Math.random() * seedNodes.length),
1
)[0];
let snodes = [];
try {
log.info('loki_snodes: Refreshing random snode pool');
const response = await lokiRpc(
`http://${seedNode.ip}`,
seedNode.port,
'get_n_service_nodes',
params,
{}, // Options
'/json_rpc' // Seed request endpoint
);
this.initialiseRandomPool(seedNodes, consecutiveErrors + 1);
}, consecutiveErrors * consecutiveErrors * 5000);
} else {
log.error('Giving up trying to contact seed node');
if (snodes.length === 0) {
throw new window.textsecure.SeedNodeError(
'Failed to contact seed node'
// Filter 0.0.0.0 nodes which haven't submitted uptime proofs
snodes = response.result.service_node_states.filter(
snode => snode.public_ip !== '0.0.0.0'
);
this.randomSnodePool = snodes.map(snode => ({
ip: snode.public_ip,
port: snode.storage_port,
pubkey_x25519: snode.pubkey_x25519,
pubkey_ed25519: snode.pubkey_ed25519,
}));
log.info('loki_snodes: Refreshed random snode pool with', this.randomSnodePool.length, 'snodes');
} catch (e) {
log.warn('loki_snodes: initialiseRandomPool error', e.code, e.message);
if (consecutiveErrors < 3) {
// retry after a possible delay
setTimeout(() => {
log.info(
'loki_snodes: Retrying initialising random snode pool, try #',
consecutiveErrors
);
this.initialiseRandomPool(seedNodes, consecutiveErrors + 1);
}, consecutiveErrors * consecutiveErrors * 5000);
} else {
log.error('loki_snodes: Giving up trying to contact seed node');
if (snodes.length === 0) {
throw new window.textsecure.SeedNodeError(
'Failed to contact seed node'
);
}
}
}
}
// clear lock
this.initialiseRandomPoolPromise = null;
resolve();
})
}
await this.initialiseRandomPoolPromise;
}
// nodeUrl is like 9hrje1bymy7hu6nmtjme9idyu3rm8gr3mkstakjyuw1997t7w4ny.snode
async unreachableNode(pubKey, nodeUrl) {
// unreachableNode.url is like 9hrje1bymy7hu6nmtjme9idyu3rm8gr3mkstakjyuw1997t7w4ny.snode
async unreachableNode(pubKey, unreachableNode) {
const conversation = ConversationController.get(pubKey);
const swarmNodes = [...conversation.get('swarmNodes')];
if (typeof(unreachableNode) === 'string') {
log.warn('loki_snodes::unreachableNode: String passed as unreachableNode to unreachableNode');
return swarmNodes;
}
let found = false
const filteredNodes = swarmNodes.filter(
node => node.address !== nodeUrl && node.ip !== nodeUrl
node => {
// keep all but thisNode
const thisNode = (node.address === unreachableNode.address && node.ip === unreachableNode.ip && node.port === unreachableNode.port)
if (thisNode) {
found = true
}
return !thisNode
}
);
if (!found) {
log.warn(`loki_snodes::unreachableNode snode ${unreachableNode.ip}:${unreachableNode.port} has already been marked as bad`);
}
await conversation.updateSwarmNodes(filteredNodes);
return filteredNodes;
}
markRandomNodeUnreachable(snode) {
@ -106,6 +136,7 @@ class LokiSnodeAPI {
this.randomSnodePool,
_.find(this.randomSnodePool, { ip: snode.ip, port: snode.port })
);
return this.randomSnodePool.length;
}
async updateLastHash(snode, hash, expiresAt) {
@ -150,7 +181,7 @@ class LokiSnodeAPI {
try {
newSwarmNodes = await this.getSwarmNodes(pubKey);
} catch (e) {
log.error('getFreshSwarmNodes error', e.code, e.message);
log.error('loki_snodes: getFreshSwarmNodes error', e.code, e.message);
// TODO: Handle these errors sensibly
newSwarmNodes = [];
}
@ -184,16 +215,25 @@ class LokiSnodeAPI {
);
return [];
}
if (!result.snodes) {
log.warn(
`getSnodesForPubkey lokiRpc on ${snode.ip}:${
snode.port
} returned falsish value for snodes`,
result
);
return [];
}
const snodes = result.snodes.filter(tSnode => tSnode.ip !== '0.0.0.0');
return snodes;
} catch (e) {
const randomPoolRemainingCount = this.markRandomNodeUnreachable(snode);
log.error(
'getSnodesForPubkey error',
'loki_snodes: getSnodesForPubkey error',
e.code,
e.message,
`for ${snode.ip}:${snode.port}`
`for ${snode.ip}:${snode.port}. ${randomPoolRemainingCount} snodes remaining in randomPool`
);
this.markRandomNodeUnreachable(snode);
return [];
}
}

Loading…
Cancel
Save