From d12f6b6d32a368a138655c90c5b291f8845eba40 Mon Sep 17 00:00:00 2001 From: Beaudan Date: Fri, 14 Jun 2019 15:23:07 +1000 Subject: [PATCH 1/2] Handle swarm nodes the same for us or contacts, attach our key to message api object --- js/background.js | 2 ++ js/modules/loki_message_api.js | 31 ++++++++++++++++++------- js/modules/loki_snode_api.js | 42 ---------------------------------- preload.js | 4 +--- 4 files changed, 26 insertions(+), 53 deletions(-) diff --git a/js/background.js b/js/background.js index 6bdb08222..e4c29700c 100644 --- a/js/background.js +++ b/js/background.js @@ -233,6 +233,8 @@ window.libloki.api.sendOnlineBroadcastMessage(pubKey, isPing); }); + window.lokiMessageAPI = new window.LokiMessageAPI(); + const currentPoWDifficulty = storage.get('PoWDifficulty', null); if (!currentPoWDifficulty) { storage.put('PoWDifficulty', window.getDefaultPoWDifficulty()); diff --git a/js/modules/loki_message_api.js b/js/modules/loki_message_api.js index fabbdceb2..bccd59d4d 100644 --- a/js/modules/loki_message_api.js +++ b/js/modules/loki_message_api.js @@ -92,6 +92,7 @@ class LokiMessageAPI { constructor() { this.jobQueue = new window.JobQueue(); this.sendingSwarmNodes = {}; + this.ourKey = window.textsecure.storage.user.getNumber(); } async sendMessage(pubKey, data, messageTimeStamp, ttl, options = {}) { @@ -228,7 +229,6 @@ class LokiMessageAPI { } async openConnection(callback) { - const ourKey = window.textsecure.storage.user.getNumber(); while (!_.isEmpty(this.ourSwarmNodes)) { const address = Object.keys(this.ourSwarmNodes)[0]; const nodeData = this.ourSwarmNodes[address]; @@ -239,11 +239,7 @@ class LokiMessageAPI { try { // TODO: Revert back to using snode address instead of IP - let messages = await retrieveNextMessages( - nodeData.ip, - nodeData, - ourKey - ); + let messages = await retrieveNextMessages(nodeData.ip, nodeData); successiveFailures = 0; if (messages.length) { const lastMessage = _.last(messages); @@ -263,7 +259,15 @@ class LokiMessageAPI { log.warn('Loki retrieve messages:', e); if (e instanceof textsecure.WrongSwarmError) { const { newSwarm } = e; - await lokiSnodeAPI.updateOurSwarmNodes(newSwarm); + await lokiSnodeAPI.updateSwarmNodes(this.ourKey, newSwarm); + for (let i = 0; i < newSwarm.length; i += 1) { + const lastHash = await window.Signal.Data.getLastHashBySnode( + newSwarm[i] + ); + this.ourSwarmnewSwarm[newSwarm[i]] = { + lastHash, + }; + } // Try another snode break; } else if (e instanceof textsecure.NotFoundError) { @@ -279,7 +283,18 @@ class LokiMessageAPI { } async startLongPolling(numConnections, callback) { - this.ourSwarmNodes = await lokiSnodeAPI.getOurSwarmNodes(); + this.ourSwarmNodes = {}; + let nodes = await lokiSnodeAPI.getSwarmNodesForPubKey(this.ourKey); + if (nodes.length < numConnections) { + await lokiSnodeAPI.refreshSwarmNodesForPubKey(this.ourKey); + nodes = await lokiSnodeAPI.getSwarmNodesForPubKey(this.ourKey); + } + for (let i = 0; i < nodes.length; i += 1) { + const lastHash = await window.Signal.Data.getLastHashBySnode(nodes[i]); + this.ourSwarmNodes[nodes[i]] = { + lastHash, + }; + } const promises = []; diff --git a/js/modules/loki_snode_api.js b/js/modules/loki_snode_api.js index fc79cd77f..9347ef7d2 100644 --- a/js/modules/loki_snode_api.js +++ b/js/modules/loki_snode_api.js @@ -6,9 +6,6 @@ const dns = require('dns'); const process = require('process'); const { rpc } = require('./loki_rpc'); -// Will be raised (to 3?) when we get more nodes -const MINIMUM_SWARM_NODES = 1; - const resolve4 = url => new Promise((resolve, reject) => { dns.resolve4(url, (err, ip) => { @@ -40,8 +37,6 @@ class LokiSnodeAPI { this.localUrl = localUrl; this.randomSnodePool = []; this.swarmsPendingReplenish = {}; - this.ourSwarmNodes = {}; - this.contactSwarmNodes = {}; // When we package lokinet with messenger we can ensure this ip is correct if (process.platform === 'win32') { dns.setServers(['127.0.0.1']); @@ -92,26 +87,16 @@ class LokiSnodeAPI { } async unreachableNode(pubKey, nodeUrl) { - if (pubKey === window.textsecure.storage.user.getNumber()) { - delete this.ourSwarmNodes[nodeUrl]; - return; - } - const conversation = ConversationController.get(pubKey); const swarmNodes = [...conversation.get('swarmNodes')]; if (swarmNodes.includes(nodeUrl)) { const filteredNodes = swarmNodes.filter(node => node !== nodeUrl); await conversation.updateSwarmNodes(filteredNodes); - delete this.contactSwarmNodes[nodeUrl]; } } async updateLastHash(nodeUrl, lastHash, expiresAt) { await window.Signal.Data.updateLastHash({ nodeUrl, lastHash, expiresAt }); - if (!this.ourSwarmNodes[nodeUrl]) { - return; - } - this.ourSwarmNodes[nodeUrl].lastHash = lastHash; } getSwarmNodesForPubKey(pubKey) { @@ -137,33 +122,6 @@ class LokiSnodeAPI { } } - async updateOurSwarmNodes(newNodes) { - this.ourSwarmNodes = {}; - const ps = newNodes.map(async snode => { - const lastHash = await window.Signal.Data.getLastHashBySnode( - snode.address - ); - this.ourSwarmNodes[snode.address] = { - lastHash, - port: snode.port, - ip: snode.ip, - }; - }); - await Promise.all(ps); - } - - async getOurSwarmNodes() { - if ( - !this.ourSwarmNodes || - Object.keys(this.ourSwarmNodes).length < MINIMUM_SWARM_NODES - ) { - const ourKey = window.textsecure.storage.user.getNumber(); - const nodeAddresses = await this.getSwarmNodes(ourKey); - await this.updateOurSwarmNodes(nodeAddresses); - } - return { ...this.ourSwarmNodes }; - } - async refreshSwarmNodesForPubKey(pubKey) { const newNodes = await this.getFreshSwarmNodes(pubKey); this.updateSwarmNodes(pubKey, newNodes); diff --git a/preload.js b/preload.js index 6c80e3f3a..e9e70580d 100644 --- a/preload.js +++ b/preload.js @@ -302,9 +302,7 @@ window.lokiSnodeAPI = new LokiSnodeAPI({ window.LokiP2pAPI = require('./js/modules/loki_p2p_api'); -const LokiMessageAPI = require('./js/modules/loki_message_api'); - -window.lokiMessageAPI = new LokiMessageAPI(); +window.LokiMessageAPI = require('./js/modules/loki_message_api'); const LocalLokiServer = require('./libloki/modules/local_loki_server'); From c5c01b0ac8838b3541d5cf9c40cc96a4f1a5ae5c Mon Sep 17 00:00:00 2001 From: Beaudan Date: Tue, 18 Jun 2019 16:34:58 +1000 Subject: [PATCH 2/2] Purge retrieving snodes --- js/modules/loki_message_api.js | 65 ++++++++++++++++++---------------- js/modules/loki_snode_api.js | 6 ++-- 2 files changed, 37 insertions(+), 34 deletions(-) diff --git a/js/modules/loki_message_api.js b/js/modules/loki_message_api.js index bccd59d4d..7dfc99d35 100644 --- a/js/modules/loki_message_api.js +++ b/js/modules/loki_message_api.js @@ -66,28 +66,6 @@ const trySendP2p = async (pubKey, data64, isPing, messageEventData) => { } }; -const retrieveNextMessages = async (nodeUrl, nodeData, ourKey) => { - const params = { - pubKey: ourKey, - lastHash: nodeData.lastHash || '', - }; - const options = { - timeout: 40000, - headers: { - [LOKI_LONGPOLL_HEADER]: true, - }, - }; - - const result = await rpc( - `https://${nodeUrl}`, - nodeData.port, - 'retrieve', - params, - options - ); - return result.messages || []; -}; - class LokiMessageAPI { constructor() { this.jobQueue = new window.JobQueue(); @@ -228,7 +206,7 @@ class LokiMessageAPI { return false; } - async openConnection(callback) { + async openRetrieveConnection(callback) { while (!_.isEmpty(this.ourSwarmNodes)) { const address = Object.keys(this.ourSwarmNodes)[0]; const nodeData = this.ourSwarmNodes[address]; @@ -239,12 +217,12 @@ class LokiMessageAPI { try { // TODO: Revert back to using snode address instead of IP - let messages = await retrieveNextMessages(nodeData.ip, nodeData); + let messages = await this.retrieveNextMessages(nodeData.ip, nodeData); successiveFailures = 0; if (messages.length) { const lastMessage = _.last(messages); - nodeData.lashHash = lastMessage.hash; - lokiSnodeAPI.updateLastHash( + nodeData.lastHash = lastMessage.hash; + await lokiSnodeAPI.updateLastHash( address, lastMessage.hash, lastMessage.expiration @@ -264,7 +242,7 @@ class LokiMessageAPI { const lastHash = await window.Signal.Data.getLastHashBySnode( newSwarm[i] ); - this.ourSwarmnewSwarm[newSwarm[i]] = { + this.ourSwarmNodes[newSwarm[i]] = { lastHash, }; } @@ -279,9 +257,34 @@ class LokiMessageAPI { successiveFailures += 1; } } + if (successiveFailures >= 3) { + await lokiSnodeAPI.unreachableNode(this.ourKey, address); + } } } + async retrieveNextMessages(nodeUrl, nodeData) { + const params = { + pubKey: this.ourKey, + lastHash: nodeData.lastHash || '', + }; + const options = { + timeout: 40000, + headers: { + [LOKI_LONGPOLL_HEADER]: true, + }, + }; + + const result = await rpc( + `https://${nodeUrl}`, + nodeData.port, + 'retrieve', + params, + options + ); + return result.messages || []; + }; + async startLongPolling(numConnections, callback) { this.ourSwarmNodes = {}; let nodes = await lokiSnodeAPI.getSwarmNodesForPubKey(this.ourKey); @@ -290,16 +293,18 @@ class LokiMessageAPI { nodes = await lokiSnodeAPI.getSwarmNodesForPubKey(this.ourKey); } for (let i = 0; i < nodes.length; i += 1) { - const lastHash = await window.Signal.Data.getLastHashBySnode(nodes[i]); - this.ourSwarmNodes[nodes[i]] = { + const lastHash = await window.Signal.Data.getLastHashBySnode(nodes[i].address); + this.ourSwarmNodes[nodes[i].address] = { lastHash, + ip: nodes[i].ip, + port: nodes[i].port, }; } const promises = []; for (let i = 0; i < numConnections; i += 1) - promises.push(this.openConnection(callback)); + promises.push(this.openRetrieveConnection(callback)); // blocks until all snodes in our swarms have been removed from the list // or if there is network issues (ENOUTFOUND due to lokinet) diff --git a/js/modules/loki_snode_api.js b/js/modules/loki_snode_api.js index 9347ef7d2..4a23208c3 100644 --- a/js/modules/loki_snode_api.js +++ b/js/modules/loki_snode_api.js @@ -89,10 +89,8 @@ class LokiSnodeAPI { async unreachableNode(pubKey, nodeUrl) { const conversation = ConversationController.get(pubKey); const swarmNodes = [...conversation.get('swarmNodes')]; - if (swarmNodes.includes(nodeUrl)) { - const filteredNodes = swarmNodes.filter(node => node !== nodeUrl); - await conversation.updateSwarmNodes(filteredNodes); - } + const filteredNodes = swarmNodes.filter(node => node.address !== nodeUrl); + await conversation.updateSwarmNodes(filteredNodes); } async updateLastHash(nodeUrl, lastHash, expiresAt) {