From f86b3689ba8fd8ff1b6096dab8e93a95af184126 Mon Sep 17 00:00:00 2001 From: Audric Ackermann Date: Tue, 12 Sep 2023 15:13:05 +1000 Subject: [PATCH] fix: add GroupSyncJob to push changes for group --- ts/mains/main_renderer.tsx | 2 + ts/session/apis/snode_api/namespaces.ts | 23 +++--- ts/session/apis/snode_api/swarmPolling.ts | 2 +- ts/session/sending/MessageSender.ts | 15 ++-- .../utils/job_runners/jobs/GroupConfigJob.ts | 79 +++++++------------ .../utils/libsession/libsession_utils.ts | 68 +++++++++++----- ts/state/ducks/groupInfos.ts | 3 + 7 files changed, 106 insertions(+), 86 deletions(-) diff --git a/ts/mains/main_renderer.tsx b/ts/mains/main_renderer.tsx index ba2a7b516..03b15a90a 100644 --- a/ts/mains/main_renderer.tsx +++ b/ts/mains/main_renderer.tsx @@ -116,6 +116,8 @@ async function startJobRunners() { runners.avatarDownloadRunner.startProcessing(); await runners.configurationSyncRunner.loadJobsFromDb(); runners.configurationSyncRunner.startProcessing(); + await runners.groupSyncRunner.loadJobsFromDb(); + runners.groupSyncRunner.startProcessing(); } // We need this 'first' check because we don't want to start the app up any other time diff --git a/ts/session/apis/snode_api/namespaces.ts b/ts/session/apis/snode_api/namespaces.ts index db099c1cd..1c9ac770e 100644 --- a/ts/session/apis/snode_api/namespaces.ts +++ b/ts/session/apis/snode_api/namespaces.ts @@ -34,17 +34,22 @@ export enum SnodeNamespaces { /** - * This is the namespace used to sync the closed group details for each closed group + * This is the namespace used to sync the messages for each closed group */ - ClosedGroupInfo = 11, + ClosedGroupMessages = 11, + + /** + * This is the namespace used to sync the keys for each closed group + */ + ClosedGroupKeys = 12, /** - * This is the namespace used to sync the members for each closed group + * This is the namespace used to sync the closed group details for each closed group */ - ClosedGroupMembers = 12, + ClosedGroupInfo = 13, /** - * This is the namespace used to sync the keys for each closed group + * This is the namespace used to sync the members for each closed group */ - ClosedGroupKeys = 13, + ClosedGroupMembers = 14, } export type SnodeNamespacesGroup = 
PickEnum< @@ -78,6 +83,7 @@ function isUserConfigNamespace(namespace: SnodeNamespaces) { case SnodeNamespaces.ClosedGroupInfo: case SnodeNamespaces.ClosedGroupKeys: case SnodeNamespaces.ClosedGroupMembers: + case SnodeNamespaces.ClosedGroupMessages: case SnodeNamespaces.LegacyClosedGroup: return false; @@ -99,6 +105,7 @@ function isGroupConfigNamespace(namespace: SnodeNamespaces) { case SnodeNamespaces.UserGroups: case SnodeNamespaces.ConvoInfoVolatile: case SnodeNamespaces.LegacyClosedGroup: + case SnodeNamespaces.ClosedGroupMessages: return false; case SnodeNamespaces.ClosedGroupInfo: case SnodeNamespaces.ClosedGroupKeys: @@ -119,20 +126,18 @@ function isGroupConfigNamespace(namespace: SnodeNamespaces) { function namespacePriority(namespace: SnodeNamespaces): number { switch (namespace) { case SnodeNamespaces.Default: + case SnodeNamespaces.ClosedGroupMessages: return 10; - case SnodeNamespaces.UserContacts: - return 1; - case SnodeNamespaces.UserProfile: - return 1; case SnodeNamespaces.UserGroups: - return 1; case SnodeNamespaces.ConvoInfoVolatile: + case SnodeNamespaces.UserProfile: + case SnodeNamespaces.UserContacts: return 1; case SnodeNamespaces.LegacyClosedGroup: case SnodeNamespaces.ClosedGroupInfo: case SnodeNamespaces.ClosedGroupMembers: case SnodeNamespaces.ClosedGroupKeys: - return 10; + return 1; default: try { diff --git a/ts/session/apis/snode_api/swarmPolling.ts b/ts/session/apis/snode_api/swarmPolling.ts index 16b1214a0..8cb7b46e9 100644 --- a/ts/session/apis/snode_api/swarmPolling.ts +++ b/ts/session/apis/snode_api/swarmPolling.ts @@ -189,9 +189,9 @@ export class SwarmPolling { if (isV3) { return this.pollOnceForKey(key, ConversationTypeEnum.GROUPV3, [ SnodeNamespaces.Default, - SnodeNamespaces.ClosedGroupKeys, SnodeNamespaces.ClosedGroupInfo, SnodeNamespaces.ClosedGroupMembers, + SnodeNamespaces.ClosedGroupKeys, // keys are fetched last to avoid race conditions when someone deposits them ]); } return this.pollOnceForKey(key, 
ConversationTypeEnum.GROUP, [ diff --git a/ts/session/sending/MessageSender.ts b/ts/session/sending/MessageSender.ts index abeb5a4e8..1f4cbed11 100644 --- a/ts/session/sending/MessageSender.ts +++ b/ts/session/sending/MessageSender.ts @@ -2,6 +2,7 @@ import { AbortController } from 'abort-controller'; import ByteBuffer from 'bytebuffer'; +import { GroupPubkeyType } from 'libsession_util_nodejs'; import _, { isEmpty, isNil, isString, sample, toNumber } from 'lodash'; import pRetry from 'p-retry'; import { Data } from '../../data/data'; @@ -12,9 +13,6 @@ import { sendMessageOnionV4BlindedRequest, sendSogsMessageOnionV4, } from '../apis/open_group_api/sogsv3/sogsV3SendMessage'; -import { GetNetworkTime } from '../apis/snode_api/getNetworkTime'; -import { SnodeNamespace, SnodeNamespaces } from '../apis/snode_api/namespaces'; -import { getSwarmFor } from '../apis/snode_api/snodePool'; import { NotEmptyArrayOfBatchResults, StoreOnNodeData, @@ -22,6 +20,9 @@ import { StoreOnNodeParams, StoreOnNodeParamsNoSig, } from '../apis/snode_api/SnodeRequestTypes'; +import { GetNetworkTime } from '../apis/snode_api/getNetworkTime'; +import { SnodeNamespace, SnodeNamespaces } from '../apis/snode_api/namespaces'; +import { getSwarmFor } from '../apis/snode_api/snodePool'; import { SnodeSignature, SnodeSignatureResult } from '../apis/snode_api/snodeSignatures'; import { SnodeAPIStore } from '../apis/snode_api/storeMessage'; import { getConversationController } from '../conversations'; @@ -29,17 +30,15 @@ import { MessageEncrypter } from '../crypto'; import { addMessagePadding } from '../crypto/BufferPadding'; import { ContentMessage } from '../messages/outgoing'; import { ConfigurationMessage } from '../messages/outgoing/controlMessage/ConfigurationMessage'; -import { ClosedGroupNewMessage } from '../messages/outgoing/controlMessage/group/ClosedGroupNewMessage'; import { SharedConfigMessage } from '../messages/outgoing/controlMessage/SharedConfigMessage'; import { UnsendMessage } from 
'../messages/outgoing/controlMessage/UnsendMessage'; +import { ClosedGroupNewMessage } from '../messages/outgoing/controlMessage/group/ClosedGroupNewMessage'; import { OpenGroupVisibleMessage } from '../messages/outgoing/visibleMessage/OpenGroupVisibleMessage'; import { ed25519Str } from '../onions/onionPath'; import { PubKey } from '../types'; import { RawMessage } from '../types/RawMessage'; -import { EmptySwarmError } from '../utils/errors'; import { fromUInt8ArrayToBase64 } from '../utils/String'; -import { GroupPubkeyType } from 'libsession_util_nodejs'; -import { to_base64 } from 'libsodium-wrappers-sumo'; +import { EmptySwarmError } from '../utils/errors'; // ================ SNODE STORE ================ @@ -468,7 +467,7 @@ async function sendEncryptedDataToSnode( return MessageSender.sendMessagesDataToSnode( encryptedData.map(content => ({ pubkey: destination, - data64: to_base64(content.data), + data64: ByteBuffer.wrap(content.data).toString('base64'), ttl: content.ttl, timestamp: content.networkTimestamp, namespace: content.namespace, diff --git a/ts/session/utils/job_runners/jobs/GroupConfigJob.ts b/ts/session/utils/job_runners/jobs/GroupConfigJob.ts index 5959fa51e..3eab28c95 100644 --- a/ts/session/utils/job_runners/jobs/GroupConfigJob.ts +++ b/ts/session/utils/job_runners/jobs/GroupConfigJob.ts @@ -1,6 +1,6 @@ /* eslint-disable no-await-in-loop */ import { GroupPubkeyType } from 'libsession_util_nodejs'; -import { compact, isArray, isEmpty, isNumber, isString } from 'lodash'; +import { isArray, isEmpty, isNumber, isString } from 'lodash'; import { UserUtils } from '../..'; import { ConfigDumpData } from '../../../../data/configDump/configDump'; import { ReleasedFeatures } from '../../../../util/releaseFeature'; @@ -17,7 +17,11 @@ import { getConversationController } from '../../../conversations'; import { MessageSender } from '../../../sending/MessageSender'; import { PubKey } from '../../../types'; import { allowOnlyOneAtATime } from '../../Promise'; 
-import { LibSessionUtil, PendingChangesForGroup } from '../../libsession/libsession_utils'; +import { + GroupSingleDestinationChanges, + LibSessionUtil, + PendingChangesForGroup, +} from '../../libsession/libsession_utils'; import { runners } from '../JobRunner'; import { AddJobCheckReturn, @@ -33,42 +37,13 @@ const defaultMaxAttempts = 2; * We want to run each of those jobs at least 3seconds apart. * So every time one of that job finishes, update this timestamp, so we know when adding a new job, what is the next minimun date to run it. */ -let lastRunConfigSyncJobTimestamp: number | null = null; - -type GroupSingleDestinationChanges = { - messages: Array; - allOldHashes: Array; -}; +const lastRunConfigSyncJobTimestamps = new Map(); type SuccessfulChange = { pushed: PendingChangesForGroup; updatedHash: string; }; -/** - * Later in the syncing logic, we want to batch-send all the updates for a pubkey in a single batch call. - * To make this easier, this function prebuilds and merges together all the changes for each pubkey. - */ -async function retrieveGroupSingleDestinationChanges( - groupPk: GroupPubkeyType -): Promise { - const outgoingConfResults = await LibSessionUtil.pendingChangesForGroup(groupPk); - - const compactedHashes = compact([...outgoingConfResults].map(m => m[1].oldMessageHashes)).flat(); - const sortedMessagesKeyFirst = compact( - [...outgoingConfResults.keys()] - .sort((a, b) => { - if (a === 'GroupKeys') return -1; - if (b === 'GroupKeys') return 1; - return 0; - }) - .map(key => { - return outgoingConfResults.get(key); - }) - ); - return { messages: sortedMessagesKeyFirst, allOldHashes: compactedHashes }; -} - /** * This function is run once we get the results from the multiple batch-send. 
*/ @@ -94,7 +69,10 @@ function resultsToSuccessfulChange( for (let j = 0; j < result.length; j++) { const batchResult = result[j]; const messagePostedHashes = batchResult?.body?.hash; - console.warn('messagePostedHashes', messagePostedHashes); + console.error( + 'this might be wrong as the key message is first now messagePostedHashes', + messagePostedHashes + ); if (batchResult.code === 200 && isString(messagePostedHashes) && request.messages?.[j].data) { // libsession keeps track of the hashes to push and pushed using the hashes now @@ -116,6 +94,7 @@ async function buildAndSaveDumpsToDB( groupPk, { groupInfo: null, groupKeys: null, groupMember: null }, ]; + debugger; for (let i = 0; i < changes.length; i++) { const change = changes[i]; @@ -212,7 +191,7 @@ class GroupSyncJob extends PersistedJob { if (!newGroupsReleased) { return RunJobResult.Success; } - const singleDestChanges = await retrieveGroupSingleDestinationChanges(thisJobDestination); + const singleDestChanges = await LibSessionUtil.pendingChangesForGroup(thisJobDestination); // If there are no pending changes then the job can just complete (next time something // is updated we want to try and run immediately so don't scuedule another run in this case) @@ -287,7 +266,7 @@ class GroupSyncJob extends PersistedJob { } private updateLastTickTimestamp() { - lastRunConfigSyncJobTimestamp = Date.now(); + lastRunConfigSyncJobTimestamps.set(this.persistedData.identifier, Date.now()); } } @@ -301,6 +280,7 @@ async function queueNewJobIfNeeded(groupPk: GroupPubkeyType) { return; } + const lastRunConfigSyncJobTimestamp = lastRunConfigSyncJobTimestamps.get(groupPk); if ( !lastRunConfigSyncJobTimestamp || lastRunConfigSyncJobTimestamp < Date.now() - defaultMsBetweenRetries @@ -311,24 +291,25 @@ async function queueNewJobIfNeeded(groupPk: GroupPubkeyType) { await runners.groupSyncRunner.addJob( new GroupSyncJob({ identifier: groupPk, nextAttemptTimestamp: Date.now() + 1000 }) ); - } else { - // if we did run at t=100, 
and it is currently t=110, the difference is 10 - const diff = Math.max(Date.now() - lastRunConfigSyncJobTimestamp, 0); - // but we want to run every 30, so what we need is actually `30-10` from now = 20 - const leftBeforeNextTick = Math.max(defaultMsBetweenRetries - diff, 1000); - // window.log.debug('Scheduling GroupSyncJob: LATER'); - - await runners.groupSyncRunner.addJob( - new GroupSyncJob({ - identifier: groupPk, - nextAttemptTimestamp: Date.now() + leftBeforeNextTick, - }) - ); + return; } + + // if we did run at t=100, and it is currently t=110, the difference is 10 + const diff = Math.max(Date.now() - lastRunConfigSyncJobTimestamp, 0); + // but we want to run every 30, so what we need is actually `30-10` from now = 20 + const leftBeforeNextTick = Math.max(defaultMsBetweenRetries - diff, 1000); + // window.log.debug('Scheduling GroupSyncJob: LATER'); + + await runners.groupSyncRunner.addJob( + new GroupSyncJob({ + identifier: groupPk, + nextAttemptTimestamp: Date.now() + leftBeforeNextTick, + }) + ); } export const GroupSync = { GroupSyncJob, queueNewJobIfNeeded: (groupPk: GroupPubkeyType) => - allowOnlyOneAtATime('GroupSyncJob-oneAtAtTime' + groupPk, () => queueNewJobIfNeeded(groupPk)), + allowOnlyOneAtATime(`GroupSyncJob-oneAtAtTime-${groupPk}`, () => queueNewJobIfNeeded(groupPk)), }; diff --git a/ts/session/utils/libsession/libsession_utils.ts b/ts/session/utils/libsession/libsession_utils.ts index ded070533..175781e80 100644 --- a/ts/session/utils/libsession/libsession_utils.ts +++ b/ts/session/utils/libsession/libsession_utils.ts @@ -2,12 +2,13 @@ /* eslint-disable import/extensions */ /* eslint-disable import/no-unresolved */ import { GroupPubkeyType } from 'libsession_util_nodejs'; -import { difference, omit } from 'lodash'; +import { compact, difference, omit } from 'lodash'; import Long from 'long'; import { UserUtils } from '..'; import { ConfigDumpData } from '../../../data/configDump/configDump'; import { HexString } from 
'../../../node/hexStrings'; import { SignalService } from '../../../protobuf'; +import { UserConfigKind } from '../../../types/ProtobufKind'; import { assertUnreachable, toFixedUint8ArrayOfLength } from '../../../types/sqlSharedTypes'; import { ConfigWrapperGroupDetailed, @@ -27,10 +28,10 @@ import { SharedConfigMessage, SharedUserConfigMessage, } from '../../messages/outgoing/controlMessage/SharedConfigMessage'; +import { ed25519Str } from '../../onions/onionPath'; import { PubKey } from '../../types'; import { getUserED25519KeyPairBytes } from '../User'; import { ConfigurationSync } from '../job_runners/jobs/ConfigurationSyncJob'; -import { UserConfigKind } from '../../../types/ProtobufKind'; const requiredUserVariants: Array = [ 'UserConfig', @@ -198,57 +199,86 @@ async function pendingChangesForUs(): Promise< return results; } -export type PendingChangesForGroup = { +type PendingChangesForGroupShared = { data: Uint8Array; seqno: Long; timestamp: number; - oldMessageHashes: Array; namespace: SnodeNamespaces; }; +type PendingChangesForGroupNonKey = PendingChangesForGroupShared & { + type: Extract; +}; + +type PendingChangesForGroupKey = Pick< + PendingChangesForGroupShared, + 'data' | 'namespace' | 'timestamp' +> & { type: Extract }; + +export type PendingChangesForGroup = PendingChangesForGroupNonKey | PendingChangesForGroupKey; + +export type GroupSingleDestinationChanges = { + messages: Array; + allOldHashes: Set; +}; + async function pendingChangesForGroup( groupPk: GroupPubkeyType -): Promise> { - const results: Map = new Map(); - const variantsNeedingPush = new Set(); - +): Promise { + const results = new Array(); if (!PubKey.isClosedGroupV3(groupPk)) { throw new Error(`pendingChangesForGroup only works for user or 03 group pubkeys`); } - // one of the wrapper behind the metagroup needs a push const needsPush = await MetaGroupWrapperActions.needsPush(groupPk); // we probably need to add the GROUP_KEYS check here if (!needsPush) { - return results; + return 
{ messages: results, allOldHashes: new Set() }; + } + + const { groupInfo, groupMember, groupKeys } = await MetaGroupWrapperActions.push(groupPk); + debugger; + + // Note: We need the keys to be pushed first to avoid a race condition + if (groupKeys) { + results.push({ + type: 'GroupKeys', + data: groupKeys.data, + namespace: groupKeys.namespace, + timestamp: GetNetworkTime.getNowWithNetworkOffset(), + }); } - const { groupInfo, groupMember } = await MetaGroupWrapperActions.push(groupPk); if (groupInfo) { - variantsNeedingPush.add('GroupInfo'); - results.set('GroupInfo', { + results.push({ + type: 'GroupInfo', data: groupInfo.data, seqno: Long.fromNumber(groupInfo.seqno), timestamp: GetNetworkTime.getNowWithNetworkOffset(), - oldMessageHashes: groupInfo.hashes, namespace: groupInfo.namespace, }); } if (groupMember) { - variantsNeedingPush.add('GroupMember'); - results.set('GroupMember', { + results.push({ + type: 'GroupMember', data: groupMember.data, seqno: Long.fromNumber(groupMember.seqno), timestamp: GetNetworkTime.getNowWithNetworkOffset(), - oldMessageHashes: groupMember.hashes, namespace: groupMember.namespace, }); } - window.log.info(`those variants needs push: "${[...variantsNeedingPush]}"`); + window.log.debug( + `${ed25519Str(groupPk)} those group variants needs push: "${results.map(m => m.type)}"` + ); - return results; + const memberHashes = compact(groupMember?.hashes) || []; + const infoHashes = compact(groupInfo?.hashes) || []; + const allOldHashes = new Set([...infoHashes, ...memberHashes]); + + console.error('compactedHashes', [...allOldHashes]); + return { messages: results, allOldHashes }; } function userKindToVariant(kind: UserConfigKind): ConfigWrapperUser { diff --git a/ts/state/ducks/groupInfos.ts b/ts/state/ducks/groupInfos.ts index 422cec21b..31c2b8333 100644 --- a/ts/state/ducks/groupInfos.ts +++ b/ts/state/ducks/groupInfos.ts @@ -11,6 +11,7 @@ import { } from '../../webworker/workers/browser/libsession_worker_interface'; import { 
toFixedUint8ArrayOfLength } from '../../types/sqlSharedTypes'; import { uniq } from 'lodash'; +import { GroupSync } from '../../session/utils/job_runners/jobs/GroupConfigJob'; type GroupInfoGetWithId = GroupInfoGet & { id: GroupPubkeyType }; @@ -76,6 +77,8 @@ const initNewGroupInfoInWrapper = createAsyncThunk( console.warn('store the v3 identityPrivatekeypair as part of the wrapper only?'); + await GroupSync.queueNewJobIfNeeded(newGroup.pubkeyHex); + const us = UserUtils.getOurPubKeyStrFromCache(); // Ensure the current user is a member and admin const members = uniq([...groupDetails.members, us]);