fix: allow to abort a request globally when a request takes too long

pull/3281/head
Audric Ackermann 10 months ago
parent d5ecc100e8
commit b129c409a7
No known key found for this signature in database

@ -1,5 +1,6 @@
import { isEmpty, isNil, uniq } from 'lodash';
import { PubkeyType, WithGroupPubkey } from 'libsession_util_nodejs';
import AbortController from 'abort-controller';
import {
ConversationNotificationSettingType,
READ_MESSAGE_STATE,
@ -20,7 +21,7 @@ import { DecryptedAttachmentsManager } from '../session/crypto/DecryptedAttachme
import { DisappearingMessageConversationModeType } from '../session/disappearing_messages/types';
import { PubKey } from '../session/types';
import { perfEnd, perfStart } from '../session/utils/Performance';
import { sleepFor } from '../session/utils/Promise';
import { sleepFor, timeoutWithAbort } from '../session/utils/Promise';
import { ed25519Str, fromHexToArray, toHex } from '../session/utils/String';
import { UserSync } from '../session/utils/job_runners/jobs/UserSyncJob';
import { SessionUtilContact } from '../session/utils/libsession/libsession_utils_contacts';
@ -59,6 +60,7 @@ import { GroupUpdateMessageFactory } from '../session/messages/message_factory/g
import { GroupPromote } from '../session/utils/job_runners/jobs/GroupPromoteJob';
import { MessageSender } from '../session/sending';
import { StoreGroupRequestFactory } from '../session/apis/snode_api/factories/StoreGroupRequestFactory';
import { DURATION } from '../session/constants';
export async function copyPublicKeyByConvoId(convoId: string) {
if (OpenGroupUtils.isOpenGroupV2(convoId)) {
@ -1037,11 +1039,17 @@ export async function promoteUsersInGroup({
groupInWrapper
);
const result = await MessageSender.sendEncryptedDataToSnode({
destination: groupPk,
method: 'batch',
sortedSubRequests: storeRequests,
});
const controller = new AbortController();
const result = await timeoutWithAbort(
MessageSender.sendEncryptedDataToSnode({
destination: groupPk,
method: 'batch',
sortedSubRequests: storeRequests,
abortSignal: controller.signal,
}),
2 * DURATION.MINUTES,
controller
);
if (result?.[0].code !== 200) {
window.log.warn('promoteUsersInGroup: failed to store change');

@ -3,6 +3,7 @@
import { GroupPubkeyType, PubkeyType } from 'libsession_util_nodejs';
import { compact, isEmpty } from 'lodash';
import pRetry from 'p-retry';
import AbortController from 'abort-controller';
import { UserGroupsWrapperActions } from '../../../webworker/workers/browser/libsession_worker_interface';
import { getSodiumRenderer } from '../../crypto';
import { PubKey } from '../../types';
@ -14,6 +15,7 @@ import { DeleteGroupHashesFactory } from './factories/DeleteGroupHashesRequestFa
import { DeleteUserHashesFactory } from './factories/DeleteUserHashesRequestFactory';
import { SnodePool } from './snodePool';
import { DURATION } from '../../constants';
import { timeoutWithAbort } from '../../utils/Promise';
export const ERROR_CODE_NO_CONNECT = 'ENETUNREACH: No network connection.';
@ -183,12 +185,19 @@ const networkDeleteMessageOurSwarm = async (
async () => {
const snodeToMakeRequestTo = await SnodePool.getNodeFromSwarmOrThrow(request.destination);
const ret = await BatchRequests.doUnsignedSnodeBatchRequestNoRetries(
[request],
snodeToMakeRequestTo,
10 * DURATION.SECONDS,
request.destination,
false
const controller = new AbortController();
const ret = await timeoutWithAbort(
BatchRequests.doUnsignedSnodeBatchRequestNoRetries(
[request],
snodeToMakeRequestTo,
10 * DURATION.SECONDS,
request.destination,
false,
'batch',
controller.signal
),
30 * DURATION.SECONDS,
controller
);
if (!ret || !ret?.[0].body || ret[0].code !== 200) {
@ -331,13 +340,19 @@ const networkDeleteMessagesForGroup = async (
await pRetry(
async () => {
const snodeToMakeRequestTo = await SnodePool.getNodeFromSwarmOrThrow(request.destination);
const ret = await BatchRequests.doUnsignedSnodeBatchRequestNoRetries(
[request],
snodeToMakeRequestTo,
10 * DURATION.SECONDS,
request.destination,
false
const controller = new AbortController();
const ret = await timeoutWithAbort(
BatchRequests.doUnsignedSnodeBatchRequestNoRetries(
[request],
snodeToMakeRequestTo,
10 * DURATION.SECONDS,
request.destination,
false,
'batch',
controller.signal
),
30 * DURATION.SECONDS,
controller
);
if (!ret || !ret?.[0].body || ret[0].code !== 200) {

@ -110,7 +110,8 @@ async function doUnsignedSnodeBatchRequestNoRetries(
timeoutMs: number,
associatedWith: string | null,
allow401s: boolean,
method: MethodBatchType = 'batch'
method: MethodBatchType = 'batch',
abortSignal: MergedAbortSignal | null
): Promise<NotEmptyArrayOfBatchResults> {
const signedSubRequests = await MessageSender.signSubRequests(unsignedSubRequests);
return BatchRequests.doSnodeBatchRequestNoRetries(
@ -119,7 +120,7 @@ async function doUnsignedSnodeBatchRequestNoRetries(
timeoutMs,
associatedWith,
allow401s,
undefined,
abortSignal || undefined,
method
);
}

@ -152,7 +152,8 @@ async function updateExpiryOnNodesNoRetries(
10 * DURATION.SECONDS,
ourPubKey,
false,
'batch'
'batch',
null
);
if (!result || result.length !== expireRequests.length) {

@ -53,7 +53,8 @@ async function getExpiriesFromNodesNoRetries(
10 * DURATION.SECONDS,
associatedWith,
false,
'batch'
'batch',
null
);
if (!result || result.length !== 1) {

@ -20,7 +20,9 @@ async function getSnodePoolFromSnode(targetNode: Snode): Promise<Array<Snode>> {
targetNode,
10 * DURATION.SECONDS,
null,
false
false,
'batch',
null
);
const firstResult = results[0];

@ -18,14 +18,16 @@ async function requestSnodesForPubkeyWithTargetNodeRetryable(
if (!PubKey.is03Pubkey(pubkey) && !PubKey.is05Pubkey(pubkey)) {
throw new Error('invalid pubkey given for swarmFor');
}
const subrequest = new SwarmForSubRequest(pubkey);
const subRequest = new SwarmForSubRequest(pubkey);
const result = await BatchRequests.doUnsignedSnodeBatchRequestNoRetries(
[subrequest],
[subRequest],
targetNode,
10 * DURATION.SECONDS,
pubkey,
false
false,
'batch',
null
);
if (!result || !result.length) {

@ -41,7 +41,9 @@ async function getSessionIDForOnsName(onsNameCase: string) {
targetNode,
10 * DURATION.SECONDS,
null,
false
false,
'batch',
null
);
const firstResult = results[0];
if (!firstResult || firstResult.code !== 200 || !firstResult.body) {

@ -232,7 +232,9 @@ async function retrieveNextMessagesNoRetries(
// yes this is a long timeout for just messages, but 4s timeouts way to often...
10 * DURATION.SECONDS,
associatedWith,
allow401s
allow401s,
'batch',
null
);
try {
if (!results || !isArray(results) || !results.length) {

@ -3,6 +3,7 @@
import { ConvoVolatileType, GroupPubkeyType, PubkeyType } from 'libsession_util_nodejs';
import { isEmpty, isNil } from 'lodash';
import AbortController from 'abort-controller';
import { Data } from '../../data/data';
import { OpenGroupData } from '../../data/opengroups';
import { ConversationCollection, ConversationModel } from '../../models/conversation';
@ -46,6 +47,8 @@ import { DisappearingMessages } from '../disappearing_messages';
import { StoreGroupRequestFactory } from '../apis/snode_api/factories/StoreGroupRequestFactory';
import { CONVERSATION_PRIORITIES, ConversationTypeEnum } from '../../models/types';
import { NetworkTime } from '../../util/NetworkTime';
import { timeoutWithAbort } from '../utils/Promise';
import { DURATION } from '../constants';
let instance: ConvoController | null;
@ -678,11 +681,17 @@ async function leaveClosedGroup(groupPk: PubkeyType | GroupPubkeyType, fromSyncM
secretKey: group.secretKey,
}
);
const results = await MessageSender.sendEncryptedDataToSnode({
destination: groupPk,
sortedSubRequests: storeRequests,
method: 'sequence',
});
const controller = new AbortController();
const results = await timeoutWithAbort(
MessageSender.sendEncryptedDataToSnode({
destination: groupPk,
sortedSubRequests: storeRequests,
method: 'sequence',
abortSignal: controller.signal,
}),
2 * DURATION.MINUTES,
controller
);
if (results?.[0].code !== 200) {
throw new Error(

@ -56,6 +56,7 @@ import { EncryptAndWrapMessageResults, MessageWrapper } from './MessageWrapper';
import { stringify } from '../../types/sqlSharedTypes';
import { OpenGroupRequestCommonType } from '../../data/types';
import { NetworkTime } from '../../util/NetworkTime';
import { MergedAbortSignal } from '../apis/snode_api/requestWith';
// ================ SNODE STORE ================
@ -293,7 +294,9 @@ async function sendSingleMessage({
targetNode,
10 * DURATION.SECONDS,
destination,
false
false,
'sequence',
null
);
await handleBatchResultWithSubRequests({ batchResult, subRequests, destination });
@ -414,10 +417,12 @@ async function sendMessagesDataToSnode<T extends PubkeyType | GroupPubkeyType>({
associatedWith,
sortedSubRequests,
method,
abortSignal,
}: {
sortedSubRequests: SortedSubRequestsType<T>;
associatedWith: T;
method: MethodBatchType;
abortSignal: MergedAbortSignal | null;
}): Promise<NotEmptyArrayOfBatchResults> {
if (!associatedWith) {
throw new Error('sendMessagesDataToSnode first sub request pubkey needs to be set');
@ -442,7 +447,8 @@ async function sendMessagesDataToSnode<T extends PubkeyType | GroupPubkeyType>({
6000,
associatedWith,
false,
method
method,
abortSignal
);
if (!responses || !responses.length) {
@ -500,10 +506,12 @@ async function sendEncryptedDataToSnode<T extends GroupPubkeyType | PubkeyType>(
destination,
sortedSubRequests,
method,
abortSignal,
}: {
sortedSubRequests: SortedSubRequestsType<T>; // keeping those as an array because the order needs to be enforced for some (group keys for instance)
destination: T;
method: MethodBatchType;
abortSignal: MergedAbortSignal | null;
}): Promise<NotEmptyArrayOfBatchResults | null> {
try {
const batchResults = await pRetry(
@ -512,6 +520,7 @@ async function sendEncryptedDataToSnode<T extends GroupPubkeyType | PubkeyType>(
sortedSubRequests,
associatedWith: destination,
method,
abortSignal,
});
},
{

@ -2,6 +2,7 @@
/* eslint-disable no-async-promise-executor */
/* eslint-disable @typescript-eslint/no-misused-promises */
import AbortController from 'abort-controller';
import { Snode } from '../../data/types';
type SimpleFunction<T> = (arg: T) => void;
@ -204,6 +205,26 @@ export async function timeout<T>(promise: Promise<T>, timeoutMs: number): Promis
return Promise.race([timeoutPromise, promise]);
}
/**
* Similar to timeout<T>, but will also call controller.abort() when we timeout.
* This can be used to make a request and if it takes longer than X ms, abort it and return the current aborted promise.
*/
export async function timeoutWithAbort<T>(
promise: Promise<T>,
timeoutMs: number,
controller: AbortController
): Promise<T> {
const timeoutPromise = new Promise<T>((_, rej) => {
const wait = setTimeout(() => {
clearTimeout(wait);
controller.abort();
rej(new TaskTimedOutError());
}, timeoutMs);
});
return Promise.race([timeoutPromise, promise]);
}
export const sleepFor = async (ms: number, showLog = false) => {
if (showLog) {
// eslint-disable-next-line no-console

@ -2,6 +2,7 @@
import { WithGroupPubkey } from 'libsession_util_nodejs';
import { compact, isEmpty, isNumber } from 'lodash';
import { v4 } from 'uuid';
import AbortController from 'abort-controller';
import { StringUtils } from '../..';
import { Data } from '../../../../data/data';
import { deleteMessagesFromSwarmOnly } from '../../../../interactions/conversations/unsendingInteractions';
@ -37,6 +38,7 @@ import {
} from '../../../types/with';
import { groupInfoActions } from '../../../../state/ducks/metaGroups';
import { DURATION } from '../../../constants';
import { timeoutWithAbort } from '../../Promise';
const defaultMsBetweenRetries = 10000;
const defaultMaxAttempts = 1;
@ -186,11 +188,18 @@ class GroupPendingRemovalsJob extends PersistedJob<GroupPendingRemovalsPersisted
}
const sortedSubRequests = compact([multiEncryptRequest, ...revokeRequests, ...storeRequests]);
const result = await MessageSender.sendEncryptedDataToSnode({
sortedSubRequests,
destination: groupPk,
method: 'sequence',
});
const controller = new AbortController();
const result = await timeoutWithAbort(
MessageSender.sendEncryptedDataToSnode({
sortedSubRequests,
destination: groupPk,
method: 'sequence',
abortSignal: controller.signal,
}),
2 * DURATION.MINUTES,
controller
);
if (
!result ||

@ -2,6 +2,7 @@
import { GroupPubkeyType, WithGroupPubkey } from 'libsession_util_nodejs';
import { to_hex } from 'libsodium-wrappers-sumo';
import { compact, isArray, isEmpty, isNumber } from 'lodash';
import AbortController from 'abort-controller';
import { UserUtils } from '../..';
import { assertUnreachable } from '../../../../types/sqlSharedTypes';
import { isSignInByLinking } from '../../../../util/storage';
@ -24,7 +25,7 @@ import { WithRevokeSubRequest } from '../../../apis/snode_api/types';
import { ConvoHub } from '../../../conversations';
import { MessageSender } from '../../../sending/MessageSender';
import { PubKey } from '../../../types';
import { allowOnlyOneAtATime } from '../../Promise';
import { allowOnlyOneAtATime, timeoutWithAbort } from '../../Promise';
import { ed25519Str } from '../../String';
import { GroupSuccessfulChange, LibSessionUtil } from '../../libsession/libsession_utils';
import { runners } from '../JobRunner';
@ -34,6 +35,7 @@ import {
PersistedJob,
RunJobResult,
} from '../PersistedJob';
import { DURATION } from '../../../constants';
const defaultMsBetweenRetries = 15000; // a long time between retries, to avoid running multiple jobs at the same time, when one was postponed at the same time as one already planned (5s)
const defaultMaxAttempts = 2;
@ -156,14 +158,21 @@ async function pushChangesToGroupSwarmIfNeeded({
...extraRequests,
]);
const result = await MessageSender.sendEncryptedDataToSnode({
// Note: this is on purpose that supplementalKeysSubRequest is before pendingConfigRequests.
// This is to avoid a race condition where a device is polling while we
// are posting the configs (already encrypted with the new keys)
sortedSubRequests,
destination: groupPk,
method: 'sequence',
});
const controller = new AbortController();
const result = await timeoutWithAbort(
MessageSender.sendEncryptedDataToSnode({
// Note: this is on purpose that supplementalKeysSubRequest is before pendingConfigRequests.
// This is to avoid a race condition where a device is polling while we
// are posting the configs (already encrypted with the new keys)
sortedSubRequests,
destination: groupPk,
method: 'sequence',
abortSignal: controller.signal,
}),
2 * DURATION.MINUTES,
controller
);
const expectedReplyLength =
(supplementalKeysSubRequest ? 1 : 0) + // we are sending all the supplemental keys as a single sub request

@ -3,6 +3,7 @@ import { PubkeyType } from 'libsession_util_nodejs';
import { compact, isArray, isEmpty, isNumber, isString } from 'lodash';
import { v4 } from 'uuid';
import { to_hex } from 'libsodium-wrappers-sumo';
import AbortController from 'abort-controller';
import { UserUtils } from '../..';
import { ConfigDumpData } from '../../../../data/configDump/configDump';
import { UserSyncJobDone } from '../../../../shims/events';
@ -15,7 +16,7 @@ import {
import { DURATION, TTL_DEFAULT } from '../../../constants';
import { ConvoHub } from '../../../conversations';
import { MessageSender } from '../../../sending/MessageSender';
import { allowOnlyOneAtATime } from '../../Promise';
import { allowOnlyOneAtATime, timeoutWithAbort } from '../../Promise';
import { LibSessionUtil, UserSuccessfulChange } from '../../libsession/libsession_utils';
import { runners } from '../JobRunner';
import {
@ -115,11 +116,18 @@ async function pushChangesToUserSwarmIfNeeded() {
})
: undefined;
const result = await MessageSender.sendEncryptedDataToSnode({
sortedSubRequests: compact([...storeRequests, deleteHashesSubRequest]),
destination: us,
method: 'sequence',
});
const controller = new AbortController();
const result = await timeoutWithAbort(
MessageSender.sendEncryptedDataToSnode({
sortedSubRequests: compact([...storeRequests, deleteHashesSubRequest]),
destination: us,
method: 'sequence',
abortSignal: controller.signal,
}),
2 * DURATION.MINUTES,
controller
);
const expectedReplyLength =
changesToPush.messages.length + (changesToPush.allOldHashes.size ? 1 : 0);

Loading…
Cancel
Save