Merge pull request #344 from BeaudanBrown/close-polling

Start attempt at closing long polling connections when offline event is triggered
pull/362/head
Beaudan Campbell-Brown 6 years ago committed by GitHub
commit 3a04bab5f7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -253,13 +253,25 @@ class LokiMessageAPI {
return false; return false;
} }
async openRetrieveConnection(callback) { async openRetrieveConnection(stopPollingPromise, callback) {
while (!_.isEmpty(this.ourSwarmNodes)) { let stopPollingResult = false;
// When message_receiver restarts from onoffline/ononline events it closes
// http-resources, which will then resolve the stopPollingPromise with true. We then
// want to cancel these polling connections because new ones will be created
// eslint-disable-next-line more/no-then
stopPollingPromise.then(result => {
stopPollingResult = result;
});
while (!stopPollingResult && !_.isEmpty(this.ourSwarmNodes)) {
const address = Object.keys(this.ourSwarmNodes)[0]; const address = Object.keys(this.ourSwarmNodes)[0];
const nodeData = this.ourSwarmNodes[address]; const nodeData = this.ourSwarmNodes[address];
delete this.ourSwarmNodes[address]; delete this.ourSwarmNodes[address];
let successiveFailures = 0; let successiveFailures = 0;
while (successiveFailures < MAX_ACCEPTABLE_FAILURES) { while (
!stopPollingResult &&
successiveFailures < MAX_ACCEPTABLE_FAILURES
) {
await sleepFor(successiveFailures * 1000); await sleepFor(successiveFailures * 1000);
try { try {
@ -332,7 +344,7 @@ class LokiMessageAPI {
return result.messages || []; return result.messages || [];
} }
async startLongPolling(numConnections, callback) { async startLongPolling(numConnections, stopPolling, callback) {
this.ourSwarmNodes = {}; this.ourSwarmNodes = {};
let nodes = await lokiSnodeAPI.getSwarmNodesForPubKey(this.ourKey); let nodes = await lokiSnodeAPI.getSwarmNodesForPubKey(this.ourKey);
if (nodes.length < numConnections) { if (nodes.length < numConnections) {
@ -353,7 +365,7 @@ class LokiMessageAPI {
const promises = []; const promises = [];
for (let i = 0; i < numConnections; i += 1) for (let i = 0; i < numConnections; i += 1)
promises.push(this.openRetrieveConnection(callback)); promises.push(this.openRetrieveConnection(stopPolling, callback));
// blocks until all snodes in our swarms have been removed from the list // blocks until all snodes in our swarms have been removed from the list
// or if there is network issues (ENOUTFOUND due to lokinet) // or if there is network issues (ENOUTFOUND due to lokinet)

@ -49,6 +49,11 @@
handleRequest = request => request.respond(404, 'Not found'); handleRequest = request => request.respond(404, 'Not found');
} }
let connected = true; let connected = true;
this.calledStop = false;
let resolveStopPolling;
const stopPolling = new Promise(res => {
resolveStopPolling = res;
});
this.handleMessage = (message, options = {}) => { this.handleMessage = (message, options = {}) => {
try { try {
@ -83,18 +88,26 @@
// This blocking call will return only when all attempts // This blocking call will return only when all attempts
// at reaching snodes are exhausted or a DNS error occured // at reaching snodes are exhausted or a DNS error occured
try { try {
await server.startLongPolling(NUM_CONCURRENT_CONNECTIONS, messages => { await server.startLongPolling(
NUM_CONCURRENT_CONNECTIONS,
stopPolling,
messages => {
connected = true; connected = true;
callback(connected); callback(connected);
messages.forEach(message => { messages.forEach(message => {
const { data } = message; const { data } = message;
this.handleMessage(data); this.handleMessage(data);
}); });
}); }
);
} catch (e) { } catch (e) {
// we'll try again anyway // we'll try again anyway
} }
if (this.calledStop) {
return;
}
connected = false; connected = false;
// Exhausted all our snodes urls, trying again later from scratch // Exhausted all our snodes urls, trying again later from scratch
setTimeout(() => { setTimeout(() => {
@ -105,5 +118,10 @@
this.isConnected = function isConnected() { this.isConnected = function isConnected() {
return connected; return connected;
}; };
this.close = () => {
this.calledStop = true;
resolveStopPolling(true);
};
}; };
})(); })();

@ -190,6 +190,10 @@ MessageReceiver.prototype.extend({
localLokiServer.close(); localLokiServer.close();
} }
if (this.httpPollingResource) {
this.httpPollingResource.close();
}
return this.drain(); return this.drain();
}, },
onopen() { onopen() {

Loading…
Cancel
Save