From c623e2e49e71b2b9c7e2b4272571ef41764ee111 Mon Sep 17 00:00:00 2001
From: Audric Ackermann
Date: Tue, 28 Mar 2023 17:00:05 +1100
Subject: [PATCH] fix: make the ConfSyncJob fetch be per destination

---
 ts/session/sending/MessageSender.ts           |  10 +-
 .../job_runners/jobs/ConfigurationSyncJob.ts  | 146 ++++++++----------
 .../utils/libsession/libsession_utils.ts      |   2 -
 3 files changed, 68 insertions(+), 90 deletions(-)

diff --git a/ts/session/sending/MessageSender.ts b/ts/session/sending/MessageSender.ts
index 372484749..0830d5ca9 100644
--- a/ts/session/sending/MessageSender.ts
+++ b/ts/session/sending/MessageSender.ts
@@ -183,7 +183,7 @@ async function send(
 async function sendMessagesDataToSnode(
   params: Array<StoreOnNodeParams>,
   destination: string,
-  oldMessageHashes: Set<string> | null
+  messagesHashesToDelete: Set<string> | null
 ): Promise<NotEmptyArrayOfBatchResults> {
   const rightDestination = params.filter(m => m.pubkey === destination);
   const swarm = await getSwarmFor(destination);
@@ -215,10 +215,10 @@
   );
 
   const signedDeleteOldHashesRequest =
-    oldMessageHashes && oldMessageHashes.size
+    messagesHashesToDelete && messagesHashesToDelete.size
       ? await SnodeSignature.getSnodeSignatureByHashesParams({
           method: 'delete' as const,
-          messages: [...oldMessageHashes],
+          messages: [...messagesHashesToDelete],
           pubkey: destination,
         })
       : null;
@@ -354,7 +354,7 @@ async function encryptMessagesAndWrap(
 async function sendMessagesToSnode(
   params: Array<StoreOnNodeMessage>,
   destination: string,
-  oldMessageHashes: Set<string> | null
+  messagesHashesToDelete: Set<string> | null
 ): Promise<NotEmptyArrayOfBatchResults | null> {
   try {
     const recipient = PubKey.cast(destination);
@@ -397,7 +397,7 @@
           namespace: wrapped.namespace,
         })),
         recipient.key,
-        oldMessageHashes
+        messagesHashesToDelete
      );
    },
    {
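Note: the rename above is not cosmetic. The old hashes are not merely superseded;
they are deleted in the same snode batch as the store sub-requests. A minimal
TypeScript sketch of that batching rule follows; `StoreRequest` and
`DeleteRequest` are hypothetical stand-ins for illustration, not the real snode
request types from this codebase.

type StoreRequest = { method: 'store'; params: { pubkey: string; data: string } };
type DeleteRequest = {
  method: 'delete';
  params: { pubkey: string; messages: Array<string> };
};

// One batch per destination: every store sub-request, plus a single signed
// delete covering all old hashes across namespaces (only when any exist).
function buildBatch(
  stores: Array<StoreRequest>,
  messagesHashesToDelete: Set<string> | null,
  pubkey: string
): Array<StoreRequest | DeleteRequest> {
  const batch: Array<StoreRequest | DeleteRequest> = [...stores];
  if (messagesHashesToDelete && messagesHashesToDelete.size) {
    batch.push({
      method: 'delete',
      params: { pubkey, messages: [...messagesHashesToDelete] },
    });
  }
  return batch;
}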
diff --git a/ts/session/utils/job_runners/jobs/ConfigurationSyncJob.ts b/ts/session/utils/job_runners/jobs/ConfigurationSyncJob.ts
index 48d1f6494..c107829a3 100644
--- a/ts/session/utils/job_runners/jobs/ConfigurationSyncJob.ts
+++ b/ts/session/utils/job_runners/jobs/ConfigurationSyncJob.ts
@@ -1,4 +1,4 @@
-import { compact, groupBy, isArray, isEmpty, isNumber, isString } from 'lodash';
+import { compact, isArray, isEmpty, isNumber, isString } from 'lodash';
 import { v4 } from 'uuid';
 import { UserUtils } from '../..';
 import { ConfigDumpData } from '../../../../data/configDump/configDump';
@@ -27,14 +27,12 @@ const defaultMaxAttempts = 3;
 let lastRunConfigSyncJobTimestamp: number | null = null;
 
 export type SingleDestinationChanges = {
-  destination: string;
   messages: Array<OutgoingConfResult>;
   allOldHashes: Array<string>;
 };
 
 type SuccessfulChange = {
   message: SharedConfigMessage;
-  publicKey: string;
   updatedHash: string;
 };
 
@@ -42,33 +40,22 @@
  * Later in the syncing logic, we want to batch-send all the updates for a pubkey in a single batch call.
  * To make this easier, this function prebuilds and merges together all the changes for each pubkey.
  */
-async function retrieveSingleDestinationChanges(): Promise<Array<SingleDestinationChanges>> {
-  const outgoingConfResults = await LibSessionUtil.pendingChangesForPubkey(
-    UserUtils.getOurPubKeyStrFromCache()
-  );
+async function retrieveSingleDestinationChanges(
+  destination: string
+): Promise<SingleDestinationChanges> {
+  const outgoingConfResults = await LibSessionUtil.pendingChangesForPubkey(destination);
 
-  const groupedByDestination = groupBy(outgoingConfResults, m => m.destination);
+  const compactedHashes = compact(outgoingConfResults.map(m => m.oldMessageHashes)).flat();
 
-  const singleDestChanges: Array<SingleDestinationChanges> = Object.keys(groupedByDestination).map(
-    destination => {
-      const messages = groupedByDestination[destination];
-      // the delete hashes sub request can be done accross namespaces, so we can do a single one of it with all the hashes to remove (per pubkey)
-      const hashes = compact(messages.map(m => m.oldMessageHashes)).flat();
-
-      return { allOldHashes: hashes, destination, messages };
-    }
-  );
-
-  return singleDestChanges;
+  return { messages: outgoingConfResults, allOldHashes: compactedHashes };
 }
 
 /**
  * This function is run once we get the results from the multiple batch-send.
- * For each results, it checks wha
  */
 function resultsToSuccessfulChange(
-  allResults: Array<PromiseSettledResult<NotEmptyArrayOfBatchResults | null>>,
-  requests: Array<SingleDestinationChanges>
+  result: NotEmptyArrayOfBatchResults | null,
+  request: SingleDestinationChanges
 ): Array<SuccessfulChange> {
   const successfulChanges: Array<SuccessfulChange> = [];
 
@@ -82,58 +69,44 @@
    */
 
   try {
-    for (let i = 0; i < allResults.length; i++) {
-      const result = allResults[i];
-
-      // the batch send was rejected. Let's skip handling those results altogether. Another job will handle the retry logic.
-      if (result.status !== 'fulfilled') {
-        continue;
-      }
-
-      const resultValue = result.value;
-      if (!resultValue) {
-        continue;
-      }
-
-      const request = requests?.[i];
-      if (!result) {
-        continue;
-      }
+    if (!result?.length) {
+      return successfulChanges;
+    }
 
-      for (let j = 0; j < resultValue.length; j++) {
-        const batchResult = resultValue[j];
-        const messagePostedHashes = batchResult?.body?.hash;
-
-        if (
-          batchResult.code === 200 &&
-          isString(messagePostedHashes) &&
-          request.messages?.[j].message &&
-          request.destination
-        ) {
-          // the library keeps track of the hashes to push and pushed using the hashes now
-          successfulChanges.push({
-            publicKey: request.destination,
-            updatedHash: messagePostedHashes,
-            message: request.messages?.[j].message,
-          });
-        }
+    for (let j = 0; j < result.length; j++) {
+      const batchResult = result[j];
+      const messagePostedHashes = batchResult?.body?.hash;
+
+      if (
+        batchResult.code === 200 &&
+        isString(messagePostedHashes) &&
+        request.messages?.[j].message
+      ) {
+        // the library keeps track of the hashes to push and pushed using the hashes now
+        successfulChanges.push({
+          updatedHash: messagePostedHashes,
+          message: request.messages?.[j].message,
+        });
       }
     }
+
+    return successfulChanges;
   } catch (e) {
     throw e;
  }
-
-  return successfulChanges;
 }
 
-async function buildAndSaveDumpsToDB(changes: Array<SuccessfulChange>): Promise<void> {
+async function buildAndSaveDumpsToDB(
+  changes: Array<SuccessfulChange>,
+  destination: string
+): Promise<void> {
   for (let i = 0; i < changes.length; i++) {
     const change = changes[i];
     const variant = LibSessionUtil.kindToVariant(change.message.kind);
 
     const needsDump = await LibSessionUtil.markAsPushed(
       variant,
-      change.publicKey,
+      destination,
       change.message.seqno.toNumber(),
       change.updatedHash
     );
@@ -144,7 +117,7 @@
     const dump = await GenericWrapperActions.dump(variant);
     await ConfigDumpData.saveConfigDump({
       data: dump,
-      publicKey: change.publicKey,
+      publicKey: destination,
       variant,
     });
   }
@@ -207,44 +180,51 @@ class ConfigurationSyncJob extends PersistedJob<ConfigurationSyncPersistedData>
       }
     }
 
-    const singleDestChanges = await retrieveSingleDestinationChanges();
+    // TODO add a way to have a few configuration sync jobs running at the same time, but only a single one per pubkey
+    const thisJobDestination = us;
+
+    const singleDestChanges = await retrieveSingleDestinationChanges(thisJobDestination);
 
     // If there are no pending changes then the job can just complete (next time something
     // is updated we want to try and run immediately so don't scuedule another run in this case)
-
-    if (isEmpty(singleDestChanges)) {
+    if (isEmpty(singleDestChanges?.messages)) {
       return RunJobResult.Success;
     }
-
-    const allResults = await Promise.allSettled(
-      singleDestChanges.map(async dest => {
-        const msgs = dest.messages.map(item => {
-          return {
-            namespace: item.namespace,
-            pubkey: item.destination,
-            timestamp: item.message.timestamp,
-            ttl: item.message.ttl(),
-            message: item.message,
-          };
-        });
-        const asSet = new Set(dest.allOldHashes);
-        return MessageSender.sendMessagesToSnode(msgs, dest.destination, asSet);
-      })
+    const oldHashesToDelete = new Set(singleDestChanges.allOldHashes);
+    const msgs = singleDestChanges.messages.map(item => {
+      return {
+        namespace: item.namespace,
+        pubkey: thisJobDestination,
+        timestamp: item.message.timestamp,
+        ttl: item.message.ttl(),
+        message: item.message,
+      };
+    });
+
+    const result = await MessageSender.sendMessagesToSnode(
+      msgs,
+      thisJobDestination,
+      oldHashesToDelete
     );
 
+    const expectedReplyLength =
+      singleDestChanges.messages.length + (oldHashesToDelete.size ? 1 : 0);
     // we do a sequence call here. If we do not have the right expected number of results, consider it a failure
-    if (!isArray(allResults) || allResults.length !== singleDestChanges.length) {
+    if (!isArray(result) || result.length !== expectedReplyLength) {
+      window.log.info(
+        `ConfigurationSyncJob: unexpected result length: expected ${expectedReplyLength} but got ${result?.length}`
+      );
       return RunJobResult.RetryJobIfPossible;
     }
 
-    const changes = resultsToSuccessfulChange(allResults, singleDestChanges);
+    const changes = resultsToSuccessfulChange(result, singleDestChanges);
     if (isEmpty(changes)) {
       return RunJobResult.RetryJobIfPossible;
    }
     // Now that we have the successful changes, we need to mark them as pushed and
     // generate any config dumps which need to be stored
-    await buildAndSaveDumpsToDB(changes);
+    await buildAndSaveDumpsToDB(changes, thisJobDestination);
     return RunJobResult.Success;
   } catch (e) {
     throw e;
@@ -300,7 +280,7 @@ async function queueNewJobIfNeeded() {
     );
     window.log.debug('Scheduling ConfSyncJob: ASAP');
   } else {
-    // if we did run at t=100, and it is currently t=110, diff is 10
+    // if we did run at t=100, and it is currently t=110, the difference is 10
     const diff = Math.max(Date.now() - lastRunConfigSyncJobTimestamp, 0);
     // but we want to run every 30, so what we need is actually `30-10` from now = 20
     const leftBeforeNextTick = Math.max(defaultMsBetweenRetries - diff, 0);
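Note: the subtle part of the hunk above is the reply-count check. A batch for N
config messages plus one signed delete of the old hashes should come back with
N + 1 sub-request results, and with exactly N when there was nothing to delete.
A minimal TypeScript sketch of that invariant follows; the `BatchResultEntry`
shape and the helper are illustrative assumptions, not the codebase's real
`NotEmptyArrayOfBatchResults` type.

type BatchResultEntry = { code: number; body: Record<string, unknown> };

// Mirrors the expectedReplyLength check in run(): one reply per stored config
// message, plus one extra reply for the delete-hashes sub-request when any
// old hashes were attached to the batch.
function repliesMatchRequest(
  replies: Array<BatchResultEntry> | null,
  messageCount: number,
  oldHashCount: number
): boolean {
  const expected = messageCount + (oldHashCount ? 1 : 0);
  return Array.isArray(replies) && replies.length === expected;
}

// e.g. 3 config messages with 2 old hashes to delete => 4 replies expected:
// repliesMatchRequest(replies, 3, 2) only passes when replies.length === 4.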
diff --git a/ts/session/utils/libsession/libsession_utils.ts b/ts/session/utils/libsession/libsession_utils.ts
index 3d56f9212..8244652d8 100644
--- a/ts/session/utils/libsession/libsession_utils.ts
+++ b/ts/session/utils/libsession/libsession_utils.ts
@@ -35,7 +35,6 @@ export type IncomingConfResult = {
 export type OutgoingConfResult = {
   message: SharedConfigMessage;
   namespace: SnodeNamespaces;
-  destination: string;
   oldMessageHashes: Array<string>;
 };
 
@@ -121,7 +120,6 @@
 async function pendingChangesForPubkey(pubkey: string): Promise<Array<OutgoingConfResult>> {
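Note: the comment corrected in queueNewJobIfNeeded describes a plain debounce.
A standalone TypeScript sketch of that arithmetic follows; the function name is
hypothetical, but `lastRunConfigSyncJobTimestamp` and `defaultMsBetweenRetries`
are the names the patch itself uses.

// If the last run was at t=100 and it is now t=110, we have waited 10 of the
// required 30 units, so the next run belongs 30 - 10 = 20 units from now.
function msUntilNextConfSyncRun(
  lastRunConfigSyncJobTimestamp: number | null,
  defaultMsBetweenRetries: number,
  now: number
): number {
  if (lastRunConfigSyncJobTimestamp === null) {
    return 0; // never ran before: schedule ASAP
  }
  const diff = Math.max(now - lastRunConfigSyncJobTimestamp, 0);
  return Math.max(defaultMsBetweenRetries - diff, 0);
}

// e.g. msUntilNextConfSyncRun(100, 30, 110) === 20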