/* eslint-disable no-await-in-loop */
/* eslint-disable no-loop-func */
/* global log, dcodeIO, window, callWorker, lokiP2pAPI, lokiSnodeAPI, textsecure */

const _ = require('lodash');
const { rpc } = require('./loki_fetch');

// Will be raised (to 3?) when we get more nodes
const MINIMUM_SUCCESSFUL_REQUESTS = 2;

/**
 * Sends messages to, and retrieves messages from, swarm storage nodes via
 * the `rpc` helper, with an optional direct peer-to-peer path for sending.
 */
class LokiMessageAPI {
  /**
   * @param {Object} options
   * @param {(number|string)} [options.snodeServerPort] - Port appended (as
   *   `:port`) to every storage-node URL; when falsy no port suffix is used.
   */
  constructor({ snodeServerPort }) {
    this.snodeServerPort = snodeServerPort ? `:${snodeServerPort}` : '';
  }

  /**
   * Sends `data` to `pubKey`.
   *
   * If P2P details are known for the contact and the contact is online (or
   * this is a ping), the message is first attempted directly. A failed ping
   * stops there; a failed regular message falls back to the storage nodes.
   *
   * For storage delivery a proof-of-work nonce is calculated and 'store'
   * requests are issued to swarm nodes until MINIMUM_SUCCESSFUL_REQUESTS of
   * them succeed, or no untried nodes remain.
   *
   * @param {string} pubKey - Recipient public key.
   * @param {*} data - Payload; wrapped with `dcodeIO.ByteBuffer.wrap` and
   *   base64 encoded before sending.
   * @param {*} messageTimeStamp - Only used to identify the message in the
   *   'p2pMessageSent' / 'calculatingPoW' events.
   * @param {*} ttl - Passed to the PoW worker and sent to nodes as a string.
   * @param {boolean} [isPing=false] - Whether this is a P2P ping.
   * @returns {Promise<void>} Also resolves (with a warning logged) when some
   *   but not enough storage requests succeeded and no nodes are left.
   * @throws {window.textsecure.DNSResolutionError} If a request failed with
   *   `textsecure.NotFoundError` and more requests were still needed.
   * @throws {window.textsecure.EmptySwarmError} If no swarm nodes remain and
   *   not a single request succeeded.
   */
  async sendMessage(pubKey, data, messageTimeStamp, ttl, isPing = false) {
    const timestamp = Math.floor(Date.now() / 1000);
    // Data required to identify a message in a conversation
    const messageEventData = {
      pubKey,
      timestamp: messageTimeStamp,
    };
    const data64 = dcodeIO.ByteBuffer.wrap(data).toString('base64');

    const p2pDetails = lokiP2pAPI.getContactP2pDetails(pubKey);
    if (p2pDetails && (isPing || p2pDetails.isOnline)) {
      try {
        const port = p2pDetails.port ? `:${p2pDetails.port}` : '';
        await rpc(p2pDetails.address, port, 'store', {
          data: data64,
        });
        lokiP2pAPI.setContactOnline(pubKey);
        window.Whisper.events.trigger('p2pMessageSent', messageEventData);
        if (isPing) {
          log.info(`Successfully pinged ${pubKey}`);
        } else {
          log.info(`Successful p2p message to ${pubKey}`);
        }
        return;
      } catch (e) {
        lokiP2pAPI.setContactOffline(pubKey);
        if (isPing) {
          // If this was just a ping, we don't bother sending to storage server
          log.warn('Ping failed, contact marked offline', e);
          return;
        }
        log.warn('Failed to send P2P message, falling back to storage', e);
      }
    }

    // Nonce is returned as a base64 string to include in header.
    // Any failure here simply propagates to the caller.
    window.Whisper.events.trigger('calculatingPoW', messageEventData);
    const development = window.getEnvironment() !== 'production';
    const nonce = await callWorker(
      'calcPoW',
      timestamp,
      ttl,
      pubKey,
      data64,
      development
    );

    const completedNodes = [];
    const failedNodes = [];
    let successfulRequests = 0;
    let canResolve = true;

    // NOTE(review): this array is later mutated in place by `splice`; if
    // lokiSnodeAPI hands out a shared/cached array that mutation is visible
    // to it — confirm whether a copy is needed.
    let swarmNodes = await lokiSnodeAPI.getSwarmNodesForPubKey(pubKey);

    // Records that we are done with a node and drops it from the pending list
    const nodeComplete = nodeUrl => {
      completedNodes.push(nodeUrl);
      swarmNodes = swarmNodes.filter(node => node !== nodeUrl);
    };

    // Issues a single 'store' request and classifies the outcome
    const doRequest = async nodeUrl => {
      const params = {
        pubKey,
        ttl: ttl.toString(),
        nonce,
        timestamp: timestamp.toString(),
        data: data64,
      };

      try {
        await rpc(`http://${nodeUrl}`, this.snodeServerPort, 'store', params);

        nodeComplete(nodeUrl);
        successfulRequests += 1;
      } catch (e) {
        log.warn('Loki send message:', e);
        if (e instanceof textsecure.NotFoundError) {
          canResolve = false;
        } else if (e instanceof textsecure.HTTPError) {
          // We mark the node as complete as we could still reach it
          nodeComplete(nodeUrl);
        } else {
          const removeNode = await lokiSnodeAPI.unreachableNode(
            pubKey,
            nodeUrl
          );
          if (removeNode) {
            log.error('Loki send message:', e);
            nodeComplete(nodeUrl);
            failedNodes.push(nodeUrl);
          }
        }
      }
    };

    while (successfulRequests < MINIMUM_SUCCESSFUL_REQUESTS) {
      if (!canResolve) {
        throw new window.textsecure.DNSResolutionError('Sending messages');
      }
      if (swarmNodes.length === 0) {
        // Out of pending nodes: refresh the swarm, persist everything that
        // has not been removed as failed, and retry only the nodes we have
        // not completed yet.
        const freshNodes = await lokiSnodeAPI.getFreshSwarmNodes(pubKey);
        const goodNodes = _.difference(freshNodes, failedNodes);
        await lokiSnodeAPI.updateSwarmNodes(pubKey, goodNodes);
        swarmNodes = _.difference(freshNodes, completedNodes);
        if (swarmNodes.length === 0) {
          if (successfulRequests !== 0) {
            // TODO: Decide how to handle some completed requests but not enough
            log.warn(`Partially successful storage message to ${pubKey}`);
            return;
          }
          throw new window.textsecure.EmptySwarmError(
            pubKey,
            'Ran out of swarm nodes to query'
          );
        }
      }

      const remainingRequests =
        MINIMUM_SUCCESSFUL_REQUESTS - successfulRequests;

      // `splice` intentionally removes the attempted nodes from the pending
      // list so they are not picked again before the swarm is refreshed.
      await Promise.all(
        swarmNodes
          .splice(0, remainingRequests)
          .map(nodeUrl => doRequest(nodeUrl))
      );
    }
    log.info(`Successful storage message to ${pubKey}`);
  }

  /**
   * Retrieves messages for our own key from our swarm nodes, issuing
   * 'retrieve' requests until MINIMUM_SUCCESSFUL_REQUESTS of them succeed or
   * no unqueried nodes remain.
   *
   * @param {function(Array): void} callback - Invoked with `result.messages`
   *   for every node response that contains a non-empty `messages` array.
   * @returns {Promise<void>} Also resolves when some but not enough requests
   *   succeeded and no nodes are left.
   * @throws {window.textsecure.DNSResolutionError} If a request failed with
   *   `textsecure.NotFoundError` and more requests were still needed.
   * @throws {window.textsecure.EmptySwarmError} If no swarm nodes remain and
   *   not a single request succeeded.
   */
  async retrieveMessages(callback) {
    const ourKey = window.textsecure.storage.user.getNumber();
    const completedNodes = [];
    let canResolve = true;
    let successfulRequests = 0;

    // Object keyed by node URL; each value may carry a `lastHash`
    let ourSwarmNodes = await lokiSnodeAPI.getOurSwarmNodes();

    // Records that we are done with a node and drops it from the pending set
    const nodeComplete = nodeUrl => {
      completedNodes.push(nodeUrl);
      delete ourSwarmNodes[nodeUrl];
    };

    // Issues a single 'retrieve' request and classifies the outcome
    const doRequest = async (nodeUrl, nodeData) => {
      const params = {
        pubKey: ourKey,
        lastHash: nodeData.lastHash || '',
      };

      try {
        const result = await rpc(
          `http://${nodeUrl}`,
          this.snodeServerPort,
          'retrieve',
          params
        );

        nodeComplete(nodeUrl);

        if (Array.isArray(result.messages) && result.messages.length) {
          const { messages } = result;
          const lastHash = messages[messages.length - 1].hash;
          lokiSnodeAPI.updateLastHash(nodeUrl, lastHash);
          // NOTE(review): a synchronous throw from `callback` lands in the
          // catch below and is treated like a request failure (possibly
          // reaching `unreachableNode`) — confirm this is intended.
          callback(messages);
        }
        successfulRequests += 1;
      } catch (e) {
        log.warn('Loki retrieve messages:', e);
        if (e instanceof textsecure.NotFoundError) {
          canResolve = false;
        } else if (e instanceof textsecure.HTTPError) {
          // We mark the node as complete as we could still reach it
          nodeComplete(nodeUrl);
        } else {
          const removeNode = await lokiSnodeAPI.unreachableNode(
            ourKey,
            nodeUrl
          );
          if (removeNode) {
            log.error('Loki retrieve messages:', e);
            nodeComplete(nodeUrl);
          }
        }
      }
    };

    while (successfulRequests < MINIMUM_SUCCESSFUL_REQUESTS) {
      if (!canResolve) {
        throw new window.textsecure.DNSResolutionError('Retrieving messages');
      }
      if (Object.keys(ourSwarmNodes).length === 0) {
        ourSwarmNodes = await lokiSnodeAPI.getOurSwarmNodes();
        // Filter out the nodes we have already got responses from
        completedNodes.forEach(nodeUrl => delete ourSwarmNodes[nodeUrl]);

        if (Object.keys(ourSwarmNodes).length === 0) {
          if (successfulRequests !== 0) {
            // TODO: Decide how to handle some completed requests but not enough
            return;
          }
          throw new window.textsecure.EmptySwarmError(
            ourKey,
            'Ran out of swarm nodes to query'
          );
        }
      }

      const remainingRequests =
        MINIMUM_SUCCESSFUL_REQUESTS - successfulRequests;

      // Only a prefix is needed here; nodes leave `ourSwarmNodes` through
      // `nodeComplete`, so a non-mutating `slice` is sufficient.
      await Promise.all(
        Object.entries(ourSwarmNodes)
          .slice(0, remainingRequests)
          .map(([nodeUrl, nodeData]) => doRequest(nodeUrl, nodeData))
      );
    }
  }
}

module.exports = LokiMessageAPI;