fix: add GroupSyncJob to push changes for group

pull/2873/head
Audric Ackermann 2 years ago
parent 42913371df
commit f86b3689ba

@ -116,6 +116,8 @@ async function startJobRunners() {
runners.avatarDownloadRunner.startProcessing();
await runners.configurationSyncRunner.loadJobsFromDb();
runners.configurationSyncRunner.startProcessing();
await runners.groupSyncRunner.loadJobsFromDb();
runners.groupSyncRunner.startProcessing();
}
// We need this 'first' check because we don't want to start the app up any other time

@ -34,17 +34,22 @@ export enum SnodeNamespaces {
/**
* This is the namespace used to sync the closed group details for each closed group
*/
ClosedGroupInfo = 11,
ClosedGroupMessages = 11,
/**
* This is the namespace used to sync the closed group details for each closed group
*/
ClosedGroupKeys = 12,
/**
* This is the namespace used to sync the members for each closed group
*/
ClosedGroupMembers = 12,
ClosedGroupInfo = 13,
/**
* This is the namespace used to sync the keys for each closed group
*/
ClosedGroupKeys = 13,
ClosedGroupMembers = 14,
}
export type SnodeNamespacesGroup = PickEnum<
@ -78,6 +83,7 @@ function isUserConfigNamespace(namespace: SnodeNamespaces) {
case SnodeNamespaces.ClosedGroupInfo:
case SnodeNamespaces.ClosedGroupKeys:
case SnodeNamespaces.ClosedGroupMembers:
case SnodeNamespaces.ClosedGroupMessages:
case SnodeNamespaces.LegacyClosedGroup:
return false;
@ -99,6 +105,7 @@ function isGroupConfigNamespace(namespace: SnodeNamespaces) {
case SnodeNamespaces.UserGroups:
case SnodeNamespaces.ConvoInfoVolatile:
case SnodeNamespaces.LegacyClosedGroup:
case SnodeNamespaces.ClosedGroupMessages:
return false;
case SnodeNamespaces.ClosedGroupInfo:
case SnodeNamespaces.ClosedGroupKeys:
@ -119,20 +126,18 @@ function isGroupConfigNamespace(namespace: SnodeNamespaces) {
function namespacePriority(namespace: SnodeNamespaces): number {
switch (namespace) {
case SnodeNamespaces.Default:
case SnodeNamespaces.ClosedGroupMessages:
return 10;
case SnodeNamespaces.UserContacts:
return 1;
case SnodeNamespaces.UserProfile:
return 1;
case SnodeNamespaces.UserGroups:
return 1;
case SnodeNamespaces.ConvoInfoVolatile:
case SnodeNamespaces.UserProfile:
case SnodeNamespaces.UserContacts:
return 1;
case SnodeNamespaces.LegacyClosedGroup:
case SnodeNamespaces.ClosedGroupInfo:
case SnodeNamespaces.ClosedGroupMembers:
case SnodeNamespaces.ClosedGroupKeys:
return 10;
return 1;
default:
try {

@ -189,9 +189,9 @@ export class SwarmPolling {
if (isV3) {
return this.pollOnceForKey(key, ConversationTypeEnum.GROUPV3, [
SnodeNamespaces.Default,
SnodeNamespaces.ClosedGroupKeys,
SnodeNamespaces.ClosedGroupInfo,
SnodeNamespaces.ClosedGroupMembers,
SnodeNamespaces.ClosedGroupKeys, // keys are fetched last to avoid race conditions when someone deposits them
]);
}
return this.pollOnceForKey(key, ConversationTypeEnum.GROUP, [

@ -2,6 +2,7 @@
import { AbortController } from 'abort-controller';
import ByteBuffer from 'bytebuffer';
import { GroupPubkeyType } from 'libsession_util_nodejs';
import _, { isEmpty, isNil, isString, sample, toNumber } from 'lodash';
import pRetry from 'p-retry';
import { Data } from '../../data/data';
@ -12,9 +13,6 @@ import {
sendMessageOnionV4BlindedRequest,
sendSogsMessageOnionV4,
} from '../apis/open_group_api/sogsv3/sogsV3SendMessage';
import { GetNetworkTime } from '../apis/snode_api/getNetworkTime';
import { SnodeNamespace, SnodeNamespaces } from '../apis/snode_api/namespaces';
import { getSwarmFor } from '../apis/snode_api/snodePool';
import {
NotEmptyArrayOfBatchResults,
StoreOnNodeData,
@ -22,6 +20,9 @@ import {
StoreOnNodeParams,
StoreOnNodeParamsNoSig,
} from '../apis/snode_api/SnodeRequestTypes';
import { GetNetworkTime } from '../apis/snode_api/getNetworkTime';
import { SnodeNamespace, SnodeNamespaces } from '../apis/snode_api/namespaces';
import { getSwarmFor } from '../apis/snode_api/snodePool';
import { SnodeSignature, SnodeSignatureResult } from '../apis/snode_api/snodeSignatures';
import { SnodeAPIStore } from '../apis/snode_api/storeMessage';
import { getConversationController } from '../conversations';
@ -29,17 +30,15 @@ import { MessageEncrypter } from '../crypto';
import { addMessagePadding } from '../crypto/BufferPadding';
import { ContentMessage } from '../messages/outgoing';
import { ConfigurationMessage } from '../messages/outgoing/controlMessage/ConfigurationMessage';
import { ClosedGroupNewMessage } from '../messages/outgoing/controlMessage/group/ClosedGroupNewMessage';
import { SharedConfigMessage } from '../messages/outgoing/controlMessage/SharedConfigMessage';
import { UnsendMessage } from '../messages/outgoing/controlMessage/UnsendMessage';
import { ClosedGroupNewMessage } from '../messages/outgoing/controlMessage/group/ClosedGroupNewMessage';
import { OpenGroupVisibleMessage } from '../messages/outgoing/visibleMessage/OpenGroupVisibleMessage';
import { ed25519Str } from '../onions/onionPath';
import { PubKey } from '../types';
import { RawMessage } from '../types/RawMessage';
import { EmptySwarmError } from '../utils/errors';
import { fromUInt8ArrayToBase64 } from '../utils/String';
import { GroupPubkeyType } from 'libsession_util_nodejs';
import { to_base64 } from 'libsodium-wrappers-sumo';
import { EmptySwarmError } from '../utils/errors';
// ================ SNODE STORE ================
@ -468,7 +467,7 @@ async function sendEncryptedDataToSnode(
return MessageSender.sendMessagesDataToSnode(
encryptedData.map(content => ({
pubkey: destination,
data64: to_base64(content.data),
data64: ByteBuffer.wrap(content.data).toString('base64'),
ttl: content.ttl,
timestamp: content.networkTimestamp,
namespace: content.namespace,

@ -1,6 +1,6 @@
/* eslint-disable no-await-in-loop */
import { GroupPubkeyType } from 'libsession_util_nodejs';
import { compact, isArray, isEmpty, isNumber, isString } from 'lodash';
import { isArray, isEmpty, isNumber, isString } from 'lodash';
import { UserUtils } from '../..';
import { ConfigDumpData } from '../../../../data/configDump/configDump';
import { ReleasedFeatures } from '../../../../util/releaseFeature';
@ -17,7 +17,11 @@ import { getConversationController } from '../../../conversations';
import { MessageSender } from '../../../sending/MessageSender';
import { PubKey } from '../../../types';
import { allowOnlyOneAtATime } from '../../Promise';
import { LibSessionUtil, PendingChangesForGroup } from '../../libsession/libsession_utils';
import {
GroupSingleDestinationChanges,
LibSessionUtil,
PendingChangesForGroup,
} from '../../libsession/libsession_utils';
import { runners } from '../JobRunner';
import {
AddJobCheckReturn,
@ -33,42 +37,13 @@ const defaultMaxAttempts = 2;
* We want to run each of those jobs at least 3seconds apart.
* So every time one of that job finishes, update this timestamp, so we know when adding a new job, what is the next minimun date to run it.
*/
let lastRunConfigSyncJobTimestamp: number | null = null;
type GroupSingleDestinationChanges = {
messages: Array<PendingChangesForGroup>;
allOldHashes: Array<string>;
};
const lastRunConfigSyncJobTimestamps = new Map<string, number | null>();
type SuccessfulChange = {
pushed: PendingChangesForGroup;
updatedHash: string;
};
/**
* Later in the syncing logic, we want to batch-send all the updates for a pubkey in a single batch call.
* To make this easier, this function prebuilds and merges together all the changes for each pubkey.
*/
async function retrieveGroupSingleDestinationChanges(
groupPk: GroupPubkeyType
): Promise<GroupSingleDestinationChanges> {
const outgoingConfResults = await LibSessionUtil.pendingChangesForGroup(groupPk);
const compactedHashes = compact([...outgoingConfResults].map(m => m[1].oldMessageHashes)).flat();
const sortedMessagesKeyFirst = compact(
[...outgoingConfResults.keys()]
.sort((a, b) => {
if (a === 'GroupKeys') return -1;
if (b === 'GroupKeys') return 1;
return 0;
})
.map(key => {
return outgoingConfResults.get(key);
})
);
return { messages: sortedMessagesKeyFirst, allOldHashes: compactedHashes };
}
/**
* This function is run once we get the results from the multiple batch-send.
*/
@ -94,7 +69,10 @@ function resultsToSuccessfulChange(
for (let j = 0; j < result.length; j++) {
const batchResult = result[j];
const messagePostedHashes = batchResult?.body?.hash;
console.warn('messagePostedHashes', messagePostedHashes);
console.error(
'this might be wrong as the key message is first now messagePostedHashes',
messagePostedHashes
);
if (batchResult.code === 200 && isString(messagePostedHashes) && request.messages?.[j].data) {
// libsession keeps track of the hashes to push and pushed using the hashes now
@ -116,6 +94,7 @@ async function buildAndSaveDumpsToDB(
groupPk,
{ groupInfo: null, groupKeys: null, groupMember: null },
];
debugger;
for (let i = 0; i < changes.length; i++) {
const change = changes[i];
@ -212,7 +191,7 @@ class GroupSyncJob extends PersistedJob<GroupSyncPersistedData> {
if (!newGroupsReleased) {
return RunJobResult.Success;
}
const singleDestChanges = await retrieveGroupSingleDestinationChanges(thisJobDestination);
const singleDestChanges = await LibSessionUtil.pendingChangesForGroup(thisJobDestination);
// If there are no pending changes then the job can just complete (next time something
// is updated we want to try and run immediately so don't scuedule another run in this case)
@ -287,7 +266,7 @@ class GroupSyncJob extends PersistedJob<GroupSyncPersistedData> {
}
private updateLastTickTimestamp() {
lastRunConfigSyncJobTimestamp = Date.now();
lastRunConfigSyncJobTimestamps.set(this.persistedData.identifier, Date.now());
}
}
@ -301,6 +280,7 @@ async function queueNewJobIfNeeded(groupPk: GroupPubkeyType) {
return;
}
const lastRunConfigSyncJobTimestamp = lastRunConfigSyncJobTimestamps.get(groupPk);
if (
!lastRunConfigSyncJobTimestamp ||
lastRunConfigSyncJobTimestamp < Date.now() - defaultMsBetweenRetries
@ -311,24 +291,25 @@ async function queueNewJobIfNeeded(groupPk: GroupPubkeyType) {
await runners.groupSyncRunner.addJob(
new GroupSyncJob({ identifier: groupPk, nextAttemptTimestamp: Date.now() + 1000 })
);
} else {
// if we did run at t=100, and it is currently t=110, the difference is 10
const diff = Math.max(Date.now() - lastRunConfigSyncJobTimestamp, 0);
// but we want to run every 30, so what we need is actually `30-10` from now = 20
const leftBeforeNextTick = Math.max(defaultMsBetweenRetries - diff, 1000);
// window.log.debug('Scheduling GroupSyncJob: LATER');
await runners.groupSyncRunner.addJob(
new GroupSyncJob({
identifier: groupPk,
nextAttemptTimestamp: Date.now() + leftBeforeNextTick,
})
);
return;
}
// if we did run at t=100, and it is currently t=110, the difference is 10
const diff = Math.max(Date.now() - lastRunConfigSyncJobTimestamp, 0);
// but we want to run every 30, so what we need is actually `30-10` from now = 20
const leftBeforeNextTick = Math.max(defaultMsBetweenRetries - diff, 1000);
// window.log.debug('Scheduling GroupSyncJob: LATER');
await runners.groupSyncRunner.addJob(
new GroupSyncJob({
identifier: groupPk,
nextAttemptTimestamp: Date.now() + leftBeforeNextTick,
})
);
}
export const GroupSync = {
GroupSyncJob,
queueNewJobIfNeeded: (groupPk: GroupPubkeyType) =>
allowOnlyOneAtATime('GroupSyncJob-oneAtAtTime' + groupPk, () => queueNewJobIfNeeded(groupPk)),
allowOnlyOneAtATime(`GroupSyncJob-oneAtAtTime-${groupPk}`, () => queueNewJobIfNeeded(groupPk)),
};

@ -2,12 +2,13 @@
/* eslint-disable import/extensions */
/* eslint-disable import/no-unresolved */
import { GroupPubkeyType } from 'libsession_util_nodejs';
import { difference, omit } from 'lodash';
import { compact, difference, omit } from 'lodash';
import Long from 'long';
import { UserUtils } from '..';
import { ConfigDumpData } from '../../../data/configDump/configDump';
import { HexString } from '../../../node/hexStrings';
import { SignalService } from '../../../protobuf';
import { UserConfigKind } from '../../../types/ProtobufKind';
import { assertUnreachable, toFixedUint8ArrayOfLength } from '../../../types/sqlSharedTypes';
import {
ConfigWrapperGroupDetailed,
@ -27,10 +28,10 @@ import {
SharedConfigMessage,
SharedUserConfigMessage,
} from '../../messages/outgoing/controlMessage/SharedConfigMessage';
import { ed25519Str } from '../../onions/onionPath';
import { PubKey } from '../../types';
import { getUserED25519KeyPairBytes } from '../User';
import { ConfigurationSync } from '../job_runners/jobs/ConfigurationSyncJob';
import { UserConfigKind } from '../../../types/ProtobufKind';
const requiredUserVariants: Array<ConfigWrapperUser> = [
'UserConfig',
@ -198,57 +199,86 @@ async function pendingChangesForUs(): Promise<
return results;
}
export type PendingChangesForGroup = {
type PendingChangesForGroupShared = {
data: Uint8Array;
seqno: Long;
timestamp: number;
oldMessageHashes: Array<string>;
namespace: SnodeNamespaces;
};
type PendingChangesForGroupNonKey = PendingChangesForGroupShared & {
type: Extract<ConfigWrapperGroupDetailed, 'GroupInfo' | 'GroupMember'>;
};
type PendingChangesForGroupKey = Pick<
PendingChangesForGroupShared,
'data' | 'namespace' | 'timestamp'
> & { type: Extract<ConfigWrapperGroupDetailed, 'GroupKeys'> };
export type PendingChangesForGroup = PendingChangesForGroupNonKey | PendingChangesForGroupKey;
export type GroupSingleDestinationChanges = {
messages: Array<PendingChangesForGroup>;
allOldHashes: Set<string>;
};
async function pendingChangesForGroup(
groupPk: GroupPubkeyType
): Promise<Map<ConfigWrapperGroupDetailed, PendingChangesForGroup>> {
const results: Map<ConfigWrapperGroupDetailed, PendingChangesForGroup> = new Map();
const variantsNeedingPush = new Set<ConfigWrapperGroupDetailed>();
): Promise<GroupSingleDestinationChanges> {
const results = new Array<PendingChangesForGroup>();
if (!PubKey.isClosedGroupV3(groupPk)) {
throw new Error(`pendingChangesForGroup only works for user or 03 group pubkeys`);
}
// one of the wrapper behind the metagroup needs a push
const needsPush = await MetaGroupWrapperActions.needsPush(groupPk);
// we probably need to add the GROUP_KEYS check here
if (!needsPush) {
return results;
return { messages: results, allOldHashes: new Set() };
}
const { groupInfo, groupMember, groupKeys } = await MetaGroupWrapperActions.push(groupPk);
debugger;
// Note: We need the keys to be pushed first to avoid a race condition
if (groupKeys) {
results.push({
type: 'GroupKeys',
data: groupKeys.data,
namespace: groupKeys.namespace,
timestamp: GetNetworkTime.getNowWithNetworkOffset(),
});
}
const { groupInfo, groupMember } = await MetaGroupWrapperActions.push(groupPk);
if (groupInfo) {
variantsNeedingPush.add('GroupInfo');
results.set('GroupInfo', {
results.push({
type: 'GroupInfo',
data: groupInfo.data,
seqno: Long.fromNumber(groupInfo.seqno),
timestamp: GetNetworkTime.getNowWithNetworkOffset(),
oldMessageHashes: groupInfo.hashes,
namespace: groupInfo.namespace,
});
}
if (groupMember) {
variantsNeedingPush.add('GroupMember');
results.set('GroupMember', {
results.push({
type: 'GroupMember',
data: groupMember.data,
seqno: Long.fromNumber(groupMember.seqno),
timestamp: GetNetworkTime.getNowWithNetworkOffset(),
oldMessageHashes: groupMember.hashes,
namespace: groupMember.namespace,
});
}
window.log.info(`those variants needs push: "${[...variantsNeedingPush]}"`);
window.log.debug(
`${ed25519Str(groupPk)} those group variants needs push: "${results.map(m => m.type)}"`
);
return results;
const memberHashes = compact(groupMember?.hashes) || [];
const infoHashes = compact(groupInfo?.hashes) || [];
const allOldHashes = new Set([...infoHashes, ...memberHashes]);
console.error('compactedHashes', [...allOldHashes]);
return { messages: results, allOldHashes };
}
function userKindToVariant(kind: UserConfigKind): ConfigWrapperUser {

@ -11,6 +11,7 @@ import {
} from '../../webworker/workers/browser/libsession_worker_interface';
import { toFixedUint8ArrayOfLength } from '../../types/sqlSharedTypes';
import { uniq } from 'lodash';
import { GroupSync } from '../../session/utils/job_runners/jobs/GroupConfigJob';
type GroupInfoGetWithId = GroupInfoGet & { id: GroupPubkeyType };
@ -76,6 +77,8 @@ const initNewGroupInfoInWrapper = createAsyncThunk(
console.warn('store the v3 identityPrivatekeypair as part of the wrapper only?');
await GroupSync.queueNewJobIfNeeded(newGroup.pubkeyHex);
const us = UserUtils.getOurPubKeyStrFromCache();
// Ensure the current user is a member and admin
const members = uniq([...groupDetails.members, us]);

Loading…
Cancel
Save