Added timeouts for requests so they don't for ages. Changed swarmNodes to be a set to work property with merge, now removing contact swarmNodes if they timeout

pull/132/head
Beaudan 6 years ago
parent 24553e29e7
commit bdbdf15469

@ -1,4 +1,3 @@
const fs = require('fs');
const path = require('path'); const path = require('path');
const mkdirp = require('mkdirp'); const mkdirp = require('mkdirp');
const rimraf = require('rimraf'); const rimraf = require('rimraf');

@ -86,7 +86,7 @@
friendRequestStatus: FriendRequestStatusEnum.none, friendRequestStatus: FriendRequestStatusEnum.none,
unlockTimestamp: null, // Timestamp used for expiring friend requests. unlockTimestamp: null, // Timestamp used for expiring friend requests.
sessionResetStatus: SessionResetEnum.none, sessionResetStatus: SessionResetEnum.none,
swarmNodes: [], swarmNodes: new Set([]),
}; };
}, },

@ -656,8 +656,21 @@ async function removeAllSessions(id) {
// Conversation // Conversation
function setifyProperty(data, propertyName) {
if (!data) return data;
const returnData = data;
if (returnData[propertyName]) {
returnData[propertyName] = new Set(returnData[propertyName]);
}
return returnData;
}
async function getSwarmNodesByPubkey(pubkey) { async function getSwarmNodesByPubkey(pubkey) {
return channels.getSwarmNodesByPubkey(pubkey); let swarmNodes = await channels.getSwarmNodesByPubkey(pubkey);
if (Array.isArray(swarmNodes)) {
swarmNodes = new Set(swarmNodes);
}
return swarmNodes;
} }
async function getConversationCount() { async function getConversationCount() {
@ -665,7 +678,11 @@ async function getConversationCount() {
} }
async function saveConversation(data) { async function saveConversation(data) {
await channels.saveConversation(data); const storeData = data;
if (storeData.swarmNodes) {
storeData.swarmNodes = Array.from(storeData.swarmNodes);
}
await channels.saveConversation(storeData);
} }
async function saveConversations(data) { async function saveConversations(data) {
@ -673,7 +690,7 @@ async function saveConversations(data) {
} }
async function getConversationById(id, { Conversation }) { async function getConversationById(id, { Conversation }) {
const data = await channels.getConversationById(id); const data = setifyProperty(await channels.getConversationById(id), 'swarmNodes');
return new Conversation(data); return new Conversation(data);
} }
@ -684,6 +701,9 @@ async function updateConversation(id, data, { Conversation }) {
} }
const merged = merge({}, existing.attributes, data); const merged = merge({}, existing.attributes, data);
if (merged.swarmNodes instanceof Set) {
merged.swarmNodes = Array.from(merged.swarmNodes);
}
await channels.updateConversation(merged); await channels.updateConversation(merged);
} }
@ -704,7 +724,8 @@ async function _removeConversations(ids) {
} }
async function getAllConversations({ ConversationCollection }) { async function getAllConversations({ ConversationCollection }) {
const conversations = await channels.getAllConversations(); const conversations = (await channels.getAllConversations())
.map(c => setifyProperty(c, 'swarmNodes'));
const collection = new ConversationCollection(); const collection = new ConversationCollection();
collection.add(conversations); collection.add(conversations);
@ -717,7 +738,8 @@ async function getAllConversationIds() {
} }
async function getAllPrivateConversations({ ConversationCollection }) { async function getAllPrivateConversations({ ConversationCollection }) {
const conversations = await channels.getAllPrivateConversations(); const conversations = (await channels.getAllPrivateConversations())
.map(c => setifyProperty(c, 'swarmNodes'));
const collection = new ConversationCollection(); const collection = new ConversationCollection();
collection.add(conversations); collection.add(conversations);
@ -725,7 +747,8 @@ async function getAllPrivateConversations({ ConversationCollection }) {
} }
async function getAllGroupsInvolvingId(id, { ConversationCollection }) { async function getAllGroupsInvolvingId(id, { ConversationCollection }) {
const conversations = await channels.getAllGroupsInvolvingId(id); const conversations = (await channels.getAllGroupsInvolvingId(id))
.map(c => setifyProperty(c, 'swarmNodes'));
const collection = new ConversationCollection(); const collection = new ConversationCollection();
collection.add(conversations); collection.add(conversations);
@ -733,7 +756,8 @@ async function getAllGroupsInvolvingId(id, { ConversationCollection }) {
} }
async function searchConversations(query, { ConversationCollection }) { async function searchConversations(query, { ConversationCollection }) {
const conversations = await channels.searchConversations(query); const conversations = (await channels.searchConversations(query))
.map(c => setifyProperty(c, 'swarmNodes'));
const collection = new ConversationCollection(); const collection = new ConversationCollection();
collection.add(conversations); collection.add(conversations);

@ -19,7 +19,7 @@ class LokiMessageAPI {
async sendMessage(pubKey, data, messageTimeStamp, ttl) { async sendMessage(pubKey, data, messageTimeStamp, ttl) {
const swarmNodes = await window.LokiSnodeAPI.getSwarmNodesByPubkey(pubKey) const swarmNodes = await window.LokiSnodeAPI.getSwarmNodesByPubkey(pubKey)
if (!swarmNodes || swarmNodes.length === 0) { if (!swarmNodes || swarmNodes.size === 0) {
throw Error('No swarm nodes to query!'); throw Error('No swarm nodes to query!');
} }
@ -39,12 +39,13 @@ class LokiMessageAPI {
throw err; throw err;
} }
const requests = swarmNodes.map(async node => { const requests = Array.from(swarmNodes).map(async node => {
// TODO: Confirm sensible timeout
const options = { const options = {
url: `${node}${this.messageServerPort}/store`, url: `${node}${this.messageServerPort}/store`,
type: 'POST', type: 'POST',
responseType: undefined, responseType: undefined,
timeout: undefined, timeout: 5000,
}; };
const fetchOptions = { const fetchOptions = {
@ -100,11 +101,12 @@ class LokiMessageAPI {
let completedRequests = 0; let completedRequests = 0;
const doRequest = async (nodeUrl, nodeData) => { const doRequest = async (nodeUrl, nodeData) => {
// TODO: Confirm sensible timeout
const options = { const options = {
url: `${nodeUrl}${this.messageServerPort}/retrieve`, url: `${nodeUrl}${this.messageServerPort}/retrieve`,
type: 'GET', type: 'GET',
responseType: 'json', responseType: 'json',
timeout: undefined, timeout: 5000,
}; };
const headers = { const headers = {
@ -159,10 +161,10 @@ class LokiMessageAPI {
const remainingRequests = MINIMUM_SUCCESSFUL_REQUESTS - completedRequests; const remainingRequests = MINIMUM_SUCCESSFUL_REQUESTS - completedRequests;
const ourSwarmNodes = await window.LokiSnodeAPI.getOurSwarmNodes(); const ourSwarmNodes = await window.LokiSnodeAPI.getOurSwarmNodes();
if (Object.keys(ourSwarmNodes).length < remainingRequests) { if (Object.keys(ourSwarmNodes).length < remainingRequests) {
// This means we don't have enough swarm nodes to meet the minimum threshold
if (completedRequests !== 0) { if (completedRequests !== 0) {
// TODO: Decide how to handle some completed requests but not enough // TODO: Decide how to handle some completed requests but not enough
} }
return;
} }
await Promise.all( await Promise.all(

@ -34,9 +34,18 @@ class LokiSnodeAPI {
}); });
} }
unreachableNode(pubKey, nodeUrl) { async unreachableNode(pubKey, nodeUrl) {
if (pubKey === window.textsecure.storage.user.getNumber()) { if (pubKey === window.textsecure.storage.user.getNumber()) {
delete this.ourSwarmNodes[nodeUrl]; delete this.ourSwarmNodes[nodeUrl];
return;
}
const conversation = window.ConversationController.get(pubKey);
const swarmNodes = conversation.get('swarmNodes');
if (swarmNodes.delete(nodeUrl)) {
conversation.set({ swarmNodes });
await window.Signal.Data.updateConversation(conversation.id, conversation.attributes, {
Conversation: Whisper.Conversation,
});
} }
} }
@ -55,17 +64,17 @@ class LokiSnodeAPI {
!this.ourSwarmNodes || !this.ourSwarmNodes ||
Object.keys(this.ourSwarmNodes).length < MINIMUM_SWARM_NODES Object.keys(this.ourSwarmNodes).length < MINIMUM_SWARM_NODES
) { ) {
this.ourSwarmNodes = {};
// Try refresh our swarm list once // Try refresh our swarm list once
const ourKey = window.textsecure.storage.user.getNumber(); const ourKey = window.textsecure.storage.user.getNumber();
const nodeAddresses = await window.LokiSnodeAPI.getSwarmNodes(ourKey); const nodeAddresses = await window.LokiSnodeAPI.getSwarmNodes(ourKey);
if (!nodeAddresses || nodeAddresses.length === 0) {
throw Error('Could not load our swarm')
}
this.ourSwarmNodes = {};
nodeAddresses.forEach(url => { nodeAddresses.forEach(url => {
this.ourSwarmNodes[url] = {}; this.ourSwarmNodes[url] = {};
}) });
if (!this.ourSwarmNodes || Object.keys(this.ourSwarmNodes).length === 0) {
throw Error('Could not load our swarm')
}
} }
return this.ourSwarmNodes; return this.ourSwarmNodes;
} }
@ -73,7 +82,7 @@ class LokiSnodeAPI {
async getSwarmNodesByPubkey(pubKey) { async getSwarmNodesByPubkey(pubKey) {
const swarmNodes = await window.Signal.Data.getSwarmNodesByPubkey(pubKey); const swarmNodes = await window.Signal.Data.getSwarmNodesByPubkey(pubKey);
// TODO: Check if swarm list is below a threshold rather than empty // TODO: Check if swarm list is below a threshold rather than empty
if (swarmNodes && swarmNodes.length !== 0) { if (swarmNodes && swarmNodes.size !== 0) {
return swarmNodes; return swarmNodes;
} }
return this.replenishSwarm(pubKey); return this.replenishSwarm(pubKey);
@ -83,7 +92,7 @@ class LokiSnodeAPI {
const conversation = window.ConversationController.get(pubKey); const conversation = window.ConversationController.get(pubKey);
if (!(pubKey in this.swarmsPendingReplenish)) { if (!(pubKey in this.swarmsPendingReplenish)) {
this.swarmsPendingReplenish[pubKey] = new Promise(async (resolve) => { this.swarmsPendingReplenish[pubKey] = new Promise(async (resolve) => {
const newSwarmNodes = await this.getSwarmNodes(pubKey); const newSwarmNodes = new Set(await this.getSwarmNodes(pubKey));
conversation.set({ swarmNodes: newSwarmNodes }); conversation.set({ swarmNodes: newSwarmNodes });
await window.Signal.Data.updateConversation(conversation.id, conversation.attributes, { await window.Signal.Data.updateConversation(conversation.id, conversation.attributes, {
Conversation: Whisper.Conversation, Conversation: Whisper.Conversation,
@ -97,12 +106,14 @@ class LokiSnodeAPI {
} }
async getSwarmNodes(pubKey) { async getSwarmNodes(pubKey) {
// TODO: Hit multiple random nodes and merge lists?
const node = await this.getRandomSnodeAddress(); const node = await this.getRandomSnodeAddress();
// TODO: Confirm final API URL and sensible timeout
const options = { const options = {
url: `http://${node}${this.swarmServerPort}/json_rpc`, url: `http://${node}${this.swarmServerPort}/json_rpc`,
type: 'POST', type: 'POST',
responseType: 'json', responseType: 'json',
timeout: undefined, timeout: 5000,
}; };
const body = { const body = {

Loading…
Cancel
Save