From b129c409a78a4cfe194d3c2edcebf9209c5c67be Mon Sep 17 00:00:00 2001 From: Audric Ackermann Date: Mon, 2 Dec 2024 16:57:35 +1100 Subject: [PATCH] fix: allow to abort a request globally when a request takes too long --- ts/interactions/conversationInteractions.ts | 20 ++++++--- ts/session/apis/snode_api/SNodeAPI.ts | 41 +++++++++++++------ ts/session/apis/snode_api/batchRequest.ts | 5 ++- ts/session/apis/snode_api/expireRequest.ts | 3 +- .../apis/snode_api/getExpiriesRequest.ts | 3 +- .../apis/snode_api/getServiceNodesList.ts | 4 +- ts/session/apis/snode_api/getSwarmFor.ts | 8 ++-- ts/session/apis/snode_api/onsResolve.ts | 4 +- ts/session/apis/snode_api/retrieveRequest.ts | 4 +- .../conversations/ConversationController.ts | 19 ++++++--- ts/session/sending/MessageSender.ts | 13 +++++- ts/session/utils/Promise.ts | 21 ++++++++++ .../jobs/GroupPendingRemovalsJob.ts | 19 ++++++--- .../utils/job_runners/jobs/GroupSyncJob.ts | 27 ++++++++---- .../utils/job_runners/jobs/UserSyncJob.ts | 20 ++++++--- 15 files changed, 155 insertions(+), 56 deletions(-) diff --git a/ts/interactions/conversationInteractions.ts b/ts/interactions/conversationInteractions.ts index 4fb24dabe..3efec954e 100644 --- a/ts/interactions/conversationInteractions.ts +++ b/ts/interactions/conversationInteractions.ts @@ -1,5 +1,6 @@ import { isEmpty, isNil, uniq } from 'lodash'; import { PubkeyType, WithGroupPubkey } from 'libsession_util_nodejs'; +import AbortController from 'abort-controller'; import { ConversationNotificationSettingType, READ_MESSAGE_STATE, @@ -20,7 +21,7 @@ import { DecryptedAttachmentsManager } from '../session/crypto/DecryptedAttachme import { DisappearingMessageConversationModeType } from '../session/disappearing_messages/types'; import { PubKey } from '../session/types'; import { perfEnd, perfStart } from '../session/utils/Performance'; -import { sleepFor } from '../session/utils/Promise'; +import { sleepFor, timeoutWithAbort } from '../session/utils/Promise'; import { ed25519Str, fromHexToArray, toHex } from '../session/utils/String'; import { UserSync } from '../session/utils/job_runners/jobs/UserSyncJob'; import { SessionUtilContact } from '../session/utils/libsession/libsession_utils_contacts'; @@ -59,6 +60,7 @@ import { GroupUpdateMessageFactory } from '../session/messages/message_factory/g import { GroupPromote } from '../session/utils/job_runners/jobs/GroupPromoteJob'; import { MessageSender } from '../session/sending'; import { StoreGroupRequestFactory } from '../session/apis/snode_api/factories/StoreGroupRequestFactory'; +import { DURATION } from '../session/constants'; export async function copyPublicKeyByConvoId(convoId: string) { if (OpenGroupUtils.isOpenGroupV2(convoId)) { @@ -1037,11 +1039,17 @@ export async function promoteUsersInGroup({ groupInWrapper ); - const result = await MessageSender.sendEncryptedDataToSnode({ - destination: groupPk, - method: 'batch', - sortedSubRequests: storeRequests, - }); + const controller = new AbortController(); + const result = await timeoutWithAbort( + MessageSender.sendEncryptedDataToSnode({ + destination: groupPk, + method: 'batch', + sortedSubRequests: storeRequests, + abortSignal: controller.signal, + }), + 2 * DURATION.MINUTES, + controller + ); if (result?.[0].code !== 200) { window.log.warn('promoteUsersInGroup: failed to store change'); diff --git a/ts/session/apis/snode_api/SNodeAPI.ts b/ts/session/apis/snode_api/SNodeAPI.ts index 17d321ef7..281c59fc7 100644 --- a/ts/session/apis/snode_api/SNodeAPI.ts +++ b/ts/session/apis/snode_api/SNodeAPI.ts @@ -3,6 +3,7 @@ import { GroupPubkeyType, PubkeyType } from 'libsession_util_nodejs'; import { compact, isEmpty } from 'lodash'; import pRetry from 'p-retry'; +import AbortController from 'abort-controller'; import { UserGroupsWrapperActions } from '../../../webworker/workers/browser/libsession_worker_interface'; import { getSodiumRenderer } from '../../crypto'; import { PubKey } from '../../types'; @@ -14,6 +15,7 @@ import { DeleteGroupHashesFactory } from './factories/DeleteGroupHashesRequestFa import { DeleteUserHashesFactory } from './factories/DeleteUserHashesRequestFactory'; import { SnodePool } from './snodePool'; import { DURATION } from '../../constants'; +import { timeoutWithAbort } from '../../utils/Promise'; export const ERROR_CODE_NO_CONNECT = 'ENETUNREACH: No network connection.'; @@ -183,12 +185,19 @@ const networkDeleteMessageOurSwarm = async ( async () => { const snodeToMakeRequestTo = await SnodePool.getNodeFromSwarmOrThrow(request.destination); - const ret = await BatchRequests.doUnsignedSnodeBatchRequestNoRetries( - [request], - snodeToMakeRequestTo, - 10 * DURATION.SECONDS, - request.destination, - false + const controller = new AbortController(); + const ret = await timeoutWithAbort( + BatchRequests.doUnsignedSnodeBatchRequestNoRetries( + [request], + snodeToMakeRequestTo, + 10 * DURATION.SECONDS, + request.destination, + false, + 'batch', + controller.signal + ), + 30 * DURATION.SECONDS, + controller ); if (!ret || !ret?.[0].body || ret[0].code !== 200) { @@ -331,13 +340,19 @@ const networkDeleteMessagesForGroup = async ( await pRetry( async () => { const snodeToMakeRequestTo = await SnodePool.getNodeFromSwarmOrThrow(request.destination); - - const ret = await BatchRequests.doUnsignedSnodeBatchRequestNoRetries( - [request], - snodeToMakeRequestTo, - 10 * DURATION.SECONDS, - request.destination, - false + const controller = new AbortController(); + const ret = await timeoutWithAbort( + BatchRequests.doUnsignedSnodeBatchRequestNoRetries( + [request], + snodeToMakeRequestTo, + 10 * DURATION.SECONDS, + request.destination, + false, + 'batch', + controller.signal + ), + 30 * DURATION.SECONDS, + controller ); if (!ret || !ret?.[0].body || ret[0].code !== 200) { diff --git a/ts/session/apis/snode_api/batchRequest.ts b/ts/session/apis/snode_api/batchRequest.ts index 5b287a870..8d123cf76 100644 --- a/ts/session/apis/snode_api/batchRequest.ts +++ b/ts/session/apis/snode_api/batchRequest.ts @@ -110,7 +110,8 @@ async function doUnsignedSnodeBatchRequestNoRetries( timeoutMs: number, associatedWith: string | null, allow401s: boolean, - method: MethodBatchType = 'batch' + method: MethodBatchType = 'batch', + abortSignal: MergedAbortSignal | null ): Promise { const signedSubRequests = await MessageSender.signSubRequests(unsignedSubRequests); return BatchRequests.doSnodeBatchRequestNoRetries( @@ -119,7 +120,7 @@ async function doUnsignedSnodeBatchRequestNoRetries( timeoutMs, associatedWith, allow401s, - undefined, + abortSignal || undefined, method ); } diff --git a/ts/session/apis/snode_api/expireRequest.ts b/ts/session/apis/snode_api/expireRequest.ts index 470f90dc8..26e2fafa7 100644 --- a/ts/session/apis/snode_api/expireRequest.ts +++ b/ts/session/apis/snode_api/expireRequest.ts @@ -152,7 +152,8 @@ async function updateExpiryOnNodesNoRetries( 10 * DURATION.SECONDS, ourPubKey, false, - 'batch' + 'batch', + null ); if (!result || result.length !== expireRequests.length) { diff --git a/ts/session/apis/snode_api/getExpiriesRequest.ts b/ts/session/apis/snode_api/getExpiriesRequest.ts index be573be39..2c11f0e61 100644 --- a/ts/session/apis/snode_api/getExpiriesRequest.ts +++ b/ts/session/apis/snode_api/getExpiriesRequest.ts @@ -53,7 +53,8 @@ async function getExpiriesFromNodesNoRetries( 10 * DURATION.SECONDS, associatedWith, false, - 'batch' + 'batch', + null ); if (!result || result.length !== 1) { diff --git a/ts/session/apis/snode_api/getServiceNodesList.ts b/ts/session/apis/snode_api/getServiceNodesList.ts index e6cf611a3..aa38cb116 100644 --- a/ts/session/apis/snode_api/getServiceNodesList.ts +++ b/ts/session/apis/snode_api/getServiceNodesList.ts @@ -20,7 +20,9 @@ async function getSnodePoolFromSnode(targetNode: Snode): Promise> { targetNode, 10 * DURATION.SECONDS, null, - false + false, + 'batch', + null ); const firstResult = results[0]; diff --git a/ts/session/apis/snode_api/getSwarmFor.ts b/ts/session/apis/snode_api/getSwarmFor.ts index 44be0638c..0c390cbbe 100644 --- a/ts/session/apis/snode_api/getSwarmFor.ts +++ b/ts/session/apis/snode_api/getSwarmFor.ts @@ -18,14 +18,16 @@ async function requestSnodesForPubkeyWithTargetNodeRetryable( if (!PubKey.is03Pubkey(pubkey) && !PubKey.is05Pubkey(pubkey)) { throw new Error('invalid pubkey given for swarmFor'); } - const subrequest = new SwarmForSubRequest(pubkey); + const subRequest = new SwarmForSubRequest(pubkey); const result = await BatchRequests.doUnsignedSnodeBatchRequestNoRetries( - [subrequest], + [subRequest], targetNode, 10 * DURATION.SECONDS, pubkey, - false + false, + 'batch', + null ); if (!result || !result.length) { diff --git a/ts/session/apis/snode_api/onsResolve.ts b/ts/session/apis/snode_api/onsResolve.ts index cc6a6adb5..0fc996a1d 100644 --- a/ts/session/apis/snode_api/onsResolve.ts +++ b/ts/session/apis/snode_api/onsResolve.ts @@ -41,7 +41,9 @@ async function getSessionIDForOnsName(onsNameCase: string) { targetNode, 10 * DURATION.SECONDS, null, - false + false, + 'batch', + null ); const firstResult = results[0]; if (!firstResult || firstResult.code !== 200 || !firstResult.body) { diff --git a/ts/session/apis/snode_api/retrieveRequest.ts b/ts/session/apis/snode_api/retrieveRequest.ts index 4c3b73fdc..a106e90a5 100644 --- a/ts/session/apis/snode_api/retrieveRequest.ts +++ b/ts/session/apis/snode_api/retrieveRequest.ts @@ -232,7 +232,9 @@ async function retrieveNextMessagesNoRetries( // yes this is a long timeout for just messages, but 4s timeouts way to often... 10 * DURATION.SECONDS, associatedWith, - allow401s + allow401s, + 'batch', + null ); try { if (!results || !isArray(results) || !results.length) { diff --git a/ts/session/conversations/ConversationController.ts b/ts/session/conversations/ConversationController.ts index f9f839215..181f656e7 100644 --- a/ts/session/conversations/ConversationController.ts +++ b/ts/session/conversations/ConversationController.ts @@ -3,6 +3,7 @@ import { ConvoVolatileType, GroupPubkeyType, PubkeyType } from 'libsession_util_nodejs'; import { isEmpty, isNil } from 'lodash'; +import AbortController from 'abort-controller'; import { Data } from '../../data/data'; import { OpenGroupData } from '../../data/opengroups'; import { ConversationCollection, ConversationModel } from '../../models/conversation'; @@ -46,6 +47,8 @@ import { DisappearingMessages } from '../disappearing_messages'; import { StoreGroupRequestFactory } from '../apis/snode_api/factories/StoreGroupRequestFactory'; import { CONVERSATION_PRIORITIES, ConversationTypeEnum } from '../../models/types'; import { NetworkTime } from '../../util/NetworkTime'; +import { timeoutWithAbort } from '../utils/Promise'; +import { DURATION } from '../constants'; let instance: ConvoController | null; @@ -678,11 +681,17 @@ async function leaveClosedGroup(groupPk: PubkeyType | GroupPubkeyType, fromSyncM secretKey: group.secretKey, } ); - const results = await MessageSender.sendEncryptedDataToSnode({ - destination: groupPk, - sortedSubRequests: storeRequests, - method: 'sequence', - }); + const controller = new AbortController(); + const results = await timeoutWithAbort( + MessageSender.sendEncryptedDataToSnode({ + destination: groupPk, + sortedSubRequests: storeRequests, + method: 'sequence', + abortSignal: controller.signal, + }), + 2 * DURATION.MINUTES, + controller + ); if (results?.[0].code !== 200) { throw new Error( diff --git a/ts/session/sending/MessageSender.ts b/ts/session/sending/MessageSender.ts index 08d80e4a0..08ebcb5f9 100644 --- a/ts/session/sending/MessageSender.ts +++ b/ts/session/sending/MessageSender.ts @@ -56,6 +56,7 @@ import { EncryptAndWrapMessageResults, MessageWrapper } from './MessageWrapper'; import { stringify } from '../../types/sqlSharedTypes'; import { OpenGroupRequestCommonType } from '../../data/types'; import { NetworkTime } from '../../util/NetworkTime'; +import { MergedAbortSignal } from '../apis/snode_api/requestWith'; // ================ SNODE STORE ================ @@ -293,7 +294,9 @@ async function sendSingleMessage({ targetNode, 10 * DURATION.SECONDS, destination, - false + false, + 'sequence', + null ); await handleBatchResultWithSubRequests({ batchResult, subRequests, destination }); @@ -414,10 +417,12 @@ async function sendMessagesDataToSnode({ associatedWith, sortedSubRequests, method, + abortSignal, }: { sortedSubRequests: SortedSubRequestsType; associatedWith: T; method: MethodBatchType; + abortSignal: MergedAbortSignal | null; }): Promise { if (!associatedWith) { throw new Error('sendMessagesDataToSnode first sub request pubkey needs to be set'); @@ -442,7 +447,8 @@ async function sendMessagesDataToSnode({ 6000, associatedWith, false, - method + method, + abortSignal ); if (!responses || !responses.length) { @@ -500,10 +506,12 @@ async function sendEncryptedDataToSnode( destination, sortedSubRequests, method, + abortSignal, }: { sortedSubRequests: SortedSubRequestsType; // keeping those as an array because the order needs to be enforced for some (group keys for instance) destination: T; method: MethodBatchType; + abortSignal: MergedAbortSignal | null; }): Promise { try { const batchResults = await pRetry( @@ -512,6 +520,7 @@ async function sendEncryptedDataToSnode( sortedSubRequests, associatedWith: destination, method, + abortSignal, }); }, { diff --git a/ts/session/utils/Promise.ts b/ts/session/utils/Promise.ts index 5cc649ad3..ccb46997d 100644 --- a/ts/session/utils/Promise.ts +++ b/ts/session/utils/Promise.ts @@ -2,6 +2,7 @@ /* eslint-disable no-async-promise-executor */ /* eslint-disable @typescript-eslint/no-misused-promises */ +import AbortController from 'abort-controller'; import { Snode } from '../../data/types'; type SimpleFunction = (arg: T) => void; @@ -204,6 +205,26 @@ export async function timeout(promise: Promise, timeoutMs: number): Promis return Promise.race([timeoutPromise, promise]); } +/** + * Similar to timeout, but will also call controller.abort() when we timeout. + * This can be used to make a request and if it takes longer than X ms, abort it and return the current aborted promise. + */ +export async function timeoutWithAbort( + promise: Promise, + timeoutMs: number, + controller: AbortController +): Promise { + const timeoutPromise = new Promise((_, rej) => { + const wait = setTimeout(() => { + clearTimeout(wait); + controller.abort(); + rej(new TaskTimedOutError()); + }, timeoutMs); + }); + + return Promise.race([timeoutPromise, promise]); +} + export const sleepFor = async (ms: number, showLog = false) => { if (showLog) { // eslint-disable-next-line no-console diff --git a/ts/session/utils/job_runners/jobs/GroupPendingRemovalsJob.ts b/ts/session/utils/job_runners/jobs/GroupPendingRemovalsJob.ts index 9a4ec1492..f243fb0b1 100644 --- a/ts/session/utils/job_runners/jobs/GroupPendingRemovalsJob.ts +++ b/ts/session/utils/job_runners/jobs/GroupPendingRemovalsJob.ts @@ -2,6 +2,7 @@ import { WithGroupPubkey } from 'libsession_util_nodejs'; import { compact, isEmpty, isNumber } from 'lodash'; import { v4 } from 'uuid'; +import AbortController from 'abort-controller'; import { StringUtils } from '../..'; import { Data } from '../../../../data/data'; import { deleteMessagesFromSwarmOnly } from '../../../../interactions/conversations/unsendingInteractions'; @@ -37,6 +38,7 @@ import { } from '../../../types/with'; import { groupInfoActions } from '../../../../state/ducks/metaGroups'; import { DURATION } from '../../../constants'; +import { timeoutWithAbort } from '../../Promise'; const defaultMsBetweenRetries = 10000; const defaultMaxAttempts = 1; @@ -186,11 +188,18 @@ class GroupPendingRemovalsJob extends PersistedJob