/* eslint-disable no-await-in-loop */
/* eslint-disable no-loop-func */
/* global log, dcodeIO, window, callWorker, lokiSnodeAPI, textsecure */

const _ = require('lodash');
const { lokiRpc } = require('./loki_rpc');
const primitives = require('./loki_primitives');

const DEFAULT_CONNECTIONS = 3;
const MAX_ACCEPTABLE_FAILURES = 10;
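/**
 * Drop messages whose hashes we have already seen, then record the hashes
 * (and expirations) of the rest so future polls skip them. Only the `hash`
 * and `expiration` fields of a retrieved message are relied on here;
 * persistence goes through the window.Signal.Data layer used throughout
 * this file.
 *
 * @param {Array} messages - one batch returned by a snode retrieve call
 * @returns {Promise<Array>} the messages we have not seen before
 */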
const filterIncomingMessages = async messages => {
  const incomingHashes = messages.map(m => m.hash);
  const dupHashes = await window.Signal.Data.getSeenMessagesByHashList(
    incomingHashes
  );
  const newMessages = messages.filter(m => !dupHashes.includes(m.hash));
  if (newMessages.length) {
    const newHashes = newMessages.map(m => ({
      expiresAt: m.expiration,
      hash: m.hash,
    }));
    await window.Signal.Data.saveSeenMessageHashes(newHashes);
  }
  return newMessages;
};
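/**
 * Compute the proof-of-work nonce for an outgoing message on a worker thread
 * (the 'calcPoW' worker task), using the last difficulty the swarm reported
 * (stored under 'PoWDifficulty'). Fires 'calculatingPoW' first so the UI can
 * flag the message as pending.
 *
 * @returns {Promise<string>} base64 nonce to include in the store request
 */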
const calcNonce = (messageEventData, pubKey, data64, timestamp, ttl) => {
  const difficulty = window.storage.get('PoWDifficulty', null);
  // Nonce is returned as a base64 string to include in header
  window.Whisper.events.trigger('calculatingPoW', messageEventData);
  return callWorker('calcPoW', timestamp, ttl, pubKey, data64, difficulty);
};
// We don't throw or catch here; the _ prefix marks this private since
// no error handling is done and exceptions bubble up to the caller.
async function _retrieveNextMessages(nodeData, pubkey) {
  const params = {
    pubKey: pubkey,
    lastHash: nodeData.lastHash || '',
  };
  const options = {
    timeout: 40000,
    ourPubKey: pubkey,
  };

  // let exceptions bubble up
  const result = await lokiRpc(
    `https://${nodeData.ip}`,
    nodeData.port,
    'retrieve',
    params,
    options,
    '/storage_rpc/v1',
    nodeData
  );

  if (result === false) {
    // make a note of it because the caller doesn't care...
    log.warn(
      `loki_message:::_retrieveNextMessages - lokiRpc could not talk to ${nodeData.ip}:${nodeData.port}`
    );
    return [];
  }

  return result.messages || [];
}
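/**
 * Sends messages to, and polls messages from, the storage-server swarm for
 * our public key. A rough usage sketch; the surrounding wiring (ourPubKeyHex,
 * stopPolling, the handler) is illustrative, not part of this module:
 *
 * @example
 * const api = new LokiMessageAPI(ourPubKeyHex);
 * // stopPolling: a promise the owner resolves when polling should cease
 * await api.startLongPolling(DEFAULT_CONNECTIONS, stopPolling, messages => {
 *   // handle deduplicated envelopes here
 * });
 */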
class LokiMessageAPI {
  constructor(ourKey) {
    this.jobQueue = new window.JobQueue();
    this.sendingData = {};
    this.ourKey = ourKey;
    // stop polling for a group if its id is no longer found here
    this.groupIdsToPoll = {};
  }
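  /**
   * Store a message in the recipient's swarm, or post it to a public channel
   * when options.isPublic is set. Opens numConnections send attempts in
   * parallel and resolves once the first snode accepts the message.
   *
   * A hedged example; `envelope` stands in for the already-encrypted bytes
   * produced elsewhere in the app, and the 24h ttl is just a sample value:
   *
   * @example
   * await api.sendMessage(pubKey, envelope, Date.now(), 24 * 60 * 60 * 1000);
   *
   * @throws {textsecure.EmptySwarmError} when every swarm node was exhausted
   */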
  async sendMessage(pubKey, data, messageTimeStamp, ttl, options = {}) {
    const {
      isPublic = false,
      numConnections = DEFAULT_CONNECTIONS,
      publicSendData = null,
    } = options;
    // Data required to identify a message in a conversation
    const messageEventData = {
      pubKey,
      timestamp: messageTimeStamp,
    };

    if (isPublic) {
      if (!publicSendData) {
        throw new window.textsecure.PublicChatError(
          'Missing public send data for public chat message'
        );
      }
      const res = await publicSendData.sendMessage(data, messageTimeStamp);
      if (res === false) {
        throw new window.textsecure.PublicChatError(
          'Failed to send public chat message'
        );
      }
      messageEventData.serverId = res;
      window.Whisper.events.trigger('publicMessageSent', messageEventData);
      return;
    }
    const data64 = dcodeIO.ByteBuffer.wrap(data).toString('base64');
    const timestamp = Date.now();

    const nonce = await calcNonce(
      messageEventData,
      window.getStoragePubKey(pubKey),
      data64,
      timestamp,
      ttl
    );

    // Using timestamp as a unique identifier
    const swarm = await lokiSnodeAPI.getSwarmNodesForPubKey(pubKey);
    this.sendingData[timestamp] = {
      swarm,
      hasFreshList: false,
    };
    if (this.sendingData[timestamp].swarm.length < numConnections) {
      await this.refreshSendingSwarm(pubKey, timestamp);
    }

    // send parameters
    const params = {
      pubKey,
      ttl: ttl.toString(),
      nonce,
      timestamp: timestamp.toString(),
      data: data64,
    };
    const promises = [];
    let completedConnections = 0;
    for (let i = 0; i < numConnections; i += 1) {
      const connectionPromise = this._openSendConnection(params).finally(() => {
        completedConnections += 1;
        if (completedConnections >= numConnections) {
          delete this.sendingData[timestamp];
        }
      });
      promises.push(connectionPromise);
    }
    let snode;
    try {
      snode = await primitives.firstTrue(promises);
    } catch (e) {
      // snode is still undefined when firstTrue rejects, so don't log it here
      log.warn(
        `loki_message:::sendMessage - ${e.code} ${e.message} to ${pubKey}`
      );
      if (e instanceof textsecure.WrongDifficultyError) {
        // Force nonce recalculation
        // NOTE: Currently if there are snodes with conflicting difficulties we
        // will send the message twice (or more). This won't affect the client
        // side, but snodes could store the same message multiple times because
        // the copies will have different timestamps (and therefore nonces).
        await this.sendMessage(pubKey, data, messageTimeStamp, ttl, options);
        return;
      }
      throw e;
    }
    if (!snode) {
      throw new window.textsecure.EmptySwarmError(
        pubKey,
        'Ran out of swarm nodes to query'
      );
    }
    log.info(
      `loki_message:::sendMessage - Successfully stored message to ${pubKey} via ${snode.ip}:${snode.port}`
    );
  }
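  /**
   * Replace the swarm list for an in-flight send (keyed by timestamp) with a
   * freshly fetched one, and mark it so we only refresh once per message.
   */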
  async refreshSendingSwarm(pubKey, timestamp) {
    const freshNodes = await lokiSnodeAPI.refreshSwarmNodesForPubKey(pubKey);
    this.sendingData[timestamp].swarm = freshNodes;
    this.sendingData[timestamp].hasFreshList = true;
    return true;
  }
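  /**
   * One send "connection": pops snodes off this message's swarm list and
   * tries them in turn until one stores the message. If the list runs dry
   * and we haven't refreshed it yet, refresh once (shared across all
   * connections via refreshPromise) and recurse.
   *
   * @returns the successful snode, or false once the swarm is exhausted
   */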
  async _openSendConnection(params) {
    // params.timestamp keys into this.sendingData for this send attempt
    while (!_.isEmpty(this.sendingData[params.timestamp].swarm)) {
      const snode = this.sendingData[params.timestamp].swarm.shift();
      // TODO: Revert back to using snode address instead of IP
      const successfulSend = await this._sendToNode(snode, params);
      if (successfulSend) {
        return snode;
      }
      // should we mark the snode as bad if it can't store our message?
    }

    if (!this.sendingData[params.timestamp].hasFreshList) {
      // Ensure that there is only a single refresh per outgoing message
      if (!this.sendingData[params.timestamp].refreshPromise) {
        this.sendingData[
          params.timestamp
        ].refreshPromise = this.refreshSendingSwarm(
          params.pubKey,
          params.timestamp
        );
      }
      await this.sendingData[params.timestamp].refreshPromise;
      // Retry with the fresh list
      return this._openSendConnection(params);
    }
    return false;
  }
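  /**
   * Try to store the message on a single snode, retrying up to
   * MAX_ACCEPTABLE_FAILURES times with a linear backoff of
   * successiveFailures * 500ms between attempts. Returns true on success and
   * false once the node is given up on (it is then reported unreachable).
   */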
  async _sendToNode(targetNode, params) {
    let successiveFailures = 0;
    while (successiveFailures < MAX_ACCEPTABLE_FAILURES) {
      // The higher this delay is, the longer the user-facing delay gets, but
      // we don't want to burn through all our retries quickly; the node needs
      // a chance to heal. Failing the user quickly just means they pound the
      // retry button faster, so this favors more retries with lower delays,
      // at some cost in bandwidth...
      await primitives.sleepFor(successiveFailures * 500);
      try {
        const result = await lokiRpc(
          `https://${targetNode.ip}`,
          targetNode.port,
          'store',
          params,
          {},
          '/storage_rpc/v1',
          targetNode
        );
        // A successful response looks like `{"difficulty":1}`, but so does an
        // invalid-PoW response, so be careful!
        // Do not return true if we get false here...
        if (result === false) {
          // this means the node we asked is likely down
          log.warn(
            `loki_message:::_sendToNode - Try #${successiveFailures}/${MAX_ACCEPTABLE_FAILURES} ${targetNode.ip}:${targetNode.port} failed`
          );
          successiveFailures += 1;
          // eslint-disable-next-line no-continue
          continue;
        }
        // Make sure we aren't doing too much PoW
        const currentDifficulty = window.storage.get('PoWDifficulty', null);
        if (
          result &&
          result.difficulty &&
          result.difficulty !== currentDifficulty
        ) {
          window.storage.put('PoWDifficulty', result.difficulty);
          // should we return false?
        }
        return true;
      } catch (e) {
        log.warn(
          'loki_message:::_sendToNode - send error:',
          e.code,
          e.message,
          `destination ${targetNode.ip}:${targetNode.port}`
        );
        if (e instanceof textsecure.WrongSwarmError) {
          const { newSwarm } = e;
          await lokiSnodeAPI.updateSwarmNodes(params.pubKey, newSwarm);
          this.sendingData[params.timestamp].swarm = newSwarm;
          this.sendingData[params.timestamp].hasFreshList = true;
          return false;
        } else if (e instanceof textsecure.WrongDifficultyError) {
          const { newDifficulty } = e;
          if (!Number.isNaN(newDifficulty)) {
            window.storage.put('PoWDifficulty', newDifficulty);
          }
          throw e;
        } else if (e instanceof textsecure.NotFoundError) {
          // TODO: Handle resolution error
        } else if (e instanceof textsecure.TimestampError) {
          log.warn('loki_message:::_sendToNode - Timestamp is invalid');
          throw e;
        } else if (e instanceof textsecure.HTTPError) {
          // TODO: Handle working connection but error response
          const body = await e.response.text();
          log.warn('loki_message:::_sendToNode - HTTPError body:', body);
        }
        successiveFailures += 1;
      }
    }
    const remainingSwarmSnodes = await lokiSnodeAPI.unreachableNode(
      params.pubKey,
      targetNode
    );
    log.error(
      `loki_message:::_sendToNode - Too many successive failures trying to send to node ${targetNode.ip}:${targetNode.port}, ${remainingSwarmSnodes.length} remaining swarm nodes`
    );
    return false;
  }
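  /**
   * Poll a single snode for a group's messages every 5 seconds, resuming
   * from the last hash recorded for that (groupId, snode) pair. Runs until
   * the group id is removed from groupIdsToPoll.
   */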
  async pollNodeForGroupId(groupId, nodeParam, onMessages) {
    const node = nodeParam;

    const lastHash = await window.Signal.Data.getLastHashBySnode(
      groupId,
      node.address
    );
    node.lastHash = lastHash;

    log.debug(
      `[last hash] lastHash for group id ${groupId.substr(0, 5)}: node ${
        node.port
      }`,
      lastHash
    );

    while (this.groupIdsToPoll[groupId]) {
      try {
        let messages = await _retrieveNextMessages(node, groupId);

        if (messages.length > 0) {
          const lastMessage = _.last(messages);

          // TODO: this is for groups, so need to specify ID
          // Is this too early to update the last hash??
          await lokiSnodeAPI.updateLastHash(
            groupId,
            node.address,
            lastMessage.hash,
            lastMessage.expiration
          );
          log.debug(
            `Updated lastHash for group id ${groupId.substr(0, 5)}: node ${
              node.port
            }`,
            lastMessage.hash
          );

          node.lastHash = lastMessage.hash;

          messages = await this.jobQueue.add(() =>
            filterIncomingMessages(messages)
          );

          // At this point we still know which server identity the messages
          // are associated with, so we save it in the messages'
          // conversationId field:
          const modifiedMessages = messages.map(m => {
            // eslint-disable-next-line no-param-reassign
            m.conversationId = groupId;
            return m;
          });

          onMessages(modifiedMessages);
        }
      } catch (e) {
        log.warn(
          'pollNodeForGroupId - retrieve error:',
          e.code,
          e.message,
          `on ${node.ip}:${node.port}`
        );
        // TODO: Handle unreachable nodes and wrong swarms
      }

      await primitives.sleepFor(5000);
    }
  }
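  /**
   * Start polling for a group id: fetches the group's swarm and polls a
   * sample of 3 of its nodes in parallel. Repeat calls for an id already
   * being polled are ignored.
   */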
  async pollForGroupId(groupId, onMessages) {
    log.info(`Starting to poll for group id: ${groupId}`);

    if (this.groupIdsToPoll[groupId]) {
      log.warn(`Already polling for group id: ${groupId}`);
      return;
    }

    this.groupIdsToPoll[groupId] = true;

    // Get nodes for groupId
    const nodes = await lokiSnodeAPI.refreshSwarmNodesForPubKey(groupId);
    log.info('Nodes for group id:', nodes);
    _.sampleSize(nodes, 3).forEach(node =>
      this.pollNodeForGroupId(groupId, node, onMessages)
    );
  }
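  // Flags the pollNodeForGroupId loops for this group to exit on their next
  // iteration; it does not interrupt a retrieve already in flight.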
  async stopPollingForGroup(groupId) {
    if (!this.groupIdsToPoll[groupId]) {
      log.warn(`Not currently polling for group id: ${groupId}`);
      return;
    }
    log.warn(`Stop polling for group id: ${groupId}`);
    delete this.groupIdsToPoll[groupId];
  }
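  /**
   * One retrieve "connection": drains a pool of snodes (keyed by address),
   * polling one node at a time and moving to the next after
   * MAX_ACCEPTABLE_FAILURES consecutive errors. Returns false when the pool
   * was exhausted, true when it exited with nodes still available (i.e.
   * polling was stopped).
   */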
  async _openRetrieveConnection(pSwarmPool, stopPollingPromise, onMessages) {
    const swarmPool = pSwarmPool; // lint
    let stopPollingResult = false;

    // When message_receiver restarts from onoffline/ononline events it closes
    // http-resources, which will then resolve the stopPollingPromise with
    // true. We then want to cancel these polling connections because new ones
    // will be created.
    // eslint-disable-next-line more/no-then
    stopPollingPromise.then(result => {
      stopPollingResult = result;
    });
    while (!stopPollingResult && !_.isEmpty(swarmPool)) {
      const address = Object.keys(swarmPool)[0]; // X.snode hostname
      const nodeData = swarmPool[address];
      delete swarmPool[address];
      let successiveFailures = 0;
      while (
        !stopPollingResult &&
        successiveFailures < MAX_ACCEPTABLE_FAILURES
      ) {
        // TODO: Revert back to using snode address instead of IP
        try {
          // In general, I think we want exceptions to bubble up so the
          // user-facing UI can report unhandled errors, except in this case
          // of living inside the http-resource pollServer, because it just
          // restarts more connections...
          let messages = await _retrieveNextMessages(nodeData, this.ourKey);
          // this only tracks retrieval failures,
          // not parsing failures...
          successiveFailures = 0;
          if (messages.length) {
            const lastMessage = _.last(messages);
            nodeData.lastHash = lastMessage.hash;
            await lokiSnodeAPI.updateLastHash(
              this.ourKey,
              address,
              lastMessage.hash,
              lastMessage.expiration
            );
            messages = await this.jobQueue.add(() =>
              filterIncomingMessages(messages)
            );
          }
          // Execute the callback even with an empty array to signal online
          // status
          onMessages(messages);
        } catch (e) {
          log.warn(
            'loki_message:::_openRetrieveConnection - retrieve error:',
            e.code,
            e.message,
            `on ${nodeData.ip}:${nodeData.port}`
          );
          if (e instanceof textsecure.WrongSwarmError) {
            const { newSwarm } = e;
            // Is it a security concern that we replace the list of snodes
            // based on a response from a single snode?
            await lokiSnodeAPI.updateSwarmNodes(this.ourKey, newSwarm);
            // FIXME: restart all openRetrieves when this happens...
            // FIXME: lokiSnode should handle this
            for (let i = 0; i < newSwarm.length; i += 1) {
              const lastHash = await window.Signal.Data.getLastHashBySnode(
                this.ourKey,
                newSwarm[i]
              );
              swarmPool[newSwarm[i]] = {
                lastHash,
              };
            }
            // Try another snode
            break;
          } else if (e instanceof textsecure.NotFoundError) {
            // DNS/Lokinet error, needs to bubble up
            throw new window.textsecure.DNSResolutionError(
              'Retrieving messages'
            );
          }
          successiveFailures += 1;
        }
        // Always wait a bit as we are no longer long-polling
        await primitives.sleepFor(Math.max(successiveFailures, 2) * 1000);
      }
      if (successiveFailures >= MAX_ACCEPTABLE_FAILURES) {
        const remainingSwarmSnodes = await lokiSnodeAPI.unreachableNode(
          this.ourKey,
          nodeData
        );
        log.warn(
          `loki_message:::_openRetrieveConnection - too many successive failures, removing ${
            nodeData.ip
          }:${nodeData.port} from our swarm pool. We have ${
            Object.keys(swarmPool).length
          } usable swarm nodes left for our connection (${
            remainingSwarmSnodes.length
          } in local db)`
        );
      }
    }
    // if not stopPollingResult
    if (_.isEmpty(swarmPool)) {
      log.error(
        'loki_message:::_openRetrieveConnection - We no longer have any swarm nodes available to try in pool, closing retrieve connection'
      );
      return false;
    }
    return true;
  }
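  /**
   * Main receive loop: starts group polling for medium groups, then splits
   * our swarm into numConnections pools and opens one retrieve connection
   * per pool, resolving only once every connection has finished. Note that
   * with Math.floor a remainder can produce one extra pool that no
   * connection consumes; in practice this appears to be accepted since the
   * swarm is small.
   */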
  // we don't throw or catch here
  async startLongPolling(numConnections, stopPolling, onMessages) {
    // load from the local DB
    let nodes = await lokiSnodeAPI.getSwarmNodesForPubKey(this.ourKey, {
      fetchHashes: true,
    });

    // Start polling for medium-size groups as well (they might be in
    // different swarms)
    {
      const convos = window.getConversations().filter(c => c.isMediumGroup());
      convos.forEach(c => {
        this.pollForGroupId(c.id, onMessages);
        // TODO: unsubscribe if the group is deleted
      });
    }

    if (nodes.length < numConnections) {
      log.warn(
        'loki_message:::startLongPolling - Not enough SwarmNodes for our pubkey in local database, getting current list from blockchain'
      );
      // load from the blockchain
      nodes = await lokiSnodeAPI.refreshSwarmNodesForPubKey(this.ourKey);
      if (nodes.length < numConnections) {
        log.error(
          'loki_message:::startLongPolling - Could not get enough SwarmNodes for our pubkey from blockchain'
        );
      }
    }
    log.info(
      'loki_message:::startLongPolling - start polling for',
      numConnections,
      'connections. We have swarmNodes',
      nodes.length,
      'for',
      this.ourKey
    );
    // Now split up our swarm pool into numConnections pools, one for each
    // retrieve connection. Floor vs ceil hardly matters since the split is
    // rarely even, but Math.max guards against a poolSize of 0 (which would
    // make the splice below a no-op and spin this loop forever) if we ever
    // have fewer nodes than connections.
    const poolSize = Math.max(1, Math.floor(nodes.length / numConnections));
    const pools = [];
    while (nodes.length) {
      const poolList = nodes.splice(0, poolSize);
      const byAddressObj = poolList.reduce((result, node) => {
        // eslint-disable-next-line no-param-reassign
        result[node.address] = node;
        return result;
      }, {});
      pools.push(byAddressObj);
    }

    const promises = [];
    let unresolved = numConnections;
    for (let i = 0; i < numConnections; i += 1) {
      promises.push(
        // eslint-disable-next-line more/no-then
        this._openRetrieveConnection(pools[i], stopPolling, onMessages).then(
          stoppedPolling => {
            unresolved -= 1;
            log.info(
              `loki_message:::startLongPolling - There are ${unresolved}`,
              `open retrieve connections left. Stopped? ${stoppedPolling}`
            );
          }
        )
      );
    }

    // Blocks until numConnections snodes in our swarms have been removed from
    // the list. Fewer than numConnections being active is fine; we only need
    // to restart if none are (per Niels 20/02/13), or if there are network
    // issues (ENOTFOUND due to lokinet).
    await Promise.all(promises);
    log.warn(
      'loki_message:::startLongPolling - All our long poll swarm connections have been removed'
    );
    // should we just call ourselves again?
    // no, our caller already handles this...
  }
}

module.exports = LokiMessageAPI;