diff --git a/js/conversation_controller.js b/js/conversation_controller.js index feaa2e1eb..c92f07c9c 100644 --- a/js/conversation_controller.js +++ b/js/conversation_controller.js @@ -181,8 +181,9 @@ return conversation; } - window.LokiSnodeAPI.replenishSwarm(id); try { + const swarmNodes = await window.LokiSnodeAPI.getFreshSwarmNodes(id); + conversation.set({ swarmNodes}); await window.Signal.Data.saveConversation(conversation.attributes, { Conversation: Whisper.Conversation, }); diff --git a/js/models/messages.js b/js/models/messages.js index fe684e961..790349056 100644 --- a/js/models/messages.js +++ b/js/models/messages.js @@ -847,6 +847,8 @@ e.name === 'SendMessageNetworkError' || e.name === 'SignedPreKeyRotationError' || e.name === 'OutgoingIdentityKeyError' || + e.name === 'DNSResolutionError' || + e.name === 'EmptySwarmError' || e.name === 'PoWError' ); }, diff --git a/js/modules/data.js b/js/modules/data.js index 1d937b521..e8afe5b14 100644 --- a/js/modules/data.js +++ b/js/modules/data.js @@ -662,19 +662,15 @@ async function removeAllSessions(id) { function setifyProperty(data, propertyName) { if (!data) return data; - const returnData = data; - if (returnData[propertyName]) { + const returnData = { ...data }; + if (Array.isArray(returnData[propertyName])) { returnData[propertyName] = new Set(returnData[propertyName]); } return returnData; } async function getSwarmNodesByPubkey(pubkey) { - let swarmNodes = await channels.getSwarmNodesByPubkey(pubkey); - if (Array.isArray(swarmNodes)) { - swarmNodes = new Set(swarmNodes); - } - return swarmNodes; + return channels.getSwarmNodesByPubkey(pubkey); } async function getConversationCount() { @@ -682,11 +678,7 @@ async function getConversationCount() { } async function saveConversation(data) { - const storeData = data; - if (storeData.swarmNodes) { - storeData.swarmNodes = Array.from(storeData.swarmNodes); - } - await channels.saveConversation(storeData); + await channels.saveConversation(data); } async function saveConversations(data) { @@ -694,8 +686,7 @@ async function saveConversations(data) { } async function getConversationById(id, { Conversation }) { - const rawData = await channels.getConversationById(id); - const data = setifyProperty(rawData, 'swarmNodes'); + const data = await channels.getConversationById(id); return new Conversation(data); } @@ -704,8 +695,10 @@ async function updateConversation(id, data, { Conversation }) { if (!existing) { throw new Error(`Conversation ${id} does not exist!`); } + const setData = setifyProperty(data, 'swarmNodes'); + const setExisting = setifyProperty(existing.attributes, 'swarmNodes'); - const merged = merge({}, existing.attributes, data); + const merged = merge({}, setExisting, setData); if (merged.swarmNodes instanceof Set) { merged.swarmNodes = Array.from(merged.swarmNodes); } @@ -729,9 +722,7 @@ async function _removeConversations(ids) { } async function getAllConversations({ ConversationCollection }) { - const conversations = (await channels.getAllConversations()).map(c => - setifyProperty(c, 'swarmNodes') - ); + const conversations = await channels.getAllConversations(); const collection = new ConversationCollection(); collection.add(conversations); @@ -744,9 +735,7 @@ async function getAllConversationIds() { } async function getAllPrivateConversations({ ConversationCollection }) { - const conversations = (await channels.getAllPrivateConversations()).map(c => - setifyProperty(c, 'swarmNodes') - ); + const conversations = await channels.getAllPrivateConversations(); const collection = new ConversationCollection(); collection.add(conversations); @@ -754,9 +743,7 @@ async function getAllPrivateConversations({ ConversationCollection }) { } async function getAllGroupsInvolvingId(id, { ConversationCollection }) { - const conversations = (await channels.getAllGroupsInvolvingId(id)).map(c => - setifyProperty(c, 'swarmNodes') - ); + const conversations = await channels.getAllGroupsInvolvingId(id); const collection = new ConversationCollection(); collection.add(conversations); @@ -764,9 +751,7 @@ async function getAllGroupsInvolvingId(id, { ConversationCollection }) { } async function searchConversations(query, { ConversationCollection }) { - const conversations = (await channels.searchConversations(query)).map(c => - setifyProperty(c, 'swarmNodes') - ); + const conversations = await channels.searchConversations(query); const collection = new ConversationCollection(); collection.add(conversations); diff --git a/js/modules/loki_message_api.js b/js/modules/loki_message_api.js index 1a3a47c54..dfd67cc90 100644 --- a/js/modules/loki_message_api.js +++ b/js/modules/loki_message_api.js @@ -1,25 +1,18 @@ /* eslint-disable no-await-in-loop */ +/* eslint-disable no-loop-func */ /* global log, dcodeIO, window, callWorker */ const fetch = require('node-fetch'); -// eslint-disable-next-line -const invert = p => new Promise((res, rej) => p.then(rej, res)); -const firstOf = ps => invert(Promise.all(ps.map(invert))); - // Will be raised (to 3?) when we get more nodes const MINIMUM_SUCCESSFUL_REQUESTS = 2; + class LokiMessageAPI { constructor({ messageServerPort }) { this.messageServerPort = messageServerPort ? `:${messageServerPort}` : ''; } async sendMessage(pubKey, data, messageTimeStamp, ttl) { - const swarmNodes = await window.LokiSnodeAPI.getSwarmNodesByPubkey(pubKey); - if (!swarmNodes || swarmNodes.size === 0) { - throw Error('No swarm nodes to query!'); - } - const data64 = dcodeIO.ByteBuffer.wrap(data).toString('base64'); const timestamp = Math.floor(Date.now() / 1000); // Nonce is returned as a base64 string to include in header @@ -42,14 +35,17 @@ class LokiMessageAPI { // Something went horribly wrong throw err; } + const completedNodes = []; + let successfulRequests = 0; + let canResolve = true; - const requests = Array.from(swarmNodes).map(async node => { + const doRequest = async nodeUrl => { // TODO: Confirm sensible timeout const options = { - url: `${node}${this.messageServerPort}/store`, + url: `${nodeUrl}${this.messageServerPort}/store`, type: 'POST', responseType: undefined, - timeout: 5000, + timeout: 10000, }; const fetchOptions = { @@ -68,9 +64,17 @@ class LokiMessageAPI { try { response = await fetch(options.url, fetchOptions); } catch (e) { + if (e.code === 'ENOTFOUND') { + // TODO: Handle the case where lokinet is not working + canResolve = false; + return; + } log.error(options.type, options.url, 0, 'Error sending message'); - window.LokiSnodeAPI.unreachableNode(pubKey, node); - throw HTTPError('fetch error', 0, e.toString()); + if (window.LokiSnodeAPI.unreachableNode(pubKey, nodeUrl)) { + completedNodes.push(nodeUrl); + swarmNodes = swarmNodes.filter(node => node !== nodeUrl); + } + return; } let result; @@ -86,7 +90,10 @@ class LokiMessageAPI { } if (response.status >= 0 && response.status < 400) { - return result; + completedNodes.push(nodeUrl); + swarmNodes = swarmNodes.filter(node => node !== nodeUrl); + successfulRequests += 1; + return; } log.error( options.type, @@ -95,19 +102,48 @@ class LokiMessageAPI { 'Error sending message' ); throw HTTPError('sendMessage: error response', response.status, result); - }); + }; + + let swarmNodes; try { - // TODO: Possibly change this to require more than a single response? - const result = await firstOf(requests); - return result; - } catch (err) { - throw err; + swarmNodes = await window.LokiSnodeAPI.getSwarmNodesByPubkey(pubKey); + } catch (e) { + throw new window.textsecure.EmptySwarmError(pubKey, e); + } + while (successfulRequests < MINIMUM_SUCCESSFUL_REQUESTS) { + if (!canResolve) { + throw new window.textsecure.DNSResolutionError('Sending messages'); + } + if (!swarmNodes || swarmNodes.length === 0) { + swarmNodes = await window.LokiSnodeAPI.getFreshSwarmNodes(pubKey); + swarmNodes = swarmNodes.filter(node => !(node in completedNodes)); + if (!swarmNodes || swarmNodes.length === 0) { + if (successfulRequests !== 0) { + // TODO: Decide how to handle some completed requests but not enough + return; + } + throw new window.textsecure.EmptySwarmError( + pubKey, + new Error('Ran out of swarm nodes to query') + ); + } + await window.LokiSnodeAPI.saveSwarmNodes(pubKey, swarmNodes); + } + const remainingRequests = + MINIMUM_SUCCESSFUL_REQUESTS - completedNodes.length; + await Promise.all( + swarmNodes + .splice(0, remainingRequests) + .map(nodeUrl => doRequest(nodeUrl)) + ); } } async retrieveMessages(callback) { const ourKey = window.textsecure.storage.user.getNumber(); - let completedRequests = 0; + const completedNodes = []; + let canResolve = true; + let successfulRequests = 0; const doRequest = async (nodeUrl, nodeData) => { // TODO: Confirm sensible timeout @@ -115,7 +151,7 @@ class LokiMessageAPI { url: `${nodeUrl}${this.messageServerPort}/retrieve`, type: 'GET', responseType: 'json', - timeout: 5000, + timeout: 10000, }; const headers = { @@ -135,15 +171,21 @@ class LokiMessageAPI { try { response = await fetch(options.url, fetchOptions); } catch (e) { - // TODO: Maybe we shouldn't immediately delete? - // And differentiate between different connectivity issues + if (e.code === 'ENOTFOUND') { + // TODO: Handle the case where lokinet is not working + canResolve = false; + return; + } log.error( options.type, options.url, 0, `Error retrieving messages from ${nodeUrl}` ); - window.LokiSnodeAPI.unreachableNode(ourKey, nodeUrl); + if (window.LokiSnodeAPI.unreachableNode(ourKey, nodeUrl)) { + completedNodes.push(nodeUrl); + delete ourSwarmNodes[nodeUrl]; + } return; } @@ -158,29 +200,59 @@ class LokiMessageAPI { } else { result = await response.text(); } - completedRequests += 1; + completedNodes.push(nodeUrl); + delete ourSwarmNodes[nodeUrl]; if (response.status === 200) { if (result.lastHash) { window.LokiSnodeAPI.updateLastHash(nodeUrl, result.lastHash); callback(result.messages); } + successfulRequests += 1; return; } // Handle error from snode log.error(options.type, options.url, response.status, 'Error'); }; - while (completedRequests < MINIMUM_SUCCESSFUL_REQUESTS) { - const remainingRequests = MINIMUM_SUCCESSFUL_REQUESTS - completedRequests; - const ourSwarmNodes = await window.LokiSnodeAPI.getOurSwarmNodes(); - if (Object.keys(ourSwarmNodes).length < remainingRequests) { - // This means we don't have enough swarm nodes to meet the minimum threshold - if (completedRequests !== 0) { - // TODO: Decide how to handle some completed requests but not enough + let ourSwarmNodes; + try { + ourSwarmNodes = await window.LokiSnodeAPI.getOurSwarmNodes(); + } catch (e) { + throw new window.textsecure.EmptySwarmError( + window.textsecure.storage.user.getNumber(), + e + ); + } + while (successfulRequests < MINIMUM_SUCCESSFUL_REQUESTS) { + if (!canResolve) { + throw new window.textsecure.DNSResolutionError('Retrieving messages'); + } + if (Object.keys(ourSwarmNodes).length === 0) { + try { + ourSwarmNodes = await window.LokiSnodeAPI.getOurSwarmNodes(); + // Filter out the nodes we have already got responses from + completedNodes.forEach(nodeUrl => delete ourSwarmNodes[nodeUrl]); + } catch (e) { + throw new window.textsecure.EmptySwarmError( + window.textsecure.storage.user.getNumber(), + e + ); + } + if (Object.keys(ourSwarmNodes).length === 0) { + if (successfulRequests !== 0) { + // TODO: Decide how to handle some completed requests but not enough + return; + } + throw new window.textsecure.EmptySwarmError( + window.textsecure.storage.user.getNumber(), + new Error('Ran out of swarm nodes to query') + ); } } + const remainingRequests = + MINIMUM_SUCCESSFUL_REQUESTS - completedNodes.length; await Promise.all( Object.entries(ourSwarmNodes) .splice(0, remainingRequests) diff --git a/js/modules/loki_snode_api.js b/js/modules/loki_snode_api.js index 80b999909..d6000c3e8 100644 --- a/js/modules/loki_snode_api.js +++ b/js/modules/loki_snode_api.js @@ -1,3 +1,4 @@ +/* eslint-disable class-methods-use-this */ /* global log, window, Whisper */ const fetch = require('node-fetch'); @@ -6,6 +7,7 @@ const dns = require('dns'); // Will be raised (to 3?) when we get more nodes const MINIMUM_SWARM_NODES = 1; +const FAILURE_THRESHOLD = 3; class LokiSnodeAPI { constructor({ url, swarmServerPort }) { @@ -16,6 +18,7 @@ class LokiSnodeAPI { this.swarmServerPort = swarmServerPort ? `:${swarmServerPort}` : ''; this.swarmsPendingReplenish = {}; this.ourSwarmNodes = {}; + this.contactSwarmNodes = {}; } getRandomSnodeAddress() { @@ -33,8 +36,27 @@ class LokiSnodeAPI { async unreachableNode(pubKey, nodeUrl) { if (pubKey === window.textsecure.storage.user.getNumber()) { - delete this.ourSwarmNodes[nodeUrl]; - return; + if (!this.ourSwarmNodes[nodeUrl]) { + this.ourSwarmNodes[nodeUrl] = { + failureCount: 1, + }; + } else { + this.ourSwarmNodes[nodeUrl].failureCount += 1; + } + if (this.ourSwarmNodes[nodeUrl].failureCount >= FAILURE_THRESHOLD) { + delete this.ourSwarmNodes[nodeUrl]; + } + return false; + } + if (!this.contactSwarmNodes[nodeUrl]) { + this.contactSwarmNodes[nodeUrl] = { + failureCount: 1, + }; + } else { + this.contactSwarmNodes[nodeUrl].failureCount += 1; + } + if (this.contactSwarmNodes[nodeUrl].failureCount < FAILURE_THRESHOLD) { + return false; } const conversation = window.ConversationController.get(pubKey); const swarmNodes = conversation.get('swarmNodes'); @@ -47,7 +69,9 @@ class LokiSnodeAPI { Conversation: Whisper.Conversation, } ); + delete this.contactSwarmNodes[nodeUrl]; } + return true; } updateLastHash(nodeUrl, hash) { @@ -74,7 +98,9 @@ class LokiSnodeAPI { } nodeAddresses.forEach(url => { - this.ourSwarmNodes[url] = {}; + this.ourSwarmNodes[url] = { + failureCount: 0, + }; }); } return this.ourSwarmNodes; @@ -82,32 +108,34 @@ class LokiSnodeAPI { async getSwarmNodesByPubkey(pubKey) { const swarmNodes = await window.Signal.Data.getSwarmNodesByPubkey(pubKey); - // TODO: Check if swarm list is below a threshold rather than empty - if (swarmNodes && swarmNodes.size !== 0) { + if (swarmNodes) { return swarmNodes; } - return this.replenishSwarm(pubKey); + return []; } - async replenishSwarm(pubKey) { + async saveSwarmNodes(pubKey, swarmNodes) { const conversation = window.ConversationController.get(pubKey); + conversation.set({ swarmNodes }); + await window.Signal.Data.updateConversation( + conversation.id, + conversation.attributes, + { + Conversation: Whisper.Conversation, + } + ); + } + + async getFreshSwarmNodes(pubKey) { if (!(pubKey in this.swarmsPendingReplenish)) { this.swarmsPendingReplenish[pubKey] = new Promise(async resolve => { let newSwarmNodes; try { - newSwarmNodes = new Set(await this.getSwarmNodes(pubKey)); + newSwarmNodes = await this.getSwarmNodes(pubKey); } catch (e) { // TODO: Handle these errors sensibly - newSwarmNodes = new Set([]); + newSwarmNodes = []; } - conversation.set({ swarmNodes: newSwarmNodes }); - await window.Signal.Data.updateConversation( - conversation.id, - conversation.attributes, - { - Conversation: Whisper.Conversation, - } - ); resolve(newSwarmNodes); }); } @@ -124,7 +152,7 @@ class LokiSnodeAPI { url: `http://${node}${this.swarmServerPort}/json_rpc`, type: 'POST', responseType: 'json', - timeout: 5000, + timeout: 10000, }; const body = { @@ -155,7 +183,7 @@ class LokiSnodeAPI { 0, `Error getting swarm nodes for ${pubKey}` ); - throw HTTPError('fetch error', 0, e.toString()); + throw HTTPError('getSwarmNodes fetch error', 0, e.toString()); } let result; @@ -179,7 +207,7 @@ class LokiSnodeAPI { response.status, `Error getting swarm nodes for ${pubKey}` ); - throw HTTPError('sendMessage: error response', response.status, result); + throw HTTPError('getSwarmNodes: error response', response.status, result); } } diff --git a/libtextsecure/errors.js b/libtextsecure/errors.js index 51c879afe..7f972492e 100644 --- a/libtextsecure/errors.js +++ b/libtextsecure/errors.js @@ -127,6 +127,21 @@ } inherit(Error, UnregisteredUserError); + function EmptySwarmError(number, error) { + // eslint-disable-next-line prefer-destructuring + this.number = number.split('.')[0]; + + ReplayableError.call(this, { + name: 'EmptySwarmError', + message: 'Could not get any swarm nodes to query', + }); + + if (error) { + appendStack(this, error); + } + } + inherit(ReplayableError, EmptySwarmError); + function PoWError(number, error) { // eslint-disable-next-line prefer-destructuring this.number = number.split('.')[0]; @@ -142,6 +157,16 @@ } inherit(ReplayableError, PoWError); + function DNSResolutionError(message) { + // eslint-disable-next-line prefer-destructuring + + ReplayableError.call(this, { + name: 'DNSResolutionError', + message: `Error resolving url: ${message}`, + }); + } + inherit(ReplayableError, DNSResolutionError); + window.textsecure.UnregisteredUserError = UnregisteredUserError; window.textsecure.SendMessageNetworkError = SendMessageNetworkError; window.textsecure.IncomingIdentityKeyError = IncomingIdentityKeyError; @@ -151,4 +176,6 @@ window.textsecure.MessageError = MessageError; window.textsecure.SignedPreKeyRotationError = SignedPreKeyRotationError; window.textsecure.PoWError = PoWError; + window.textsecure.EmptySwarmError = EmptySwarmError; + window.textsecure.DNSResolutionError = DNSResolutionError; })(); diff --git a/libtextsecure/outgoing_message.js b/libtextsecure/outgoing_message.js index be11d8905..668e4067e 100644 --- a/libtextsecure/outgoing_message.js +++ b/libtextsecure/outgoing_message.js @@ -186,13 +186,7 @@ OutgoingMessage.prototype = { async transmitMessage(number, data, timestamp, ttl = 24 * 60 * 60) { const pubKey = number; try { - const result = await this.lokiMessageAPI.sendMessage( - pubKey, - data, - timestamp, - ttl - ); - return result; + await this.lokiMessageAPI.sendMessage(pubKey, data, timestamp, ttl); } catch (e) { if (e.name === 'HTTPError' && (e.code !== 409 && e.code !== 410)) { // 409 and 410 should bubble and be handled by doSendMessage