diff --git a/js/modules/loki_message_api.js b/js/modules/loki_message_api.js index 175fdc479..75e47a075 100644 --- a/js/modules/loki_message_api.js +++ b/js/modules/loki_message_api.js @@ -12,6 +12,7 @@ const LOKI_LONGPOLL_HEADER = 'X-Loki-Long-Poll'; class LokiMessageAPI { constructor({ snodeServerPort }) { this.snodeServerPort = snodeServerPort ? `:${snodeServerPort}` : ''; + this.jobQueue = new window.JobQueue(); } async sendMessage(pubKey, data, messageTimeStamp, ttl, isPing = false) { @@ -162,6 +163,22 @@ class LokiMessageAPI { let ourSwarmNodes = await lokiSnodeAPI.getOurSwarmNodes(); + const filterIncomingMessages = async messages => { + const incomingHashes = messages.map(m => m.hash); + const dupHashes = await window.Signal.Data.getSeenMessagesByHashList( + incomingHashes + ); + const newMessages = messages.filter(m => !dupHashes.includes(m.hash)); + const newHashes = newMessages.map(m => ({ + expiresAt: m.expiration, + hash: m.hash, + })); + if (newHashes.length) { + await window.Signal.Data.saveSeenMessageHashes(newHashes); + } + return newMessages; + }; + const nodeComplete = nodeUrl => { completedNodes.push(nodeUrl); delete ourSwarmNodes[nodeUrl]; @@ -189,18 +206,23 @@ class LokiMessageAPI { ); nodeComplete(nodeUrl); + successfulRequests += 1; if (Array.isArray(result.messages) && result.messages.length) { - const lastHash = _.last(result.messages).hash; - lokiSnodeAPI.updateLastHash(nodeUrl, lastHash); - callback(result.messages); + const lastMessage = _.last(result.messages); + lokiSnodeAPI.updateLastHash(nodeUrl, lastMessage.hash, lastMessage.expiration); + const filteredMessages = await this.jobQueue.add(() => + filterIncomingMessages(result.messages) + ); + if (filteredMessages.length) { + callback(filteredMessages); + } } - successfulRequests += 1; } catch (e) { log.warn('Loki retrieve messages:', e); if (e instanceof textsecure.WrongSwarmError) { const { newSwarm } = e; - lokiSnodeAPI.updateOurSwarmNodes(newSwarm); + await lokiSnodeAPI.updateOurSwarmNodes(newSwarm); completedNodes.push(nodeUrl); } else if (e instanceof textsecure.NotFoundError) { canResolve = false; diff --git a/js/modules/loki_snode_api.js b/js/modules/loki_snode_api.js index 8edd758ba..79667f13f 100644 --- a/js/modules/loki_snode_api.js +++ b/js/modules/loki_snode_api.js @@ -106,13 +106,15 @@ class LokiSnodeAPI { return true; } - updateLastHash(nodeUrl, hash) { + async updateLastHash(nodeUrl, lastHash, expiresAt) { + await window.Signal.Data.updateLastHash({ nodeUrl, lastHash, expiresAt }); if (!this.ourSwarmNodes[nodeUrl]) { this.ourSwarmNodes[nodeUrl] = { - lastHash: hash, + failureCount: 0, + lastHash, }; } else { - this.ourSwarmNodes[nodeUrl].lastHash = hash; + this.ourSwarmNodes[nodeUrl].lastHash = lastHash; } } @@ -139,13 +141,16 @@ class LokiSnodeAPI { } } - updateOurSwarmNodes(newNodes) { + async updateOurSwarmNodes(newNodes) { this.ourSwarmNodes = {}; - newNodes.forEach(url => { + const ps = newNodes.map(async url => { + const lastHash = await window.Signal.Data.getLastHashBySnode(url); this.ourSwarmNodes[url] = { failureCount: 0, + lastHash, }; }); + await Promise.all(ps); } async getOurSwarmNodes() { @@ -153,16 +158,9 @@ class LokiSnodeAPI { !this.ourSwarmNodes || Object.keys(this.ourSwarmNodes).length < MINIMUM_SWARM_NODES ) { - this.ourSwarmNodes = {}; - // Try refresh our swarm list once const ourKey = window.textsecure.storage.user.getNumber(); const nodeAddresses = await this.getSwarmNodes(ourKey); - - nodeAddresses.forEach(url => { - this.ourSwarmNodes[url] = { - failureCount: 0, - }; - }); + await this.updateOurSwarmNodes(nodeAddresses); } return { ...this.ourSwarmNodes }; } diff --git a/libtextsecure/http-resources.js b/libtextsecure/http-resources.js index 65237f750..02f1eb95b 100644 --- a/libtextsecure/http-resources.js +++ b/libtextsecure/http-resources.js @@ -41,22 +41,6 @@ }; }; - const filterIncomingMessages = async function filterIncomingMessages( - messages - ) { - const incomingHashes = messages.map(m => m.hash); - const dupHashes = await window.Signal.Data.getSeenMessagesByHashList( - incomingHashes - ); - const newMessages = messages.filter(m => !dupHashes.includes(m.hash)); - const newHashes = newMessages.map(m => ({ - expiresAt: m.expiration, - hash: m.hash, - })); - await window.Signal.Data.saveSeenMessageHashes(newHashes); - return newMessages; - }; - window.HttpResource = function HttpResource(_server, opts = {}) { server = _server; let { handleRequest } = opts; @@ -64,17 +48,6 @@ handleRequest = request => request.respond(404, 'Not found'); } let connected = true; - const jobQueue = new window.JobQueue(); - - const processMessages = async messages => { - const newMessages = await jobQueue.add(() => - filterIncomingMessages(messages) - ); - newMessages.forEach(async message => { - const { data } = message; - this.handleMessage(data); - }); - }; this.handleMessage = (message, options = {}) => { try { @@ -104,16 +77,21 @@ } }; - this.startPolling = async function pollServer(callback) { + this.pollServer = async callback => { try { - await server.retrieveMessages(processMessages); + await server.retrieveMessages(messages => { + messages.forEach(message => { + const { data } = message; + this.handleMessage(data); + }); + }); connected = true; } catch (err) { connected = false; } callback(connected); setTimeout(() => { - pollServer(callback); + this.pollServer(callback); }, POLL_TIME); }; diff --git a/libtextsecure/message_receiver.js b/libtextsecure/message_receiver.js index d689c4416..e83ff7509 100644 --- a/libtextsecure/message_receiver.js +++ b/libtextsecure/message_receiver.js @@ -73,7 +73,7 @@ MessageReceiver.prototype.extend({ this.httpPollingResource = new HttpResource(lokiMessageAPI, { handleRequest: this.handleRequest.bind(this), }); - this.httpPollingResource.startPolling(connected => { + this.httpPollingResource.pollServer(connected => { // Emulate receiving an 'empty' websocket messages from the server. // This is required to update the internal logic that checks // if we are connected to the server. Without this, for example,