Merge branch 'clearnet' of https://github.com/loki-project/session-desktop into clearnet

pull/1002/head
Vincent 5 years ago
commit 6d9bcc3079

@ -1958,6 +1958,12 @@
"relink": { "relink": {
"message": "Relink" "message": "Relink"
}, },
"autoUpdateSettingTitle": {
"message": "Auto Update"
},
"autoUpdateSettingDescription": {
"message": "Automatically check for updates on launch"
},
"autoUpdateNewVersionTitle": { "autoUpdateNewVersionTitle": {
"message": "Session update available" "message": "Session update available"
}, },

@ -0,0 +1,15 @@
export interface BaseConfig {
set(keyPath: string, value: any): void;
get(keyPath: string): any | undefined;
remove(): void;
}
interface Options {
allowMalformedOnStartup: boolean;
}
export function start(
name: string,
targetPath: string,
options: Options
): BaseConfig;

@ -0,0 +1,3 @@
import { BaseConfig } from './base_config';
type UserConfig = BaseConfig;

@ -709,12 +709,10 @@
} }
// lets not allow ANY URLs, lets force it to be local to public chat server // lets not allow ANY URLs, lets force it to be local to public chat server
const relativeFileUrl = fileObj.url.replace( const url = new URL(fileObj.url);
API.serverAPI.baseServerUrl,
''
);
// write it to the channel // write it to the channel
await API.setChannelAvatar(relativeFileUrl); await API.setChannelAvatar(url.pathname);
} }
if (await API.setChannelName(groupName)) { if (await API.setChannelName(groupName)) {
@ -1946,6 +1944,10 @@
}) { }) {
return async event => { return async event => {
const { data, confirm } = event; const { data, confirm } = event;
if (!data) {
window.log.warn('Invalid data passed to createMessageHandler.', event);
return confirm();
}
const messageDescriptor = getMessageDescriptor(data); const messageDescriptor = getMessageDescriptor(data);
@ -1968,36 +1970,38 @@
messageDescriptor.id messageDescriptor.id
); );
let message; let message;
if ( const { source } = data;
// Note: This only works currently because we have a 1 device limit
// When we change that, the check below needs to change too
const ourNumber = textsecure.storage.user.getNumber();
const primaryDevice = window.storage.get('primaryDevicePubKey');
const isOurDevice =
source && (source === ourNumber || source === primaryDevice);
const isPublicChatMessage =
messageDescriptor.type === 'group' && messageDescriptor.type === 'group' &&
descriptorId.match(/^publicChat:/) descriptorId.match(/^publicChat:/);
) { if (isPublicChatMessage && isOurDevice) {
// Note: This only works currently because we have a 1 device limit // Public chat messages from ourselves should be outgoing
// When we change that, the check below needs to change too message = await createSentMessage(data);
const ourNumber = textsecure.storage.user.getNumber();
const primaryDevice = window.storage.get('primaryDevicePubKey');
const { source } = data;
if (source && (source === ourNumber || source === primaryDevice)) {
// Public chat messages from ourselves should be outgoing
message = await createSentMessage(data);
}
} else { } else {
message = await createMessage(data); message = await createMessage(data);
} }
const isDuplicate = await isMessageDuplicate(message); const isDuplicate = await isMessageDuplicate(message);
if (isDuplicate) { if (isDuplicate) {
// RSS expects duplciates, so squelch log // RSS expects duplicates, so squelch log
if (!descriptorId.match(/^rss:/)) { if (!descriptorId.match(/^rss:/)) {
window.log.warn('Received duplicate message', message.idForLogging()); window.log.warn('Received duplicate message', message.idForLogging());
} }
return event.confirm(); return confirm();
} }
await ConversationController.getOrCreateAndWait( await ConversationController.getOrCreateAndWait(
messageDescriptor.id, messageDescriptor.id,
messageDescriptor.type messageDescriptor.type
); );
return message.handleDataMessage(data.message, event.confirm, { return message.handleDataMessage(data.message, confirm, {
initialLoadComplete, initialLoadComplete,
}); });
}; };

@ -330,9 +330,13 @@
resetMessageSelection() { resetMessageSelection() {
this.selectedMessages.clear(); this.selectedMessages.clear();
this.messageCollection.forEach(m => { this.messageCollection.forEach(m => {
// eslint-disable-next-line no-param-reassign // on change for ALL messages without real changes is a really costly operation
m.selected = false; // -> cause refresh of the whole conversation view even if not a single message was selected
m.trigger('change'); if (m.selected) {
// eslint-disable-next-line no-param-reassign
m.selected = false;
m.trigger('change');
}
}); });
this.trigger('message-selection-changed'); this.trigger('message-selection-changed');

@ -14,6 +14,13 @@ const PUBLICCHAT_DELETION_POLL_EVERY = 5 * 1000; // 5s
const PUBLICCHAT_MOD_POLL_EVERY = 30 * 1000; // 30s const PUBLICCHAT_MOD_POLL_EVERY = 30 * 1000; // 30s
const PUBLICCHAT_MIN_TIME_BETWEEN_DUPLICATE_MESSAGES = 10 * 1000; // 10s const PUBLICCHAT_MIN_TIME_BETWEEN_DUPLICATE_MESSAGES = 10 * 1000; // 10s
const FILESERVER_HOSTS = [
'file-dev.lokinet.org',
'file.lokinet.org',
'file-dev.getsession.org',
'file.getsession.org',
];
const HOMESERVER_USER_ANNOTATION_TYPE = 'network.loki.messenger.homeserver'; const HOMESERVER_USER_ANNOTATION_TYPE = 'network.loki.messenger.homeserver';
const AVATAR_USER_ANNOTATION_TYPE = 'network.loki.messenger.avatar'; const AVATAR_USER_ANNOTATION_TYPE = 'network.loki.messenger.avatar';
const SETTINGS_CHANNEL_ANNOTATION_TYPE = 'net.patter-app.settings'; const SETTINGS_CHANNEL_ANNOTATION_TYPE = 'net.patter-app.settings';
@ -25,6 +32,288 @@ const snodeHttpsAgent = new https.Agent({
rejectUnauthorized: false, rejectUnauthorized: false,
}); });
const sendToProxy = async (
srvPubKey,
endpoint,
pFetchOptions,
options = {}
) => {
if (!srvPubKey) {
log.error(
'loki_app_dot_net: sendToProxy called without a server public key'
);
return {};
}
const randSnode = await lokiSnodeAPI.getRandomSnodeAddress();
const url = `https://${randSnode.ip}:${randSnode.port}/file_proxy`;
const fetchOptions = pFetchOptions; // make lint happy
// safety issue with file server, just safer to have this
if (fetchOptions.headers === undefined) {
fetchOptions.headers = {};
}
const payloadObj = {
body: fetchOptions.body, // might need to b64 if binary...
endpoint,
method: fetchOptions.method,
headers: fetchOptions.headers,
};
// from https://github.com/sindresorhus/is-stream/blob/master/index.js
if (
payloadObj.body &&
typeof payloadObj.body === 'object' &&
typeof payloadObj.body.pipe === 'function'
) {
const fData = payloadObj.body.getBuffer();
const fHeaders = payloadObj.body.getHeaders();
// update headers for boundary
payloadObj.headers = { ...payloadObj.headers, ...fHeaders };
// update body with base64 chunk
payloadObj.body = {
fileUpload: fData.toString('base64'),
};
}
// convert our payload to binary buffer
const payloadData = Buffer.from(
dcodeIO.ByteBuffer.wrap(JSON.stringify(payloadObj)).toArrayBuffer()
);
payloadObj.body = false; // free memory
// make temporary key for this request/response
const ephemeralKey = libsignal.Curve.generateKeyPair();
// mix server pub key with our priv key
const symKey = libsignal.Curve.calculateAgreement(
srvPubKey, // server's pubkey
ephemeralKey.privKey // our privkey
);
const ivAndCiphertext = await libloki.crypto.DHEncrypt(symKey, payloadData);
// convert final buffer to base64
const cipherText64 = dcodeIO.ByteBuffer.wrap(ivAndCiphertext).toString(
'base64'
);
const ephemeralPubKey64 = dcodeIO.ByteBuffer.wrap(
ephemeralKey.pubKey
).toString('base64');
const finalRequestHeader = {
'X-Loki-File-Server-Ephemeral-Key': ephemeralPubKey64,
};
const firstHopOptions = {
method: 'POST',
// not sure why I can't use anything but json...
// text/plain would be preferred...
body: JSON.stringify({ cipherText64 }),
headers: {
'Content-Type': 'application/json',
'X-Loki-File-Server-Target': '/loki/v1/secure_rpc',
'X-Loki-File-Server-Verb': 'POST',
'X-Loki-File-Server-Headers': JSON.stringify(finalRequestHeader),
},
// we are talking to a snode...
agent: snodeHttpsAgent,
};
// weird this doesn't need NODE_TLS_REJECT_UNAUTHORIZED = '0'
const result = await nodeFetch(url, firstHopOptions);
const txtResponse = await result.text();
if (txtResponse.match(/^Service node is not ready: not in any swarm/i)) {
// mark snode bad
const randomPoolRemainingCount = lokiSnodeAPI.markRandomNodeUnreachable(
randSnode
);
log.warn(
`loki_app_dot_net: Marking random snode bad, internet address ${
randSnode.ip
}:${
randSnode.port
}. ${randomPoolRemainingCount} snodes remaining in randomPool`
);
// retry (hopefully with new snode)
// FIXME: max number of retries...
return sendToProxy(srvPubKey, endpoint, fetchOptions);
}
let response = {};
try {
response = JSON.parse(txtResponse);
} catch (e) {
log.warn(
`loki_app_dot_net: sendToProxy Could not parse outer JSON [${txtResponse}]`,
endpoint,
'on',
url
);
}
if (response.meta && response.meta.code === 200) {
// convert base64 in response to binary
const ivAndCiphertextResponse = dcodeIO.ByteBuffer.wrap(
response.data,
'base64'
).toArrayBuffer();
const decrypted = await libloki.crypto.DHDecrypt(
symKey,
ivAndCiphertextResponse
);
const textDecoder = new TextDecoder();
const respStr = textDecoder.decode(decrypted);
// replace response
try {
response = options.textResponse ? respStr : JSON.parse(respStr);
} catch (e) {
log.warn(
`loki_app_dot_net: sendToProxy Could not parse inner JSON [${respStr}]`,
endpoint,
'on',
url
);
}
} else {
log.warn(
'loki_app_dot_net: file server secure_rpc gave an non-200 response: ',
response,
` txtResponse[${txtResponse}]`,
endpoint
);
}
return { result, txtResponse, response };
};
const serverRequest = async (endpoint, options = {}) => {
const {
params = {},
method,
rawBody,
objBody,
token,
srvPubKey,
forceFreshToken = false,
} = options;
const url = new URL(endpoint);
if (params) {
url.search = new URLSearchParams(params);
}
const fetchOptions = {};
const headers = {};
try {
if (token) {
headers.Authorization = `Bearer ${token}`;
}
if (method) {
fetchOptions.method = method;
}
if (objBody) {
headers['Content-Type'] = 'application/json';
fetchOptions.body = JSON.stringify(objBody);
} else if (rawBody) {
fetchOptions.body = rawBody;
}
fetchOptions.headers = headers;
// domain ends in .loki
if (url.host.match(/\.loki$/i)) {
fetchOptions.agent = snodeHttpsAgent;
}
} catch (e) {
log.info('serverRequest set up error:', e.code, e.message);
return {
err: e,
};
}
let response;
let result;
let txtResponse;
let mode = 'nodeFetch';
try {
const host = url.host.toLowerCase();
// log.info('host', host, FILESERVER_HOSTS);
if (
window.lokiFeatureFlags.useSnodeProxy &&
FILESERVER_HOSTS.includes(host)
) {
mode = 'sendToProxy';
// strip trailing slash
const search = url.search ? `?${url.search}` : '';
const endpointWithQS = `${url.pathname}${search}`.replace(/^\//, '');
// log.info('endpointWithQS', endpointWithQS)
({ response, txtResponse, result } = await sendToProxy(
srvPubKey,
endpointWithQS,
fetchOptions,
options
));
} else {
// disable check for .loki
process.env.NODE_TLS_REJECT_UNAUTHORIZED = url.host.match(/\.loki$/i)
? '0'
: '1';
result = await nodeFetch(url, fetchOptions);
// always make sure this check is enabled
process.env.NODE_TLS_REJECT_UNAUTHORIZED = '1';
txtResponse = await result.text();
// cloudflare timeouts (504s) will be html...
response = options.textResponse ? txtResponse : JSON.parse(txtResponse);
}
} catch (e) {
if (txtResponse) {
log.info(
`serverRequest ${mode} error`,
e.code,
e.message,
`json: ${txtResponse}`,
'attempting connection to',
url
);
} else {
log.info(
`serverRequest ${mode} error`,
e.code,
e.message,
'attempting connection to',
url
);
}
if (mode === '_sendToProxy') {
// if we can detect, certain types of failures, we can retry...
if (e.code === 'ECONNRESET') {
// retry with counter?
}
}
return {
err: e,
};
}
// if it's a response style with a meta
if (result.status !== 200) {
if (!forceFreshToken && (!response.meta || response.meta.code === 401)) {
// retry with forcing a fresh token
return this.serverRequest(endpoint, {
...options,
forceFreshToken: true,
});
}
return {
err: 'statusCode',
statusCode: result.status,
response,
};
}
return {
statusCode: result.status,
response,
};
};
// the core ADN class that handles all communication with a specific server // the core ADN class that handles all communication with a specific server
class LokiAppDotNetServerAPI { class LokiAppDotNetServerAPI {
constructor(ourKey, url) { constructor(ourKey, url) {
@ -394,276 +683,21 @@ class LokiAppDotNetServerAPI {
if (urlStr.match(/\.loki\//)) { if (urlStr.match(/\.loki\//)) {
process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0'; process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0';
} }
const result = await nodeFetch(urlObj, fetchOptions, options); const result = nodeFetch(urlObj, fetchOptions, options);
process.env.NODE_TLS_REJECT_UNAUTHORIZED = 1; process.env.NODE_TLS_REJECT_UNAUTHORIZED = '1';
return result; return result;
} }
async _sendToProxy(endpoint, pFetchOptions, options = {}) {
const randSnode = await lokiSnodeAPI.getRandomSnodeAddress();
const url = `https://${randSnode.ip}:${randSnode.port}/file_proxy`;
const fetchOptions = pFetchOptions; // make lint happy
// safety issue with file server, just safer to have this
if (fetchOptions.headers === undefined) {
fetchOptions.headers = {};
}
const payloadObj = {
body: fetchOptions.body, // might need to b64 if binary...
endpoint,
method: fetchOptions.method,
headers: fetchOptions.headers,
};
// from https://github.com/sindresorhus/is-stream/blob/master/index.js
if (
payloadObj.body &&
typeof payloadObj.body === 'object' &&
typeof payloadObj.body.pipe === 'function'
) {
log.info('detected body is a stream');
const fData = payloadObj.body.getBuffer();
const fHeaders = payloadObj.body.getHeaders();
// update headers for boundary
payloadObj.headers = { ...payloadObj.headers, ...fHeaders };
// update body with base64 chunk
payloadObj.body = {
fileUpload: fData.toString('base64'),
};
}
// convert our payload to binary buffer
const payloadData = Buffer.from(
dcodeIO.ByteBuffer.wrap(JSON.stringify(payloadObj)).toArrayBuffer()
);
payloadObj.body = false; // free memory
// make temporary key for this request/response
const ephemeralKey = libsignal.Curve.generateKeyPair();
// mix server pub key with our priv key
const symKey = libsignal.Curve.calculateAgreement(
this.pubKey, // server's pubkey
ephemeralKey.privKey // our privkey
);
const ivAndCiphertext = await libloki.crypto.DHEncrypt(symKey, payloadData);
// convert final buffer to base64
const cipherText64 = dcodeIO.ByteBuffer.wrap(ivAndCiphertext).toString(
'base64'
);
const ephemeralPubKey64 = dcodeIO.ByteBuffer.wrap(
ephemeralKey.pubKey
).toString('base64');
const finalRequestHeader = {
'X-Loki-File-Server-Ephemeral-Key': ephemeralPubKey64,
};
const firstHopOptions = {
method: 'POST',
// not sure why I can't use anything but json...
// text/plain would be preferred...
body: JSON.stringify({ cipherText64 }),
headers: {
'Content-Type': 'application/json',
'X-Loki-File-Server-Target': '/loki/v1/secure_rpc',
'X-Loki-File-Server-Verb': 'POST',
'X-Loki-File-Server-Headers': JSON.stringify(finalRequestHeader),
},
// we are talking to a snode...
agent: snodeHttpsAgent,
};
// weird this doesn't need NODE_TLS_REJECT_UNAUTHORIZED = 0
const result = await nodeFetch(url, firstHopOptions);
const txtResponse = await result.text();
if (txtResponse.match(/^Service node is not ready: not in any swarm/i)) {
// mark snode bad
log.warn(
`Marking random snode bad, internet address ${randSnode.ip}:${
randSnode.port
}`
);
lokiSnodeAPI.markRandomNodeUnreachable(randSnode);
// retry (hopefully with new snode)
// FIXME: max number of retries...
return this._sendToProxy(endpoint, fetchOptions);
}
let response = {};
try {
response = JSON.parse(txtResponse);
} catch (e) {
log.warn(
`_sendToProxy Could not parse outer JSON [${txtResponse}]`,
endpoint
);
}
if (response.meta && response.meta.code === 200) {
// convert base64 in response to binary
const ivAndCiphertextResponse = dcodeIO.ByteBuffer.wrap(
response.data,
'base64'
).toArrayBuffer();
const decrypted = await libloki.crypto.DHDecrypt(
symKey,
ivAndCiphertextResponse
);
const textDecoder = new TextDecoder();
const respStr = textDecoder.decode(decrypted);
// replace response
try {
response = options.textResponse ? respStr : JSON.parse(respStr);
} catch (e) {
log.warn(
`_sendToProxy Could not parse inner JSON [${respStr}]`,
endpoint
);
}
} else {
log.warn(
'file server secure_rpc gave an non-200 response: ',
response,
` txtResponse[${txtResponse}]`,
endpoint
);
}
return { result, txtResponse, response };
}
// make a request to the server // make a request to the server
async serverRequest(endpoint, options = {}) { async serverRequest(endpoint, options = {}) {
const { if (options.forceFreshToken) {
params = {}, await this.getOrRefreshServerToken(true);
method,
rawBody,
objBody,
forceFreshToken = false,
} = options;
const url = new URL(`${this.baseServerUrl}/${endpoint}`);
if (params) {
url.search = new URLSearchParams(params);
}
const fetchOptions = {};
const headers = {};
try {
if (forceFreshToken) {
await this.getOrRefreshServerToken(true);
}
if (this.token) {
headers.Authorization = `Bearer ${this.token}`;
}
if (method) {
fetchOptions.method = method;
}
if (objBody) {
headers['Content-Type'] = 'application/json';
fetchOptions.body = JSON.stringify(objBody);
} else if (rawBody) {
fetchOptions.body = rawBody;
}
fetchOptions.headers = headers;
// domain ends in .loki
if (url.toString().match(/\.loki\//)) {
fetchOptions.agent = snodeHttpsAgent;
}
} catch (e) {
log.info('serverRequest set up error:', e.code, e.message);
return {
err: e,
};
} }
return serverRequest(`${this.baseServerUrl}/${endpoint}`, {
let response; ...options,
let result; token: this.token,
let txtResponse; srvPubKey: this.pubKey,
let mode = 'nodeFetch'; });
try {
if (
window.lokiFeatureFlags.useSnodeProxy &&
(this.baseServerUrl === 'https://file-dev.lokinet.org' ||
this.baseServerUrl === 'https://file.lokinet.org' ||
this.baseServerUrl === 'https://file-dev.getsession.org' ||
this.baseServerUrl === 'https://file.getsession.org')
) {
mode = '_sendToProxy';
const endpointWithQS = url
.toString()
.replace(`${this.baseServerUrl}/`, '');
({ response, txtResponse, result } = await this._sendToProxy(
endpointWithQS,
fetchOptions,
options
));
} else {
// disable check for .loki
if (url.toString().match(/\.loki/)) {
process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0';
}
result = await nodeFetch(url, fetchOptions);
// always make sure this check is enabled
process.env.NODE_TLS_REJECT_UNAUTHORIZED = 1;
txtResponse = await result.text();
// hrm cloudflare timeouts (504s) will be html...
response = options.textResponse ? txtResponse : JSON.parse(txtResponse);
}
} catch (e) {
if (txtResponse) {
log.info(
`serverRequest ${mode} error`,
e.code,
e.message,
`json: ${txtResponse}`,
'attempting connection to',
url
);
} else {
log.info(
`serverRequest ${mode} error`,
e.code,
e.message,
'attempting connection to',
url
);
}
if (mode === '_sendToProxy') {
// if we can detect, certain types of failures, we can retry...
if (e.code === 'ECONNRESET') {
// retry with counter?
}
}
return {
err: e,
};
}
// if it's a response style with a meta
if (result.status !== 200) {
if (!forceFreshToken && (!response.meta || response.meta.code === 401)) {
// copy options because lint complains if we modify this directly
const updatedOptions = options;
// force it this time
updatedOptions.forceFreshToken = true;
// retry with updated options
return this.serverRequest(endpoint, updatedOptions);
}
return {
err: 'statusCode',
statusCode: result.status,
response,
};
}
return {
statusCode: result.status,
response,
};
} }
async getUserAnnotations(pubKey) { async getUserAnnotations(pubKey) {
@ -946,6 +980,8 @@ class LokiPublicChannelAPI {
this.deleteLastId = 1; this.deleteLastId = 1;
this.timers = {}; this.timers = {};
this.myPrivateKey = false; this.myPrivateKey = false;
this.messagesPollLock = false;
// can escalated to SQL if it start uses too much memory // can escalated to SQL if it start uses too much memory
this.logMop = {}; this.logMop = {};
@ -1322,7 +1358,6 @@ class LokiPublicChannelAPI {
Conversation: Whisper.Conversation, Conversation: Whisper.Conversation,
} }
); );
await this.pollForChannelOnce();
} }
// get moderation actions // get moderation actions
@ -1523,6 +1558,20 @@ class LokiPublicChannelAPI {
} }
async pollOnceForMessages() { async pollOnceForMessages() {
if (this.messagesPollLock) {
// TODO: check if lock is stale
log.warn(
'pollOnceForModerators locked',
'on',
this.channelId,
'at',
this.serverAPI.baseServerUrl
);
return;
}
// disable locking system for now as it's not quite perfect yet
// this.messagesPollLock = Date.now();
const params = { const params = {
include_annotations: 1, include_annotations: 1,
include_user_annotations: 1, // to get the home server include_user_annotations: 1, // to get the home server
@ -1551,6 +1600,7 @@ class LokiPublicChannelAPI {
if (res.err) { if (res.err) {
log.error('pollOnceForMessages receive error', res.err); log.error('pollOnceForMessages receive error', res.err);
} }
this.messagesPollLock = false;
return; return;
} }
@ -1706,6 +1756,7 @@ class LokiPublicChannelAPI {
// do we really need this? // do we really need this?
if (!pendingMessages.length) { if (!pendingMessages.length) {
this.conversation.setLastRetrievedMessage(this.lastGot); this.conversation.setLastRetrievedMessage(this.lastGot);
this.messagesPollLock = false;
return; return;
} }
@ -1755,7 +1806,14 @@ class LokiPublicChannelAPI {
); );
sendNow.forEach(message => { sendNow.forEach(message => {
// send them out now // send them out now
log.info('emitting primary message', message.serverId); log.info(
'emitting primary message',
message.serverId,
'on',
this.channelId,
'at',
this.serverAPI.baseServerUrl
);
this.chatAPI.emit('publicMessage', { this.chatAPI.emit('publicMessage', {
message, message,
}); });
@ -1832,7 +1890,14 @@ class LokiPublicChannelAPI {
return; return;
} }
} }
log.info('emitting pending message', message.serverId); log.info(
'emitting pending message',
message.serverId,
'on',
this.channelId,
'at',
this.serverAPI.baseServerUrl
);
this.chatAPI.emit('publicMessage', { this.chatAPI.emit('publicMessage', {
message, message,
}); });
@ -1854,6 +1919,7 @@ class LokiPublicChannelAPI {
// finally update our position // finally update our position
this.conversation.setLastRetrievedMessage(this.lastGot); this.conversation.setLastRetrievedMessage(this.lastGot);
this.messagesPollLock = false;
} }
static getPreviewFromAnnotation(annotation) { static getPreviewFromAnnotation(annotation) {

@ -103,8 +103,11 @@ class LokiFileServerInstance {
if (!Array.isArray(authorisations)) { if (!Array.isArray(authorisations)) {
return; return;
} }
const validAuthorisations = authorisations.filter(
a => a && typeof auth === 'object'
);
await Promise.all( await Promise.all(
authorisations.map(async auth => { validAuthorisations.map(async auth => {
// only skip, if in secondary search mode // only skip, if in secondary search mode
if (isRequest && auth.secondaryDevicePubKey !== user.username) { if (isRequest && auth.secondaryDevicePubKey !== user.username) {
// this is not the authorization we're looking for // this is not the authorization we're looking for

@ -94,6 +94,7 @@ class LokiMessageAPI {
await this.refreshSendingSwarm(pubKey, timestamp); await this.refreshSendingSwarm(pubKey, timestamp);
} }
// send parameters
const params = { const params = {
pubKey, pubKey,
ttl: ttl.toString(), ttl: ttl.toString(),
@ -104,7 +105,7 @@ class LokiMessageAPI {
const promises = []; const promises = [];
let completedConnections = 0; let completedConnections = 0;
for (let i = 0; i < numConnections; i += 1) { for (let i = 0; i < numConnections; i += 1) {
const connectionPromise = this.openSendConnection(params).finally(() => { const connectionPromise = this._openSendConnection(params).finally(() => {
completedConnections += 1; completedConnections += 1;
if (completedConnections >= numConnections) { if (completedConnections >= numConnections) {
delete this.sendingData[timestamp]; delete this.sendingData[timestamp];
@ -151,7 +152,9 @@ class LokiMessageAPI {
'Ran out of swarm nodes to query' 'Ran out of swarm nodes to query'
); );
} }
log.info(`Successful storage message to ${pubKey}`); log.info(
`loki_message:::sendMessage - Successfully stored message to ${pubKey}`
);
} }
async refreshSendingSwarm(pubKey, timestamp) { async refreshSendingSwarm(pubKey, timestamp) {
@ -162,16 +165,11 @@ class LokiMessageAPI {
return true; return true;
} }
async openSendConnection(params) { async _openSendConnection(params) {
while (!_.isEmpty(this.sendingData[params.timestamp].swarm)) { while (!_.isEmpty(this.sendingData[params.timestamp].swarm)) {
const snode = this.sendingData[params.timestamp].swarm.shift(); const snode = this.sendingData[params.timestamp].swarm.shift();
// TODO: Revert back to using snode address instead of IP // TODO: Revert back to using snode address instead of IP
const successfulSend = await this.sendToNode( const successfulSend = await this._sendToNode(snode, params);
snode.ip,
snode.port,
snode,
params
);
if (successfulSend) { if (successfulSend) {
return true; return true;
} }
@ -189,19 +187,19 @@ class LokiMessageAPI {
} }
await this.sendingData[params.timestamp].refreshPromise; await this.sendingData[params.timestamp].refreshPromise;
// Retry with a fresh list again // Retry with a fresh list again
return this.openSendConnection(params); return this._openSendConnection(params);
} }
return false; return false;
} }
async sendToNode(address, port, targetNode, params) { async _sendToNode(targetNode, params) {
let successiveFailures = 0; let successiveFailures = 0;
while (successiveFailures < MAX_ACCEPTABLE_FAILURES) { while (successiveFailures < MAX_ACCEPTABLE_FAILURES) {
await sleepFor(successiveFailures * 500); await sleepFor(successiveFailures * 500);
try { try {
const result = await lokiRpc( const result = await lokiRpc(
`https://${address}`, `https://${targetNode.ip}`,
port, targetNode.port,
'store', 'store',
params, params,
{}, {},
@ -211,17 +209,21 @@ class LokiMessageAPI {
// Make sure we aren't doing too much PoW // Make sure we aren't doing too much PoW
const currentDifficulty = window.storage.get('PoWDifficulty', null); const currentDifficulty = window.storage.get('PoWDifficulty', null);
const newDifficulty = result.difficulty; if (
if (newDifficulty != null && newDifficulty !== currentDifficulty) { result &&
window.storage.put('PoWDifficulty', newDifficulty); result.difficulty &&
result.difficulty !== currentDifficulty
) {
window.storage.put('PoWDifficulty', result.difficulty);
// should we return false?
} }
return true; return true;
} catch (e) { } catch (e) {
log.warn( log.warn(
'Loki send message error:', 'loki_message:::_sendToNode - send error:',
e.code, e.code,
e.message, e.message,
`from ${address}` `destination ${targetNode.ip}:${targetNode.port}`
); );
if (e instanceof textsecure.WrongSwarmError) { if (e instanceof textsecure.WrongSwarmError) {
const { newSwarm } = e; const { newSwarm } = e;
@ -238,26 +240,35 @@ class LokiMessageAPI {
} else if (e instanceof textsecure.NotFoundError) { } else if (e instanceof textsecure.NotFoundError) {
// TODO: Handle resolution error // TODO: Handle resolution error
} else if (e instanceof textsecure.TimestampError) { } else if (e instanceof textsecure.TimestampError) {
log.warn('Timestamp is invalid'); log.warn('loki_message:::_sendToNode - Timestamp is invalid');
throw e; throw e;
} else if (e instanceof textsecure.HTTPError) { } else if (e instanceof textsecure.HTTPError) {
// TODO: Handle working connection but error response // TODO: Handle working connection but error response
const body = await e.response.text(); const body = await e.response.text();
log.warn('HTTPError body:', body); log.warn('loki_message:::_sendToNode - HTTPError body:', body);
} }
successiveFailures += 1; successiveFailures += 1;
} }
} }
log.error(`Failed to send to node: ${address}`); const remainingSwarmSnodes = await lokiSnodeAPI.unreachableNode(
await lokiSnodeAPI.unreachableNode(params.pubKey, address); params.pubKey,
targetNode
);
log.error(
`loki_message:::_sendToNode - Too many successive failures trying to send to node ${
targetNode.ip
}:${targetNode.port}, ${remainingSwarmSnodes.lengt} remaining swarm nodes`
);
return false; return false;
} }
async openRetrieveConnection(stopPollingPromise, callback) { async _openRetrieveConnection(stopPollingPromise, callback) {
let stopPollingResult = false; let stopPollingResult = false;
// When message_receiver restarts from onoffline/ononline events it closes // When message_receiver restarts from onoffline/ononline events it closes
// http-resources, which will then resolve the stopPollingPromise with true. We then // http-resources, which will then resolve the stopPollingPromise with true. We then
// want to cancel these polling connections because new ones will be created // want to cancel these polling connections because new ones will be created
// eslint-disable-next-line more/no-then // eslint-disable-next-line more/no-then
stopPollingPromise.then(result => { stopPollingPromise.then(result => {
stopPollingResult = result; stopPollingResult = result;
@ -274,9 +285,13 @@ class LokiMessageAPI {
) { ) {
await sleepFor(successiveFailures * 1000); await sleepFor(successiveFailures * 1000);
// TODO: Revert back to using snode address instead of IP
try { try {
// TODO: Revert back to using snode address instead of IP // in general, I think we want exceptions to bubble up
let messages = await this.retrieveNextMessages(nodeData.ip, nodeData); // so the user facing UI can report unhandled errors
// except in this case of living inside http-resource pollServer
// because it just restarts more connections...
let messages = await this._retrieveNextMessages(nodeData);
// this only tracks retrieval failures // this only tracks retrieval failures
// won't include parsing failures... // won't include parsing failures...
successiveFailures = 0; successiveFailures = 0;
@ -296,7 +311,7 @@ class LokiMessageAPI {
callback(messages); callback(messages);
} catch (e) { } catch (e) {
log.warn( log.warn(
'Loki retrieve messages error:', 'loki_message:::_openRetrieveConnection - retrieve error:',
e.code, e.code,
e.message, e.message,
`on ${nodeData.ip}:${nodeData.port}` `on ${nodeData.ip}:${nodeData.port}`
@ -324,40 +339,49 @@ class LokiMessageAPI {
} }
} }
if (successiveFailures >= MAX_ACCEPTABLE_FAILURES) { if (successiveFailures >= MAX_ACCEPTABLE_FAILURES) {
const remainingSwarmSnodes = await lokiSnodeAPI.unreachableNode(
this.ourKey,
nodeData
);
log.warn( log.warn(
`removing ${nodeData.ip}:${ `loki_message:::_openRetrieveConnection - too many successive failures, removing ${
nodeData.port nodeData.ip
} from our swarm pool. We have ${ }:${nodeData.port} from our swarm pool. We have ${
Object.keys(this.ourSwarmNodes).length Object.keys(this.ourSwarmNodes).length
} usable swarm nodes left` } usable swarm nodes left (${
remainingSwarmSnodes.length
} in local db)`
); );
await lokiSnodeAPI.unreachableNode(this.ourKey, address);
} }
} }
// if not stopPollingResult // if not stopPollingResult
if (_.isEmpty(this.ourSwarmNodes)) { if (_.isEmpty(this.ourSwarmNodes)) {
log.error( log.error(
'We no longer have any swarm nodes available to try in pool, closing retrieve connection' 'loki_message:::_openRetrieveConnection - We no longer have any swarm nodes available to try in pool, closing retrieve connection'
); );
return false; return false;
} }
return true; return true;
} }
async retrieveNextMessages(nodeUrl, nodeData) { // we don't throw or catch here
// mark private (_ prefix) since no error handling is done here...
async _retrieveNextMessages(nodeData) {
const params = { const params = {
pubKey: this.ourKey, pubKey: this.ourKey,
lastHash: nodeData.lastHash || '', lastHash: nodeData.lastHash || '',
}; };
const options = { const options = {
timeout: 40000, timeout: 40000,
ourPubKey: this.ourKey,
headers: { headers: {
[LOKI_LONGPOLL_HEADER]: true, [LOKI_LONGPOLL_HEADER]: true,
}, },
}; };
// let exceptions bubble up
const result = await lokiRpc( const result = await lokiRpc(
`https://${nodeUrl}`, `https://${nodeData.ip}`,
nodeData.port, nodeData.port,
'retrieve', 'retrieve',
params, params,
@ -365,34 +389,39 @@ class LokiMessageAPI {
'/storage_rpc/v1', '/storage_rpc/v1',
nodeData nodeData
); );
return result.messages || []; return result.messages || [];
} }
// we don't throw or catch here
async startLongPolling(numConnections, stopPolling, callback) { async startLongPolling(numConnections, stopPolling, callback) {
log.info('startLongPolling for', numConnections, 'connections');
this.ourSwarmNodes = {}; this.ourSwarmNodes = {};
// load from local DB
let nodes = await lokiSnodeAPI.getSwarmNodesForPubKey(this.ourKey); let nodes = await lokiSnodeAPI.getSwarmNodesForPubKey(this.ourKey);
log.info('swarmNodes', nodes.length, 'for', this.ourKey);
Object.keys(nodes).forEach(j => {
const node = nodes[j];
log.info(`${j} ${node.ip}:${node.port}`);
});
if (nodes.length < numConnections) { if (nodes.length < numConnections) {
log.warn( log.warn(
'Not enough SwarmNodes for our pubkey in local database, getting current list from blockchain' 'loki_message:::startLongPolling - Not enough SwarmNodes for our pubkey in local database, getting current list from blockchain'
); );
// load from blockchain
nodes = await lokiSnodeAPI.refreshSwarmNodesForPubKey(this.ourKey); nodes = await lokiSnodeAPI.refreshSwarmNodesForPubKey(this.ourKey);
if (nodes.length < numConnections) { if (nodes.length < numConnections) {
log.error( log.error(
'Could not get enough SwarmNodes for our pubkey from blockchain' 'loki_message:::startLongPolling - Could not get enough SwarmNodes for our pubkey from blockchain'
); );
} }
} }
log.info( log.info(
`There are currently ${ 'loki_message:::startLongPolling - start polling for',
nodes.length numConnections,
} swarmNodes for pubKey in our local database` 'connections. We have swarmNodes',
nodes.length,
'for',
this.ourKey
); );
Object.keys(nodes).forEach(j => {
const node = nodes[j];
log.info(`loki_message: ${j} ${node.ip}:${node.port}`);
});
for (let i = 0; i < nodes.length; i += 1) { for (let i = 0; i < nodes.length; i += 1) {
const lastHash = await window.Signal.Data.getLastHashBySnode( const lastHash = await window.Signal.Data.getLastHashBySnode(
@ -406,15 +435,28 @@ class LokiMessageAPI {
const promises = []; const promises = [];
let unresolved = numConnections;
for (let i = 0; i < numConnections; i += 1) { for (let i = 0; i < numConnections; i += 1) {
promises.push(this.openRetrieveConnection(stopPolling, callback)); promises.push(
// eslint-disable-next-line more/no-then
this._openRetrieveConnection(stopPolling, callback).then(() => {
unresolved -= 1;
log.info(
'loki_message:::startLongPolling - There are',
unresolved,
'open retrieve connections left'
);
})
);
} }
// blocks until numConnections snodes in our swarms have been removed from the list // blocks until numConnections snodes in our swarms have been removed from the list
// less than numConnections being active is fine, only need to restart if none per Niels 20/02/13 // less than numConnections being active is fine, only need to restart if none per Niels 20/02/13
// or if there is network issues (ENOUTFOUND due to lokinet) // or if there is network issues (ENOUTFOUND due to lokinet)
await Promise.all(promises); await Promise.all(promises);
log.error('All our long poll swarm connections have been removed'); log.error(
'loki_message:::startLongPolling - All our long poll swarm connections have been removed'
);
// should we just call ourself again? // should we just call ourself again?
// no, our caller already handles this... // no, our caller already handles this...
} }

@ -1,4 +1,4 @@
/* global log, window, process */ /* global log, window, process, URL */
const EventEmitter = require('events'); const EventEmitter = require('events');
const nodeFetch = require('node-fetch'); const nodeFetch = require('node-fetch');
const LokiAppDotNetAPI = require('./loki_app_dot_net_api'); const LokiAppDotNetAPI = require('./loki_app_dot_net_api');
@ -27,16 +27,18 @@ class LokiPublicChatFactoryAPI extends EventEmitter {
static async validServer(serverUrl) { static async validServer(serverUrl) {
// test to make sure it's online (and maybe has a valid SSL cert) // test to make sure it's online (and maybe has a valid SSL cert)
try { try {
const url = new URL(serverUrl);
// allow .loki (may only need an agent but not sure // allow .loki (may only need an agent but not sure
// until we have a .loki to test with) // until we have a .loki to test with)
if (serverUrl.match(/\.loki$/)) { process.env.NODE_TLS_REJECT_UNAUTHORIZED = url.host.match(/\.loki$/i)
process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0'; ? '0'
} : '1';
// FIXME: use proxy when we have open groups that work with proxy
await nodeFetch(serverUrl); await nodeFetch(serverUrl);
process.env.NODE_TLS_REJECT_UNAUTHORIZED = '1'; process.env.NODE_TLS_REJECT_UNAUTHORIZED = '1';
// const txt = await res.text(); // const txt = await res.text();
} catch (e) { } catch (e) {
process.env.NODE_TLS_REJECT_UNAUTHORIZED = 1; process.env.NODE_TLS_REJECT_UNAUTHORIZED = '1';
log.warn(`failing to created ${serverUrl}`, e.code, e.message); log.warn(`failing to created ${serverUrl}`, e.code, e.message);
// bail out if not valid enough // bail out if not valid enough
return false; return false;

@ -29,7 +29,9 @@ const decryptResponse = async (response, address) => {
return {}; return {};
}; };
const sendToProxy = async (options = {}, targetNode) => { const timeoutDelay = ms => new Promise(resolve => setTimeout(resolve, ms));
const sendToProxy = async (options = {}, targetNode, retryNumber = 0) => {
const randSnode = await lokiSnodeAPI.getRandomSnodeAddress(); const randSnode = await lokiSnodeAPI.getRandomSnodeAddress();
// Don't allow arbitrary URLs, only snodes and loki servers // Don't allow arbitrary URLs, only snodes and loki servers
@ -60,36 +62,101 @@ const sendToProxy = async (options = {}, targetNode) => {
'X-Sender-Public-Key': StringView.arrayBufferToHex(myKeys.pubKey), 'X-Sender-Public-Key': StringView.arrayBufferToHex(myKeys.pubKey),
'X-Target-Snode-Key': targetNode.pubkey_ed25519, 'X-Target-Snode-Key': targetNode.pubkey_ed25519,
}, },
agent: snodeHttpsAgent,
}; };
// we only proxy to snodes... // we only proxy to snodes...
process.env.NODE_TLS_REJECT_UNAUTHORIZED = 0;
const response = await nodeFetch(url, firstHopOptions); const response = await nodeFetch(url, firstHopOptions);
process.env.NODE_TLS_REJECT_UNAUTHORIZED = 1;
if (response.status === 401) {
// decom or dereg
// remove
// but which the proxy or the target...
// we got a ton of randomPool nodes, let's just not worry about this one
lokiSnodeAPI.markRandomNodeUnreachable(randSnode);
const randomPoolRemainingCount = lokiSnodeAPI.getRandomPoolLength();
log.warn(
`lokiRpc sendToProxy`,
`snode ${randSnode.ip}:${randSnode.port} to ${targetNode.ip}:${
targetNode.port
}`,
`snode is decom or dereg: `,
ciphertext,
// `marking random snode bad ${randomPoolRemainingCount} remaining`
`Try #${retryNumber}`,
`removing randSnode leaving ${randomPoolRemainingCount} in the random pool`
);
// retry, just count it happening 5 times to be the target for now
return sendToProxy(options, targetNode, retryNumber + 1);
}
// detect SNode is not ready (not in swarm; not done syncing) // detect SNode is not ready (not in swarm; not done syncing)
if (response.status === 503) { if (response.status === 503 || response.status === 500) {
const ciphertext = await response.text(); const ciphertext = await response.text();
log.error( // we shouldn't do these,
`lokiRpc sendToProxy snode ${randSnode.ip}:${randSnode.port} error`, // it's seems to be not the random node that's always bad
ciphertext // but the target node
// we got a ton of randomPool nodes, let's just not worry about this one
lokiSnodeAPI.markRandomNodeUnreachable(randSnode);
const randomPoolRemainingCount = lokiSnodeAPI.getRandomPoolLength();
log.warn(
`lokiRpc sendToProxy`,
`snode ${randSnode.ip}:${randSnode.port} to ${targetNode.ip}:${
targetNode.port
}`,
`code ${response.status} error`,
ciphertext,
// `marking random snode bad ${randomPoolRemainingCount} remaining`
`Try #${retryNumber}`,
`removing randSnode leaving ${randomPoolRemainingCount} in the random pool`
); );
// mark as bad for this round (should give it some time and improve success rates) // mark as bad for this round (should give it some time and improve success rates)
lokiSnodeAPI.markRandomNodeUnreachable(randSnode);
// retry for a new working snode // retry for a new working snode
return sendToProxy(options, targetNode); const pRetryNumber = retryNumber + 1;
if (pRetryNumber > 5) {
// it's likely a net problem or an actual problem on the target node
// lets mark the target node bad for now
// we'll just rotate it back in if it's a net problem
log.warn(`Failing ${targetNode.ip}:${targetNode.port} after 5 retries`);
if (options.ourPubKey) {
lokiSnodeAPI.unreachableNode(options.ourPubKey, targetNode);
}
return false;
}
// 500 burns through a node too fast,
// let's slow the retry to give it more time to recover
if (response.status === 500) {
await timeoutDelay(5000);
}
return sendToProxy(options, targetNode, pRetryNumber);
} }
/*
if (response.status === 500) {
// usually when the server returns nothing...
}
*/
// FIXME: handle nodeFetch errors/exceptions... // FIXME: handle nodeFetch errors/exceptions...
if (response.status !== 200) { if (response.status !== 200) {
// let us know we need to create handlers for new unhandled codes // let us know we need to create handlers for new unhandled codes
log.warn('lokiRpc sendToProxy fetch non-200 statusCode', response.status); log.warn(
'lokiRpc sendToProxy fetch non-200 statusCode',
response.status,
`from snode ${randSnode.ip}:${randSnode.port} to ${targetNode.ip}:${
targetNode.port
}`
);
return false;
} }
const ciphertext = await response.text(); const ciphertext = await response.text();
if (!ciphertext) { if (!ciphertext) {
// avoid base64 decode failure // avoid base64 decode failure
log.warn('Server did not return any data for', options); // usually a 500 but not always
// could it be a timeout?
log.warn('Server did not return any data for', options, targetNode);
return false;
} }
let plaintext; let plaintext;
@ -112,7 +179,9 @@ const sendToProxy = async (options = {}, targetNode) => {
'lokiRpc sendToProxy decode error', 'lokiRpc sendToProxy decode error',
e.code, e.code,
e.message, e.message,
`from ${randSnode.ip}:${randSnode.port} ciphertext:`, `from ${randSnode.ip}:${randSnode.port} to ${targetNode.ip}:${
targetNode.port
} ciphertext:`,
ciphertext ciphertext
); );
if (ciphertextBuffer) { if (ciphertextBuffer) {
@ -138,6 +207,15 @@ const sendToProxy = async (options = {}, targetNode) => {
} }
return false; return false;
}; };
if (retryNumber) {
log.info(
`lokiRpc sendToProxy request succeeded,`,
`snode ${randSnode.ip}:${randSnode.port} to ${targetNode.ip}:${
targetNode.port
}`,
`on retry #${retryNumber}`
);
}
return jsonRes; return jsonRes;
} catch (e) { } catch (e) {
log.error( log.error(
@ -182,22 +260,24 @@ const lokiFetch = async (url, options = {}, targetNode = null) => {
timeout, timeout,
method, method,
}; };
if (url.match(/https:\/\//)) {
fetchOptions.agent = snodeHttpsAgent;
}
try { try {
if (window.lokiFeatureFlags.useSnodeProxy && targetNode) { if (window.lokiFeatureFlags.useSnodeProxy && targetNode) {
const result = await sendToProxy(fetchOptions, targetNode); const result = await sendToProxy(fetchOptions, targetNode);
return result ? result.json() : false; // if not result, maybe we should throw??
return result ? result.json() : {};
} }
if (url.match(/https:\/\//)) { if (url.match(/https:\/\//)) {
process.env.NODE_TLS_REJECT_UNAUTHORIZED = 0; // import that this does not get set in sendToProxy fetchOptions
fetchOptions.agent = snodeHttpsAgent;
process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0';
} else {
log.info('lokiRpc http communication', url);
} }
const response = await nodeFetch(url, fetchOptions); const response = await nodeFetch(url, fetchOptions);
// restore TLS checking // restore TLS checking
process.env.NODE_TLS_REJECT_UNAUTHORIZED = 1; process.env.NODE_TLS_REJECT_UNAUTHORIZED = '1';
let result; let result;
// Wrong swarm // Wrong swarm

@ -1,10 +1,12 @@
/* eslint-disable class-methods-use-this */ /* eslint-disable class-methods-use-this */
/* global window, ConversationController, _, log */ /* global window, ConversationController, _, log, clearTimeout */
const is = require('@sindresorhus/is'); const is = require('@sindresorhus/is');
const { lokiRpc } = require('./loki_rpc'); const { lokiRpc } = require('./loki_rpc');
const RANDOM_SNODES_TO_USE = 3; const RANDOM_SNODES_TO_USE_FOR_PUBKEY_SWARM = 3;
const RANDOM_SNODES_POOL_SIZE = 1024;
const SEED_NODE_RETRIES = 3;
class LokiSnodeAPI { class LokiSnodeAPI {
constructor({ serverUrl, localUrl }) { constructor({ serverUrl, localUrl }) {
@ -15,13 +17,14 @@ class LokiSnodeAPI {
this.localUrl = localUrl; // localhost.loki this.localUrl = localUrl; // localhost.loki
this.randomSnodePool = []; this.randomSnodePool = [];
this.swarmsPendingReplenish = {}; this.swarmsPendingReplenish = {};
this.refreshRandomPoolPromise = false;
} }
async getRandomSnodeAddress() { async getRandomSnodeAddress() {
/* resolve random snode */ /* resolve random snode */
if (this.randomSnodePool.length === 0) { if (this.randomSnodePool.length === 0) {
// allow exceptions to pass through upwards // allow exceptions to pass through upwards
await this.initialiseRandomPool(); await this.refreshRandomPool();
} }
if (this.randomSnodePool.length === 0) { if (this.randomSnodePool.length === 0) {
throw new window.textsecure.SeedNodeError('Invalid seed node response'); throw new window.textsecure.SeedNodeError('Invalid seed node response');
@ -31,74 +34,150 @@ class LokiSnodeAPI {
]; ];
} }
async initialiseRandomPool( async refreshRandomPool(seedNodes = [...window.seedNodeList]) {
seedNodes = [...window.seedNodeList], // if currently not in progress
consecutiveErrors = 0 if (this.refreshRandomPoolPromise === false) {
) { // set lock
const params = { this.refreshRandomPoolPromise = new Promise(async (resolve, reject) => {
limit: 20, let timeoutTimer = null;
active_only: true, // private retry container
fields: { const trySeedNode = async (consecutiveErrors = 0) => {
public_ip: true, const params = {
storage_port: true, limit: RANDOM_SNODES_POOL_SIZE,
pubkey_x25519: true, active_only: true,
pubkey_ed25519: true, fields: {
}, public_ip: true,
}; storage_port: true,
const seedNode = seedNodes.splice( pubkey_x25519: true,
Math.floor(Math.random() * seedNodes.length), pubkey_ed25519: true,
1 },
)[0]; };
let snodes = []; const seedNode = seedNodes.splice(
Math.floor(Math.random() * seedNodes.length),
1
)[0];
let snodes = [];
try {
log.info(
'loki_snodes:::refreshRandomPoolPromise - Refreshing random snode pool'
);
const response = await lokiRpc(
`http://${seedNode.ip}`,
seedNode.port,
'get_n_service_nodes',
params,
{}, // Options
'/json_rpc' // Seed request endpoint
);
// Filter 0.0.0.0 nodes which haven't submitted uptime proofs
snodes = response.result.service_node_states.filter(
snode => snode.public_ip !== '0.0.0.0'
);
this.randomSnodePool = snodes.map(snode => ({
ip: snode.public_ip,
port: snode.storage_port,
pubkey_x25519: snode.pubkey_x25519,
pubkey_ed25519: snode.pubkey_ed25519,
}));
log.info(
'loki_snodes:::refreshRandomPoolPromise - Refreshed random snode pool with',
this.randomSnodePool.length,
'snodes'
);
// clear lock
this.refreshRandomPoolPromise = null;
if (timeoutTimer !== null) {
clearTimeout(timeoutTimer);
timeoutTimer = null;
}
resolve();
} catch (e) {
log.warn(
'loki_snodes:::refreshRandomPoolPromise - error',
e.code,
e.message
);
if (consecutiveErrors < SEED_NODE_RETRIES) {
// retry after a possible delay
setTimeout(() => {
log.info(
'loki_snodes:::refreshRandomPoolPromise - Retrying initialising random snode pool, try #',
consecutiveErrors
);
trySeedNode(consecutiveErrors + 1);
}, consecutiveErrors * consecutiveErrors * 5000);
} else {
log.error(
'loki_snodes:::refreshRandomPoolPromise - Giving up trying to contact seed node'
);
if (snodes.length === 0) {
this.refreshRandomPoolPromise = null; // clear lock
if (timeoutTimer !== null) {
clearTimeout(timeoutTimer);
timeoutTimer = null;
}
reject();
}
}
}
};
const delay = (SEED_NODE_RETRIES + 1) * (SEED_NODE_RETRIES + 1) * 5000;
timeoutTimer = setTimeout(() => {
log.warn(
'loki_snodes:::refreshRandomPoolPromise - TIMEDOUT after',
delay,
's'
);
reject();
}, delay);
trySeedNode();
});
}
try { try {
const response = await lokiRpc( await this.refreshRandomPoolPromise;
`http://${seedNode.ip}`,
seedNode.port,
'get_n_service_nodes',
params,
{}, // Options
'/json_rpc' // Seed request endpoint
);
// Filter 0.0.0.0 nodes which haven't submitted uptime proofs
snodes = response.result.service_node_states.filter(
snode => snode.public_ip !== '0.0.0.0'
);
this.randomSnodePool = snodes.map(snode => ({
ip: snode.public_ip,
port: snode.storage_port,
pubkey_x25519: snode.pubkey_x25519,
pubkey_ed25519: snode.pubkey_ed25519,
}));
} catch (e) { } catch (e) {
log.warn('initialiseRandomPool error', e.code, e.message); // we will throw for each time initialiseRandomPool has been called in parallel
if (consecutiveErrors < 3) { log.error(
// retry after a possible delay 'loki_snodes:::refreshRandomPoolPromise - error',
setTimeout(() => { e.code,
log.info( e.message
'Retrying initialising random snode pool, try #', );
consecutiveErrors throw new window.textsecure.SeedNodeError('Failed to contact seed node');
);
this.initialiseRandomPool(seedNodes, consecutiveErrors + 1);
}, consecutiveErrors * consecutiveErrors * 5000);
} else {
log.error('Giving up trying to contact seed node');
if (snodes.length === 0) {
throw new window.textsecure.SeedNodeError(
'Failed to contact seed node'
);
}
}
} }
log.info('loki_snodes:::refreshRandomPoolPromise - RESOLVED');
} }
// nodeUrl is like 9hrje1bymy7hu6nmtjme9idyu3rm8gr3mkstakjyuw1997t7w4ny.snode // unreachableNode.url is like 9hrje1bymy7hu6nmtjme9idyu3rm8gr3mkstakjyuw1997t7w4ny.snode
async unreachableNode(pubKey, nodeUrl) { async unreachableNode(pubKey, unreachableNode) {
const conversation = ConversationController.get(pubKey); const conversation = ConversationController.get(pubKey);
const swarmNodes = [...conversation.get('swarmNodes')]; const swarmNodes = [...conversation.get('swarmNodes')];
const filteredNodes = swarmNodes.filter( if (typeof unreachableNode === 'string') {
node => node.address !== nodeUrl && node.ip !== nodeUrl log.warn(
); 'loki_snodes:::unreachableNode - String passed as unreachableNode to unreachableNode'
);
return swarmNodes;
}
let found = false;
const filteredNodes = swarmNodes.filter(node => {
// keep all but thisNode
const thisNode =
node.address === unreachableNode.address &&
node.ip === unreachableNode.ip &&
node.port === unreachableNode.port;
if (thisNode) {
found = true;
}
return !thisNode;
});
if (!found) {
log.warn(
`loki_snodes:::unreachableNode - snode ${unreachableNode.ip}:${
unreachableNode.port
} has already been marked as bad`
);
}
await conversation.updateSwarmNodes(filteredNodes); await conversation.updateSwarmNodes(filteredNodes);
return filteredNodes;
} }
markRandomNodeUnreachable(snode) { markRandomNodeUnreachable(snode) {
@ -108,6 +187,10 @@ class LokiSnodeAPI {
); );
} }
getRandomPoolLength() {
return this.randomSnodePool.length;
}
async updateLastHash(snode, hash, expiresAt) { async updateLastHash(snode, hash, expiresAt) {
await window.Signal.Data.updateLastHash({ snode, hash, expiresAt }); await window.Signal.Data.updateLastHash({ snode, hash, expiresAt });
} }
@ -150,7 +233,11 @@ class LokiSnodeAPI {
try { try {
newSwarmNodes = await this.getSwarmNodes(pubKey); newSwarmNodes = await this.getSwarmNodes(pubKey);
} catch (e) { } catch (e) {
log.error('getFreshSwarmNodes error', e.code, e.message); log.error(
'loki_snodes:::getFreshSwarmNodes - error',
e.code,
e.message
);
// TODO: Handle these errors sensibly // TODO: Handle these errors sensibly
newSwarmNodes = []; newSwarmNodes = [];
} }
@ -177,30 +264,43 @@ class LokiSnodeAPI {
); );
if (!result) { if (!result) {
log.warn( log.warn(
`getSnodesForPubkey lokiRpc on ${snode.ip}:${ `loki_snode:::getSnodesForPubkey - lokiRpc on ${snode.ip}:${
snode.port snode.port
} returned falsish value`, } returned falsish value`,
result result
); );
return []; return [];
} }
if (!result.snodes) {
// we hit this when snode gives 500s
log.warn(
`loki_snode:::getSnodesForPubkey - lokiRpc on ${snode.ip}:${
snode.port
} returned falsish value for snodes`,
result
);
return [];
}
const snodes = result.snodes.filter(tSnode => tSnode.ip !== '0.0.0.0'); const snodes = result.snodes.filter(tSnode => tSnode.ip !== '0.0.0.0');
return snodes; return snodes;
} catch (e) { } catch (e) {
this.markRandomNodeUnreachable(snode);
const randomPoolRemainingCount = this.getRandomPoolLength();
log.error( log.error(
'getSnodesForPubkey error', 'loki_snodes:::getSnodesForPubkey - error',
e.code, e.code,
e.message, e.message,
`for ${snode.ip}:${snode.port}` `for ${snode.ip}:${
snode.port
}. ${randomPoolRemainingCount} snodes remaining in randomPool`
); );
this.markRandomNodeUnreachable(snode);
return []; return [];
} }
} }
async getSwarmNodes(pubKey) { async getSwarmNodes(pubKey) {
const snodes = []; const snodes = [];
const questions = [...Array(RANDOM_SNODES_TO_USE).keys()]; const questions = [...Array(RANDOM_SNODES_TO_USE_FOR_PUBKEY_SWARM).keys()];
await Promise.all( await Promise.all(
questions.map(async () => { questions.map(async () => {
// allow exceptions to pass through upwards // allow exceptions to pass through upwards

@ -223,7 +223,7 @@
if (item) { if (item) {
return item.value; return item.value;
} }
window.log.error('Could not load identityKey from SignalData');
return undefined; return undefined;
}, },
async getLocalRegistrationId() { async getLocalRegistrationId() {

@ -110,7 +110,7 @@
async function createContactSyncProtoMessage(conversations) { async function createContactSyncProtoMessage(conversations) {
// Extract required contacts information out of conversations // Extract required contacts information out of conversations
const sessionContacts = conversations.filter( const sessionContacts = conversations.filter(
c => c.isPrivate() && !c.isSecondaryDevice() c => c.isPrivate() && !c.isSecondaryDevice() && c.isFriend()
); );
if (sessionContacts.length === 0) { if (sessionContacts.length === 0) {

@ -147,7 +147,9 @@
({ authorisations } = primaryDeviceMapping); ({ authorisations } = primaryDeviceMapping);
} }
} }
return authorisations || [];
// filter out any invalid authorisations
return authorisations.filter(a => a && typeof a === 'object') || [];
} }
// if the device is a secondary device, // if the device is a secondary device,
@ -168,6 +170,10 @@
} }
async function savePairingAuthorisation(authorisation) { async function savePairingAuthorisation(authorisation) {
if (!authorisation) {
return;
}
// Ensure that we have a conversation for all the devices // Ensure that we have a conversation for all the devices
const conversation = await ConversationController.getOrCreateAndWait( const conversation = await ConversationController.getOrCreateAndWait(
authorisation.secondaryDevicePubKey, authorisation.secondaryDevicePubKey,

@ -84,6 +84,8 @@
}; };
this.pollServer = async () => { this.pollServer = async () => {
// bg.connect calls mr connect after storage system is ready
window.log.info('http-resource pollServer start');
// This blocking call will return only when all attempts // This blocking call will return only when all attempts
// at reaching snodes are exhausted or a DNS error occured // at reaching snodes are exhausted or a DNS error occured
try { try {
@ -93,25 +95,30 @@
messages => { messages => {
connected = true; connected = true;
messages.forEach(message => { messages.forEach(message => {
const { data } = message; this.handleMessage(message.data);
this.handleMessage(data);
}); });
} }
); );
} catch (e) { } catch (e) {
// we'll try again anyway // we'll try again anyway
window.log.error('http-resource pollServer error', e.code, e.message); window.log.error(
'http-resource pollServer error',
e.code,
e.message,
e.stack
);
} }
connected = false;
if (this.calledStop) { if (this.calledStop) {
// don't restart
return; return;
} }
connected = false;
// Exhausted all our snodes urls, trying again later from scratch // Exhausted all our snodes urls, trying again later from scratch
setTimeout(() => { setTimeout(() => {
window.log.info( window.log.info(
`Exhausted all our snodes urls, trying again in ${EXHAUSTED_SNODES_RETRY_DELAY / `http-resource: Exhausted all our snodes urls, trying again in ${EXHAUSTED_SNODES_RETRY_DELAY /
1000}s from scratch` 1000}s from scratch`
); );
this.pollServer(); this.pollServer();

@ -27,7 +27,9 @@ const {
shell, shell,
} = electron; } = electron;
const appUserModelId = packageJson.build.appId; // FIXME Hardcoding appId to prevent build failrues on release.
// const appUserModelId = packageJson.build.appId;
const appUserModelId = 'com.loki-project.messenger-desktop';
console.log('Set Windows Application User Model ID (AUMID)', { console.log('Set Windows Application User Model ID (AUMID)', {
appUserModelId, appUserModelId,
}); });
@ -427,7 +429,7 @@ async function readyForUpdates() {
// Second, start checking for app updates // Second, start checking for app updates
try { try {
await updater.start(getMainWindow, locale.messages, logger); await updater.start(getMainWindow, userConfig, locale.messages, logger);
} catch (error) { } catch (error) {
const log = logger || console; const log = logger || console;
log.error( log.error(
@ -1090,6 +1092,24 @@ ipc.on('set-media-permissions', (event, value) => {
} }
}); });
// Loki - Auto updating
ipc.on('get-auto-update-setting', event => {
const configValue = userConfig.get('autoUpdate');
// eslint-disable-next-line no-param-reassign
event.returnValue = typeof configValue !== 'boolean' ? true : configValue;
});
ipc.on('set-auto-update-setting', (event, enabled) => {
userConfig.set('autoUpdate', !!enabled);
if (enabled) {
readyForUpdates();
} else {
updater.stop();
isReadyForUpdates = false;
}
});
function getDataFromMainWindow(name, callback) { function getDataFromMainWindow(name, callback) {
ipc.once(`get-success-${name}`, (_event, error, value) => ipc.once(`get-success-${name}`, (_event, error, value) =>
callback(error, value) callback(error, value)

@ -211,8 +211,11 @@ window.getSettingValue = (settingID, comparisonValue = null) => {
// Eg. window.getSettingValue('theme', 'light') // Eg. window.getSettingValue('theme', 'light')
// returns 'false' when the value is 'dark'. // returns 'false' when the value is 'dark'.
// We need to get specific settings from the main process
if (settingID === 'media-permissions') { if (settingID === 'media-permissions') {
return window.getMediaPermissions(); return window.getMediaPermissions();
} else if (settingID === 'auto-update') {
return window.getAutoUpdateEnabled();
} }
const settingVal = window.storage.get(settingID); const settingVal = window.storage.get(settingID);
@ -220,6 +223,12 @@ window.getSettingValue = (settingID, comparisonValue = null) => {
}; };
window.setSettingValue = (settingID, value) => { window.setSettingValue = (settingID, value) => {
// For auto updating we need to pass the value to the main process
if (settingID === 'auto-update') {
window.setAutoUpdateEnabled(value);
return;
}
window.storage.put(settingID, value); window.storage.put(settingID, value);
if (settingID === 'zoom-factor-setting') { if (settingID === 'zoom-factor-setting') {
@ -231,6 +240,11 @@ window.setSettingValue = (settingID, value) => {
window.getMessageTTL = () => window.storage.get('message-ttl', 24); window.getMessageTTL = () => window.storage.get('message-ttl', 24);
window.getMediaPermissions = () => ipc.sendSync('get-media-permissions'); window.getMediaPermissions = () => ipc.sendSync('get-media-permissions');
// Auto update setting
window.getAutoUpdateEnabled = () => ipc.sendSync('get-auto-update-setting');
window.setAutoUpdateEnabled = value =>
ipc.send('set-auto-update-setting', !!value);
ipc.on('get-ready-for-shutdown', async () => { ipc.on('get-ready-for-shutdown', async () => {
const { shutdown } = window.Events || {}; const { shutdown } = window.Events || {};
if (!shutdown) { if (!shutdown) {

@ -1457,6 +1457,7 @@ label {
resize: none; resize: none;
overflow: hidden; overflow: hidden;
user-select: all; user-select: all;
overflow-y: auto;
} }
input { input {

@ -213,7 +213,7 @@ export class Message extends React.PureComponent<Props, State> {
} }
public renderMetadataBadges() { public renderMetadataBadges() {
const { direction, isPublic, senderIsModerator } = this.props; const { direction, isPublic, senderIsModerator, id } = this.props;
const badges = [isPublic && 'Public', senderIsModerator && 'Mod']; const badges = [isPublic && 'Public', senderIsModerator && 'Mod'];
@ -224,7 +224,7 @@ export class Message extends React.PureComponent<Props, State> {
} }
return ( return (
<> <div key={`${id}-${badgeText}`}>
<span className="module-message__metadata__badge--separator"> <span className="module-message__metadata__badge--separator">
&nbsp;&nbsp; &nbsp;&nbsp;
</span> </span>
@ -239,7 +239,7 @@ export class Message extends React.PureComponent<Props, State> {
> >
{badgeText} {badgeText}
</span> </span>
</> </div>
); );
}) })
.filter(i => !!i); .filter(i => !!i);
@ -1064,9 +1064,13 @@ export class Message extends React.PureComponent<Props, State> {
// This id is what connects our triple-dot click with our associated pop-up menu. // This id is what connects our triple-dot click with our associated pop-up menu.
// It needs to be unique. // It needs to be unique.
const triggerId = String(id || `${authorPhoneNumber}-${timestamp}`); // The Date.now() is a workaround to be sure a single triggerID with this id exists
const rightClickTriggerId = `${authorPhoneNumber}-ctx-${timestamp}`; const triggerId = id
? String(`${id}-${Date.now()}`)
: String(`${authorPhoneNumber}-${timestamp}`);
const rightClickTriggerId = id
? String(`${id}-ctx-${Date.now()}`)
: String(`${authorPhoneNumber}-ctx-${timestamp}`);
if (expired) { if (expired) {
return null; return null;
} }

@ -1,5 +1,6 @@
import React from 'react'; import React from 'react';
import { SessionIconButton, SessionIconSize, SessionIconType } from './icon'; import { SessionIconButton, SessionIconSize, SessionIconType } from './icon';
import { ContextMenu, ContextMenuTrigger, MenuItem } from 'react-contextmenu';
interface Props { interface Props {
searchString: string; searchString: string;
@ -16,20 +17,46 @@ export class SessionSearchInput extends React.Component<Props> {
public render() { public render() {
const { searchString } = this.props; const { searchString } = this.props;
const triggerId = 'session-search-input-context';
return ( return (
<div className="session-search-input"> <>
<SessionIconButton <ContextMenuTrigger id={triggerId}>
iconSize={SessionIconSize.Medium} <div className="session-search-input">
iconType={SessionIconType.Search} <SessionIconButton
/> iconSize={SessionIconSize.Medium}
<input iconType={SessionIconType.Search}
value={searchString} />
onChange={e => this.props.onChange(e.target.value)} <input
onKeyDown={this.handleKeyDown} value={searchString}
placeholder={this.props.placeholder} onChange={e => this.props.onChange(e.target.value)}
/> onKeyDown={this.handleKeyDown}
</div> placeholder={this.props.placeholder}
/>
</div>
</ContextMenuTrigger>
<ContextMenu id={triggerId}>
<MenuItem onClick={() => document.execCommand('undo')}>
{window.i18n('editMenuUndo')}
</MenuItem>
<MenuItem onClick={() => document.execCommand('redo')}>
{window.i18n('editMenuRedo')}
</MenuItem>
<hr />
<MenuItem onClick={() => document.execCommand('cut')}>
{window.i18n('editMenuCut')}
</MenuItem>
<MenuItem onClick={() => document.execCommand('copy')}>
{window.i18n('editMenuCopy')}
</MenuItem>
<MenuItem onClick={() => document.execCommand('paste')}>
{window.i18n('editMenuPaste')}
</MenuItem>
<MenuItem onClick={() => document.execCommand('selectAll')}>
{window.i18n('editMenuSelectAll')}
</MenuItem>
</ContextMenu>
</>
); );
} }

@ -496,6 +496,19 @@ export class SettingsView extends React.Component<SettingsViewProps, State> {
content: {}, content: {},
confirmationDialogParams: undefined, confirmationDialogParams: undefined,
}, },
{
id: 'auto-update',
title: window.i18n('autoUpdateSettingTitle'),
description: window.i18n('autoUpdateSettingDescription'),
hidden: false,
type: SessionSettingType.Toggle,
category: SessionSettingCategory.Privacy,
setFn: undefined,
comparisonValue: undefined,
onClick: undefined,
content: {},
confirmationDialogParams: undefined,
},
{ {
id: 'set-password', id: 'set-password',
title: window.i18n('setAccountPasswordTitle'), title: window.i18n('setAccountPasswordTitle'),

@ -1,12 +1,15 @@
import { get as getFromConfig } from 'config'; import { get as getFromConfig } from 'config';
import { BrowserWindow } from 'electron'; import { BrowserWindow } from 'electron';
import { start as startUpdater } from './updater'; import { start as startUpdater, stop as stopUpdater } from './updater';
import { LoggerType, MessagesType } from './common'; import { LoggerType, MessagesType } from './common';
import { UserConfig } from '../../app/user_config';
let initialized = false; let initialized = false;
let config: UserConfig;
export async function start( export async function start(
getMainWindow: () => BrowserWindow, getMainWindow: () => BrowserWindow,
userConfig: UserConfig,
messages?: MessagesType, messages?: MessagesType,
logger?: LoggerType logger?: LoggerType
) { ) {
@ -14,6 +17,7 @@ export async function start(
throw new Error('updater/start: Updates have already been initialized!'); throw new Error('updater/start: Updates have already been initialized!');
} }
initialized = true; initialized = true;
config = userConfig;
if (!messages) { if (!messages) {
throw new Error('updater/start: Must provide messages!'); throw new Error('updater/start: Must provide messages!');
@ -40,8 +44,18 @@ export async function start(
await startUpdater(getMainWindow, messages, logger); await startUpdater(getMainWindow, messages, logger);
} }
export function stop() {
if (initialized) {
stopUpdater();
initialized = false;
}
}
function autoUpdateDisabled() { function autoUpdateDisabled() {
return ( return (
process.mas || !getFromConfig('updatesEnabled') // From Electron: Mac App Store build process.mas || // From Electron: Mac App Store build
!getFromConfig('updatesEnabled') || // Hard coded config
// tslint:disable-next-line: no-backbone-get-set-outside-model
!config.get('autoUpdate') // User setting
); );
} }

@ -3,6 +3,7 @@ import * as fs from 'fs-extra';
import { autoUpdater, UpdateInfo } from 'electron-updater'; import { autoUpdater, UpdateInfo } from 'electron-updater';
import { app, BrowserWindow } from 'electron'; import { app, BrowserWindow } from 'electron';
import { markShouldQuit } from '../../app/window_state'; import { markShouldQuit } from '../../app/window_state';
import { import {
getPrintableError, getPrintableError,
LoggerType, LoggerType,
@ -15,6 +16,8 @@ import { gt as isVersionGreaterThan, parse as parseVersion } from 'semver';
let isUpdating = false; let isUpdating = false;
let downloadIgnored = false; let downloadIgnored = false;
let interval: NodeJS.Timeout | undefined;
let stopped = false;
const SECOND = 1000; const SECOND = 1000;
const MINUTE = SECOND * 60; const MINUTE = SECOND * 60;
@ -25,28 +28,43 @@ export async function start(
messages: MessagesType, messages: MessagesType,
logger: LoggerType logger: LoggerType
) { ) {
if (interval) {
logger.info('auto-update: Already running');
return;
}
logger.info('auto-update: starting checks...'); logger.info('auto-update: starting checks...');
autoUpdater.logger = logger; autoUpdater.logger = logger;
autoUpdater.autoDownload = false; autoUpdater.autoDownload = false;
setInterval(async () => { interval = setInterval(async () => {
try { try {
await checkForUpdates(getMainWindow, messages, logger); await checkForUpdates(getMainWindow, messages, logger);
} catch (error) { } catch (error) {
logger.error('auto-update: error:', getPrintableError(error)); logger.error('auto-update: error:', getPrintableError(error));
} }
}, INTERVAL); }, INTERVAL);
stopped = false;
await checkForUpdates(getMainWindow, messages, logger); await checkForUpdates(getMainWindow, messages, logger);
} }
export function stop() {
if (interval) {
clearInterval(interval);
interval = undefined;
}
stopped = true;
}
async function checkForUpdates( async function checkForUpdates(
getMainWindow: () => BrowserWindow, getMainWindow: () => BrowserWindow,
messages: MessagesType, messages: MessagesType,
logger: LoggerType logger: LoggerType
) { ) {
if (isUpdating || downloadIgnored) { if (stopped || isUpdating || downloadIgnored) {
return; return;
} }

Loading…
Cancel
Save