feat: refactored swarm polling to use only retrieveNextMessages again

created verifyBatchRequestResults function
pull/3056/head
William Grant 1 year ago
parent 1e40f1b608
commit d8cc0c79ea

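In short, per the hunks below: the batch-result sanity checks are pulled out of retrieveNextMessages into a new verifyBatchRequestResults helper, the near-duplicate retrieveDisplayName is deleted, and SwarmPolling fetches the display name through retrieveNextMessages instead. A condensed sketch of the resulting retrieve flow, pieced together only from the hunks shown below (lines collapsed between hunks may add further checks inside the helper):

  // inside retrieveNextMessages, after buildRetrieveRequest()
  const results = await doSnodeBatchRequest(retrieveRequestsParams, targetNode, 4000, associatedWith);
  // verifyBatchRequestResults throws on an empty batch or a non-200 first sub-result,
  // otherwise it hands back that first result
  const firstResult = verifyBatchRequestResults(targetNode, namespaces, results);
  GetNetworkTime.handleTimestampOffsetFromNetwork('retrieve', firstResult.body.t);
  // re-associate each batch result with the namespace it was requested for
  return results.map((result, index) => ({
    code: result.code,
    messages: result.body as RetrieveMessagesResultsContent,
    namespace: namespaces[index],
  }));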
@@ -8,6 +8,7 @@ import { SnodeNamespace, SnodeNamespaces } from './namespaces';
 import { TTL_DEFAULT } from '../../constants';
 import { UserUtils } from '../../utils';
 import {
+  NotEmptyArrayOfBatchResults,
   RetrieveLegacyClosedGroupSubRequestType,
   RetrieveSubRequestType,
   UpdateExpiryOnNodeSubRequest,
@@ -102,34 +103,11 @@ async function buildRetrieveRequest(
   return retrieveRequestsParams;
 }
 
-async function retrieveNextMessages(
+function verifyBatchRequestResults(
   targetNode: Snode,
-  lastHashes: Array<string>,
-  associatedWith: string,
   namespaces: Array<SnodeNamespaces>,
-  ourPubkey: string,
-  configHashesToBump: Array<string> | null
-): Promise<RetrieveMessagesResultsBatched> {
-  if (namespaces.length !== lastHashes.length) {
-    throw new Error('namespaces and last hashes do not match');
-  }
-
-  const retrieveRequestsParams = await buildRetrieveRequest(
-    lastHashes,
-    associatedWith,
-    namespaces,
-    ourPubkey,
-    configHashesToBump
-  );
-  // let exceptions bubble up
-  // no retry for this one as this a call we do every few seconds while polling for messages
-  const results = await doSnodeBatchRequest(
-    retrieveRequestsParams,
-    targetNode,
-    4000,
-    associatedWith
-  );
-
+  results: NotEmptyArrayOfBatchResults
+) {
   if (!results || !results.length) {
     window?.log?.warn(
       `_retrieveNextMessages - sessionRpc could not talk to ${targetNode.ip}:${targetNode.port}`
@@ -150,83 +128,47 @@ async function retrieveNextMessages(
   const firstResult = results[0];
   if (firstResult.code !== 200) {
-    window?.log?.warn(`retrieveNextMessages result is not 200 but ${firstResult.code}`);
+    window?.log?.warn(`_retrieveNextMessages result is not 200 but ${firstResult.code}`);
     throw new Error(
       `_retrieveNextMessages - retrieve result is not 200 with ${targetNode.ip}:${targetNode.port} but ${firstResult.code}`
     );
   }
 
-  try {
-    // we rely on the code of the first one to check for online status
-    const bodyFirstResult = firstResult.body;
-    if (!window.inboxStore?.getState().onionPaths.isOnline) {
-      window.inboxStore?.dispatch(updateIsOnline(true));
-    }
-
-    GetNetworkTime.handleTimestampOffsetFromNetwork('retrieve', bodyFirstResult.t);
-
-    // merge results with their corresponding namespaces
-    return results.map((result, index) => ({
-      code: result.code,
-      messages: result.body as RetrieveMessagesResultsContent,
-      namespace: namespaces[index],
-    }));
-  } catch (e) {
-    window?.log?.warn('exception while parsing json of nextMessage:', e);
-    if (!window.inboxStore?.getState().onionPaths.isOnline) {
-      window.inboxStore?.dispatch(updateIsOnline(true));
-    }
-    throw new Error(
-      `_retrieveNextMessages - exception while parsing json of nextMessage ${targetNode.ip}:${targetNode.port}: ${e?.message}`
-    );
-  }
+  return firstResult;
 }
 
-async function retrieveDisplayName(
+async function retrieveNextMessages(
   targetNode: Snode,
-  ourPubkey: string
+  lastHashes: Array<string>,
+  associatedWith: string,
+  namespaces: Array<SnodeNamespaces>,
+  ourPubkey: string,
+  configHashesToBump: Array<string> | null
 ): Promise<RetrieveMessagesResultsBatched> {
+  if (namespaces.length !== lastHashes.length) {
+    throw new Error('namespaces and last hashes do not match');
+  }
+
   const retrieveRequestsParams = await buildRetrieveRequest(
-    [],
-    ourPubkey,
-    [SnodeNamespaces.UserProfile],
+    lastHashes,
+    associatedWith,
+    namespaces,
     ourPubkey,
-    []
+    configHashesToBump
   );
   // let exceptions bubble up
-  // no retry for this one as this a call we do every few seconds through polling
-  const results = await doSnodeBatchRequest(retrieveRequestsParams, targetNode, 4000, ourPubkey);
-  if (!results || !results.length) {
-    window?.log?.warn(
-      `retrieveDisplayName - sessionRpc could not talk to ${targetNode.ip}:${targetNode.port}`
-    );
-    throw new Error(
-      `retrieveDisplayName - sessionRpc could not talk to ${targetNode.ip}:${targetNode.port}`
-    );
-  }
-  // the +1 is to take care of the extra `expire` method added once user config is released
-  if (results.length !== 1 && results.length !== 2) {
-    throw new Error(
-      `We asked for updates about a message but got results of length ${results.length}`
-    );
-  }
-  // do a basic check to know if we have something kind of looking right (status 200 should always be there for a retrieve)
-  const firstResult = results[0];
-  if (firstResult.code !== 200) {
-    window?.log?.warn(`retrieveDisplayName result is not 200 but ${firstResult.code}`);
-    throw new Error(
-      `retrieveDisplayName - retrieve result is not 200 with ${targetNode.ip}:${targetNode.port} but ${firstResult.code}`
-    );
-  }
+  // no retry for this one as this a call we do every few seconds while polling for messages
+  const results = await doSnodeBatchRequest(
+    retrieveRequestsParams,
+    targetNode,
+    4000,
+    associatedWith
+  );
 
   try {
     // we rely on the code of the first one to check for online status
+    const firstResult = verifyBatchRequestResults(targetNode, namespaces, results);
     const bodyFirstResult = firstResult.body;
     if (!window.inboxStore?.getState().onionPaths.isOnline) {
       window.inboxStore?.dispatch(updateIsOnline(true));
@@ -235,22 +177,20 @@ async function retrieveDisplayName(
     GetNetworkTime.handleTimestampOffsetFromNetwork('retrieve', bodyFirstResult.t);
 
     // merge results with their corresponding namespaces
-    const resultsWithNamespaces = results.map(result => ({
+    return results.map((result, index) => ({
       code: result.code,
       messages: result.body as RetrieveMessagesResultsContent,
-      namespace: SnodeNamespaces.UserProfile,
+      namespace: namespaces[index],
     }));
-    return resultsWithNamespaces;
   } catch (e) {
-    window?.log?.warn('retrieveDisplayName:', e);
+    window?.log?.warn('exception while parsing json of nextMessage:', e);
     if (!window.inboxStore?.getState().onionPaths.isOnline) {
       window.inboxStore?.dispatch(updateIsOnline(true));
     }
     throw new Error(
-      `retrieveDisplayName - exception while parsing json of nextMessage ${targetNode.ip}:${targetNode.port}: ${e?.message}`
+      `_retrieveNextMessages - exception while parsing json of nextMessage ${targetNode.ip}:${targetNode.port}: ${e?.message}`
     );
   }
 }
 
-export const SnodeAPIRetrieve = { retrieveNextMessages, retrieveDisplayName };
+export const SnodeAPIRetrieve = { retrieveNextMessages };

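The SwarmPolling hunks below route the one-off display-name poll through the same entry point. Side by side, the call-site change (arguments follow the retrieveNextMessages signature above; comments are added here for clarity only):

  // before: dedicated request
  const resultsFromUserProfile = await SnodeAPIRetrieve.retrieveDisplayName(toPollFrom, pubkey.key);

  // after: shared retrieve path, polling only the UserProfile namespace
  const resultsFromUserProfile = await SnodeAPIRetrieve.retrieveNextMessages(
    toPollFrom,                     // snode picked by getNodesToPollFrom()
    [''],                           // lastHashes: one entry per namespace, so the length guard passes
    pubkey.key,                     // associatedWith
    [SnodeNamespaces.UserProfile],  // only namespace polled here
    pubkey.key,                     // ourPubkey
    null                            // configHashesToBump: nothing to bump for this call
  );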
@@ -225,18 +225,7 @@ export class SwarmPolling {
     namespaces: Array<SnodeNamespaces>
   ) {
     const polledPubkey = pubkey.key;
-
-    const swarmSnodes = await snodePool.getSwarmFor(polledPubkey);
-
-    // Select nodes for which we already have lastHashes
-    const alreadyPolled = swarmSnodes.filter((n: Snode) => this.lastHashes[n.pubkey_ed25519]);
-    let toPollFrom = alreadyPolled.length ? alreadyPolled[0] : null;
-
-    // If we need more nodes, select randomly from the remaining nodes:
-    if (!toPollFrom) {
-      const notPolled = difference(swarmSnodes, alreadyPolled);
-      toPollFrom = sample(notPolled) as Snode;
-    }
+    const toPollFrom = await this.getNodesToPollFrom(pubkey.key);
 
     let resultsFromAllNamespaces: RetrieveMessagesResultsBatched | null;
 
     try {
@@ -558,6 +547,22 @@ export class SwarmPolling {
     }
   }
 
+  private async getNodesToPollFrom(polledPubkey: string) {
+    const swarmSnodes = await snodePool.getSwarmFor(polledPubkey);
+
+    // Select nodes for which we already have lastHashes
+    const alreadyPolled = swarmSnodes.filter((n: Snode) => this.lastHashes[n.pubkey_ed25519]);
+    let toPollFrom = alreadyPolled.length ? alreadyPolled[0] : null;
+
+    // If we need more nodes, select randomly from the remaining nodes:
+    if (!toPollFrom) {
+      const notPolled = difference(swarmSnodes, alreadyPolled);
+      toPollFrom = sample(notPolled) as Snode;
+    }
+
+    return toPollFrom;
+  }
+
   private loadGroupIds() {
     const convos = getConversationController().getConversations();
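getNodesToPollFrom keeps the selection rule that was previously inlined at both call sites: prefer a snode we already track a lastHash for, otherwise pick a random one from the rest of the swarm. A standalone illustration of that rule with hypothetical data (lodash's difference() compares by reference, which is fine here because alreadyPolled holds the same objects as swarmSnodes):

  import { difference, sample } from 'lodash';

  // hypothetical swarm and lastHashes store, for illustration only
  const swarmSnodes = [{ pubkey_ed25519: 'a' }, { pubkey_ed25519: 'b' }, { pubkey_ed25519: 'c' }];
  const lastHashes: Record<string, string> = { b: 'last-seen-hash' };

  const alreadyPolled = swarmSnodes.filter(n => lastHashes[n.pubkey_ed25519]); // -> the 'b' snode
  const toPollFrom = alreadyPolled.length
    ? alreadyPolled[0] // reuse a node we already have a hash for
    : sample(difference(swarmSnodes, alreadyPolled)); // else pick randomly among the others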
@@ -667,18 +672,7 @@ export class SwarmPolling {
     }
 
     const pubkey = UserUtils.getOurPubKeyFromCache();
-    const polledPubkey = pubkey.key;
-
-    const swarmSnodes = await snodePool.getSwarmFor(polledPubkey);
-
-    // Select nodes for which we already have lastHashes
-    const alreadyPolled = swarmSnodes.filter((n: Snode) => this.lastHashes[n.pubkey_ed25519]);
-    let toPollFrom = alreadyPolled.length ? alreadyPolled[0] : null;
-
-    // If we need more nodes, select randomly from the remaining nodes:
-    if (!toPollFrom) {
-      const notPolled = difference(swarmSnodes, alreadyPolled);
-      toPollFrom = sample(notPolled) as Snode;
-    }
+    const toPollFrom = await this.getNodesToPollFrom(pubkey.key);
 
     if (abortSignal?.aborted) {
       throw new NotFoundError(
@@ -686,9 +680,13 @@ export class SwarmPolling {
       );
     }
 
-    const resultsFromUserProfile = await SnodeAPIRetrieve.retrieveDisplayName(
+    const resultsFromUserProfile = await SnodeAPIRetrieve.retrieveNextMessages(
       toPollFrom,
-      pubkey.key
+      [''],
+      pubkey.key,
+      [SnodeNamespaces.UserProfile],
+      pubkey.key,
+      null
     );
 
     // check if we just fetched the details from the config namespaces.
@@ -721,7 +719,6 @@ export class SwarmPolling {
        );
      }
 
-      // window.log.debug(`[pollOnceForOurDisplayName] displayName found ${displayName}`);
       return displayName;
     } catch (e) {
       if (e.message === ERROR_CODE_NO_CONNECT) {
