fix: remove the whole kind logic and use namespace instead

this is because session no longer cares about the configs it receives
and just forwards them to libsession
pull/2873/head
Audric Ackermann 2 years ago
parent 0ef2df801e
commit d9300e67a0
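Before the hunks, a minimal standalone TypeScript sketch of the pattern this commit moves to, under assumed stand-in types (the enum values and the exact item shape are placeholders, not the repo's actual definitions): each incoming config message carries the snode namespace it was retrieved from, and that namespace alone selects the libsession wrapper that merges it.

```ts
// Standalone sketch only; names mirror the diff, values/shapes are illustrative.
enum SnodeNamespaces {
  UserProfile = 'UserProfile',
  UserContacts = 'UserContacts',
  UserGroups = 'UserGroups',
  ConvoInfoVolatile = 'ConvoInfoVolatile',
}

type ConfigWrapperUser =
  | 'UserConfig'
  | 'ContactsConfig'
  | 'UserGroupsConfig'
  | 'ConvoInfoVolatileConfig';

type RetrieveMessageItemWithNamespace = {
  hash: string;
  timestamp: number;
  data: string; // base64 config ciphertext as fetched from the swarm
  namespace: SnodeNamespaces;
};

// Mirrors userNamespaceToVariant from this commit: one namespace maps to one wrapper.
function userNamespaceToVariant(namespace: SnodeNamespaces): ConfigWrapperUser {
  switch (namespace) {
    case SnodeNamespaces.UserProfile:
      return 'UserConfig';
    case SnodeNamespaces.UserContacts:
      return 'ContactsConfig';
    case SnodeNamespaces.UserGroups:
      return 'UserGroupsConfig';
    case SnodeNamespaces.ConvoInfoVolatile:
      return 'ConvoInfoVolatileConfig';
    default:
      throw new Error(`unsupported user config namespace: ${namespace}`);
  }
}

// Mirrors byUserNamespace: group items so each wrapper gets a single merge call.
function byUserNamespace(items: Array<RetrieveMessageItemWithNamespace>) {
  const grouped = new Map<SnodeNamespaces, Array<RetrieveMessageItemWithNamespace>>();
  for (const item of items) {
    const existing = grouped.get(item.namespace);
    if (existing) {
      existing.push(item);
    } else {
      grouped.set(item.namespace, [item]);
    }
  }
  return grouped;
}
```

In the diff itself, byUserNamespace and userNamespaceToVariant fill these roles and replace the removed kind-based helpers (byUserVariant, userKindToVariant, userVariantToUserKind).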

@ -1,4 +1,5 @@
import * as wrappers from 'libsodium-wrappers-sumo';
type LibSodiumWrappers = typeof wrappers;
export async function getSodiumNode() {

@ -1,18 +1,17 @@
/* eslint-disable no-await-in-loop */
import { ContactInfo, UserGroupsGet } from 'libsession_util_nodejs';
import { base64_variants, from_base64 } from 'libsodium-wrappers-sumo';
import { compact, difference, isEmpty, isNil, isNumber, toNumber } from 'lodash';
import { ConfigDumpData } from '../data/configDump/configDump';
import { SettingsKey } from '../data/settings-key';
import { deleteAllMessagesByConvoIdNoConfirmation } from '../interactions/conversationInteractions';
import { CONVERSATION_PRIORITIES, ConversationTypeEnum } from '../models/conversationAttributes';
import { SignalService } from '../protobuf';
import { ClosedGroup } from '../session';
import { getOpenGroupManager } from '../session/apis/open_group_api/opengroupV2/OpenGroupManagerV2';
import { OpenGroupUtils } from '../session/apis/open_group_api/utils';
import { getOpenGroupV2ConversationId } from '../session/apis/open_group_api/utils/OpenGroupUtils';
import { getSwarmPollingInstance } from '../session/apis/snode_api';
import { ConvoHub } from '../session/conversations';
import { IncomingMessage } from '../session/messages/incoming/IncomingMessage';
import { ProfileManager } from '../session/profile_manager/ProfileManager';
import { PubKey } from '../session/types';
import { StringUtils, UserUtils } from '../session/utils';
@ -28,13 +27,20 @@ import { assertUnreachable, stringify, toFixedUint8ArrayOfLength } from '../type
import { BlockedNumberController } from '../util';
import { Storage, setLastProfileUpdateTimestamp } from '../util/storage';
// eslint-disable-next-line import/no-unresolved, import/extensions
import { HexString } from '../node/hexStrings';
import {
SnodeNamespace,
SnodeNamespaces,
UserConfigNamespaces,
} from '../session/apis/snode_api/namespaces';
import { RetrieveMessageItemWithNamespace } from '../session/apis/snode_api/types';
import { groupInfoActions } from '../state/ducks/groups';
import {
ConfigWrapperObjectTypesMeta,
ConfigWrapperUser,
getGroupPubkeyFromWrapperType,
isUserConfigWrapperType,
} from '../webworker/workers/browser/libsession_worker_functions';
import { UserConfigKind, isUserKind } from '../types/ProtobufKind';
import {
ContactsWrapperActions,
ConvoInfoVolatileWrapperActions,
@ -46,87 +52,87 @@ import {
import { addKeyPairToCacheAndDBIfNeeded } from './closedGroups';
import { HexKeyPair } from './keypairs';
import { queueAllCachedFromSource } from './receiver';
import { HexString } from '../node/hexStrings';
import { groupInfoActions } from '../state/ducks/groups';
type IncomingUserResult = {
needsPush: boolean;
needsDump: boolean;
kind: UserConfigKind;
publicKey: string;
latestEnvelopeTimestamp: number;
namespace: UserConfigNamespaces;
};
function byUserVariant(
incomingConfigs: Array<IncomingMessage<SignalService.ISharedConfigMessage>>
) {
function byUserNamespace(incomingConfigs: Array<RetrieveMessageItemWithNamespace>) {
const groupedByVariant: Map<
ConfigWrapperUser,
Array<IncomingMessage<SignalService.ISharedConfigMessage>>
UserConfigNamespaces,
Array<RetrieveMessageItemWithNamespace>
> = new Map();
incomingConfigs.forEach(incomingConfig => {
const { kind } = incomingConfig.message;
if (!isUserKind(kind)) {
throw new Error(`Invalid kind when handling userkinds: ${kind}`);
const { namespace } = incomingConfig;
if (!SnodeNamespace.isUserConfigNamespace(namespace)) {
throw new Error(`Invalid namespace on byUserNamespace: ${namespace}`);
}
const wrapperId = LibSessionUtil.userKindToVariant(kind);
if (!groupedByVariant.has(wrapperId)) {
groupedByVariant.set(wrapperId, []);
if (!groupedByVariant.has(namespace)) {
groupedByVariant.set(namespace, []);
}
groupedByVariant.get(wrapperId)?.push(incomingConfig);
groupedByVariant.get(namespace)?.push(incomingConfig);
});
return groupedByVariant;
}
async function printDumpForDebug(prefix: string, variant: ConfigWrapperObjectTypesMeta) {
if (isUserConfigWrapperType(variant)) {
window.log.info(prefix, StringUtils.toHex(await GenericWrapperActions.dump(variant)));
window.log.info(prefix, StringUtils.toHex(await GenericWrapperActions.makeDump(variant)));
return;
}
const metaGroupDumps = await MetaGroupWrapperActions.metaDebugDump(
const metaGroupDumps = await MetaGroupWrapperActions.metaMakeDump(
getGroupPubkeyFromWrapperType(variant)
);
window.log.info(prefix, StringUtils.toHex(metaGroupDumps));
}
async function mergeUserConfigsWithIncomingUpdates(
incomingConfigs: Array<IncomingMessage<SignalService.ISharedConfigMessage>>
incomingConfigs: Array<RetrieveMessageItemWithNamespace>
): Promise<Map<ConfigWrapperUser, IncomingUserResult>> {
// first, group by variant so we do a single merge call
// Note: this call throws if given a non user kind as this functio should only handle user variants/kinds
const groupedByVariant = byUserVariant(incomingConfigs);
// first, group by namespace so we do a single merge call
// Note: this call throws if given a non-user config namespace, as this function should only handle user namespaces
const groupedByNamespaces = byUserNamespace(incomingConfigs);
const groupedResults: Map<ConfigWrapperUser, IncomingUserResult> = new Map();
const publicKey = UserUtils.getOurPubKeyStrFromCache();
const us = UserUtils.getOurPubKeyStrFromCache();
try {
for (let index = 0; index < groupedByVariant.size; index++) {
const variant = [...groupedByVariant.keys()][index];
const sameVariant = groupedByVariant.get(variant);
for (let index = 0; index < groupedByNamespaces.size; index++) {
const namespace = [...groupedByNamespaces.keys()][index];
const sameVariant = groupedByNamespaces.get(namespace);
if (!sameVariant?.length) {
continue;
}
const toMerge = sameVariant.map(msg => ({
data: msg.message.data,
hash: msg.messageHash,
data: from_base64(msg.data, base64_variants.ORIGINAL),
hash: msg.hash,
}));
const variant = LibSessionUtil.userNamespaceToVariant(namespace);
if (window.sessionFeatureFlags.debug.debugLibsessionDumps) {
await printDumpForDebug(`printDumpsForDebugging: before merge of ${variant}:`, variant);
}
const mergedCount = await GenericWrapperActions.merge(variant, toMerge);
const hashesMerged = await GenericWrapperActions.merge(variant, toMerge);
const needsDump = await GenericWrapperActions.needsDump(variant);
const needsPush = await GenericWrapperActions.needsPush(variant);
const latestEnvelopeTimestamp = Math.max(...sameVariant.map(m => m.envelopeTimestamp));
const mergedTimestamps = sameVariant
.filter(m => hashesMerged.includes(m.hash))
.map(m => m.timestamp);
const latestEnvelopeTimestamp = Math.max(...mergedTimestamps);
window.log.debug(
`${variant}: needsPush:${needsPush} needsDump:${needsDump}; mergedCount:${mergedCount} `
`${variant}: needsPush:${needsPush} needsDump:${needsDump}; mergedCount:${hashesMerged.length} `
);
if (window.sessionFeatureFlags.debug.debugLibsessionDumps) {
@ -135,8 +141,8 @@ async function mergeUserConfigsWithIncomingUpdates(
const incomingConfResult: IncomingUserResult = {
needsDump,
needsPush,
kind: LibSessionUtil.userVariantToUserKind(variant),
publicKey,
publicKey: us,
namespace,
latestEnvelopeTimestamp: latestEnvelopeTimestamp || Date.now(),
};
groupedResults.set(variant, incomingConfResult);
@ -848,29 +854,32 @@ async function processUserMergingResults(results: Map<ConfigWrapperUser, Incomin
}
try {
const { kind } = incomingResult;
switch (kind) {
case SignalService.SharedConfigMessage.Kind.USER_PROFILE:
const { namespace } = incomingResult;
switch (namespace) {
case SnodeNamespaces.UserProfile:
await handleUserProfileUpdate(incomingResult);
break;
case SignalService.SharedConfigMessage.Kind.CONTACTS:
case SnodeNamespaces.UserContacts:
await handleContactsUpdate();
break;
case SignalService.SharedConfigMessage.Kind.USER_GROUPS:
case SnodeNamespaces.UserGroups:
await handleUserGroupsUpdate(incomingResult);
break;
case SignalService.SharedConfigMessage.Kind.CONVO_INFO_VOLATILE:
case SnodeNamespaces.ConvoInfoVolatile:
await handleConvoInfoVolatileUpdate();
break;
default:
try {
// we catch errors here because an old client knowing about a new type of config coming from the network should not just crash
assertUnreachable(kind, `processUserMergingResults unsupported kind: "${kind}"`);
assertUnreachable(
namespace,
`processUserMergingResults unsupported namespace: "${namespace}"`
);
} catch (e) {
window.log.warn('assertUnreachable failed', e.message);
}
}
const variant = LibSessionUtil.userKindToVariant(kind);
const variant = LibSessionUtil.userNamespaceToVariant(namespace);
try {
await updateLibsessionLatestProcessedUserTimestamp(
variant,
@ -906,7 +915,7 @@ async function processUserMergingResults(results: Map<ConfigWrapperUser, Incomin
}
async function handleUserConfigMessagesViaLibSession(
configMessages: Array<IncomingMessage<SignalService.ISharedConfigMessage>>
configMessages: Array<RetrieveMessageItemWithNamespace>
) {
if (isEmpty(configMessages)) {
return;
@ -915,9 +924,8 @@ async function handleUserConfigMessagesViaLibSession(
window?.log?.debug(
`Handling our sharedConfig message via libsession_util ${JSON.stringify(
configMessages.map(m => ({
kind: m.message.kind,
hash: m.messageHash,
seqno: (m.message.seqno as Long).toNumber(),
hash: m.hash,
namespace: m.namespace,
}))
)}`
);

@ -1,5 +1,4 @@
import { GroupPubkeyType } from 'libsession_util_nodejs';
import { SharedUserConfigMessage } from '../../messages/outgoing/controlMessage/SharedConfigMessage';
import { GroupPubkeyType, PubkeyType } from 'libsession_util_nodejs';
import { SnodeNamespaces, SnodeNamespacesGroup } from './namespaces';
export type SwarmForSubRequest = { method: 'get_swarm'; params: { pubkey: string } };
@ -109,15 +108,8 @@ export type DeleteFromNodeWithTimestampParams = {
} & DeleteSigParameters;
export type DeleteByHashesFromNodeParams = { messages: Array<string> } & DeleteSigParameters;
export type StoreOnNodeMessage = {
pubkey: string;
timestamp: number;
namespace: number;
message: SharedUserConfigMessage;
};
export type StoreOnNodeData = {
pubkey: GroupPubkeyType;
pubkey: GroupPubkeyType | PubkeyType;
networkTimestamp: number;
namespace: number;
data: Uint8Array;

@ -76,14 +76,22 @@ export type SnodeNamespacesUser = PickEnum<
SnodeNamespaces.UserContacts | SnodeNamespaces.UserProfile | SnodeNamespaces.Default
>;
export type UserConfigNamespaces = PickEnum<
SnodeNamespaces,
| SnodeNamespaces.UserProfile
| SnodeNamespaces.UserContacts
| SnodeNamespaces.UserGroups
| SnodeNamespaces.ConvoInfoVolatile
>;
/**
* Returns true if that namespace is associated with the config of a user (not their messages, only configs)
*/
// eslint-disable-next-line consistent-return
function isUserConfigNamespace(namespace: SnodeNamespaces) {
function isUserConfigNamespace(namespace: SnodeNamespaces): namespace is UserConfigNamespaces {
switch (namespace) {
case SnodeNamespaces.UserContacts:
case SnodeNamespaces.UserProfile:
case SnodeNamespaces.UserContacts:
case SnodeNamespaces.UserGroups:
case SnodeNamespaces.ConvoInfoVolatile:
return true;

@ -25,7 +25,7 @@ import { ed25519Str } from '../../onions/onionPath';
import { StringUtils, UserUtils } from '../../utils';
import { perfEnd, perfStart } from '../../utils/Performance';
import { LibSessionUtil } from '../../utils/libsession/libsession_utils';
import { SnodeNamespace, SnodeNamespaces } from './namespaces';
import { SnodeNamespace, SnodeNamespaces, UserConfigNamespaces } from './namespaces';
import { PollForGroup, PollForLegacy, PollForUs } from './pollingTypes';
import { SnodeAPIRetrieve } from './retrieveRequest';
import { SwarmPollingGroupConfig } from './swarm_polling_config/SwarmPollingGroupConfig';
@ -579,15 +579,16 @@ export class SwarmPolling {
}
// eslint-disable-next-line consistent-return
public getNamespacesToPollFrom(type: ConversationTypeEnum): Array<SnodeNamespaces> {
public getNamespacesToPollFrom(type: ConversationTypeEnum) {
if (type === ConversationTypeEnum.PRIVATE) {
return [
const toRet: Array<UserConfigNamespaces | SnodeNamespaces.Default> = [
SnodeNamespaces.Default,
SnodeNamespaces.UserProfile,
SnodeNamespaces.UserContacts,
SnodeNamespaces.UserGroups,
SnodeNamespaces.ConvoInfoVolatile,
];
return toRet;
}
if (type === ConversationTypeEnum.GROUP) {
return [SnodeNamespaces.LegacyClosedGroup];

@ -1,64 +0,0 @@
import { compact, toNumber } from 'lodash';
import { RetrieveMessageItem } from '../types';
import { extractWebSocketContent } from '../swarmPolling';
import { SignalService } from '../../../../protobuf';
import { IncomingMessage } from '../../../messages/incoming/IncomingMessage';
import { EnvelopePlus } from '../../../../receiver/types';
function extractWebSocketContents(configMsgs: Array<RetrieveMessageItem>) {
try {
return compact(
configMsgs.map((m: RetrieveMessageItem) => {
return extractWebSocketContent(m.data, m.hash);
})
);
} catch (e) {
window.log.warn('extractWebSocketContents failed with ', e.message);
return [];
}
}
async function decryptSharedConfigMessages(
extractedMsgs: ReturnType<typeof extractWebSocketContents>,
decryptEnvelope: (envelope: EnvelopePlus) => Promise<ArrayBuffer | null>
) {
const allDecryptedConfigMessages: Array<IncomingMessage<SignalService.ISharedConfigMessage>> = [];
for (let index = 0; index < extractedMsgs.length; index++) {
const groupConfigMessage = extractedMsgs[index];
try {
const envelope: EnvelopePlus = SignalService.Envelope.decode(groupConfigMessage.body) as any;
// eslint-disable-next-line no-await-in-loop
const decryptedEnvelope = await decryptEnvelope(envelope);
if (!decryptedEnvelope?.byteLength) {
continue;
}
const content = SignalService.Content.decode(new Uint8Array(decryptedEnvelope));
if (content.sharedConfigMessage) {
const asIncomingMsg: IncomingMessage<SignalService.ISharedConfigMessage> = {
envelopeTimestamp: toNumber(envelope.timestamp),
message: content.sharedConfigMessage,
messageHash: groupConfigMessage.messageHash,
authorOrGroupPubkey: envelope.source,
authorInGroup: envelope.senderIdentity,
};
allDecryptedConfigMessages.push(asIncomingMsg);
} else {
throw new Error(
'received a message to a namespace reserved for user config but not containign a sharedConfigMessage'
);
}
} catch (e) {
window.log.warn(
`failed to decrypt message with hash "${groupConfigMessage.messageHash}": ${e.message}`
);
}
}
return allDecryptedConfigMessages;
}
export const SwarmPollingConfigShared = {
decryptSharedConfigMessages,
extractWebSocketContents,
};

@ -1,31 +1,19 @@
import { ConfigMessageHandler } from '../../../../receiver/configMessage';
import { decryptEnvelopeWithOurKey } from '../../../../receiver/contentMessage';
import { RetrieveMessageItem } from '../types';
import { SwarmPollingConfigShared } from './SwarmPollingConfigShared';
import { RetrieveMessageItemWithNamespace } from '../types';
async function handleUserSharedConfigMessages(
userConfigMessagesMerged: Array<RetrieveMessageItem>
userConfigMessagesMerged: Array<RetrieveMessageItemWithNamespace>
) {
window.log.info(`received userConfigMessagesMerged count: ${userConfigMessagesMerged.length}`);
try {
const extractedUserConfigMessage =
SwarmPollingConfigShared.extractWebSocketContents(userConfigMessagesMerged);
const allDecryptedConfigMessages = await SwarmPollingConfigShared.decryptSharedConfigMessages(
extractedUserConfigMessage,
decryptEnvelopeWithOurKey
);
if (allDecryptedConfigMessages.length) {
if (userConfigMessagesMerged.length) {
try {
window.log.info(
`handleConfigMessagesViaLibSession of "${allDecryptedConfigMessages.length}" messages with libsession`
);
await ConfigMessageHandler.handleUserConfigMessagesViaLibSession(
allDecryptedConfigMessages
`handleConfigMessagesViaLibSession of "${userConfigMessagesMerged.length}" messages with libsession`
);
await ConfigMessageHandler.handleUserConfigMessagesViaLibSession(userConfigMessagesMerged);
} catch (e) {
const allMessageHases = allDecryptedConfigMessages.map(m => m.messageHash).join(',');
const allMessageHases = userConfigMessagesMerged.map(m => m.hash).join(',');
window.log.warn(
`failed to handle messages hashes "${allMessageHases}" with libsession. Error: "${e.message}"`
);

@ -1,59 +0,0 @@
// this is not a very good name, but a configuration message is a message sent to our other devices so sync our current public and closed groups
import Long from 'long';
import { SignalService } from '../../../../protobuf';
import { MessageParams } from '../Message';
import { ContentMessage } from '..';
import { TTL_DEFAULT } from '../../../constants';
import { UserConfigKind } from '../../../../types/ProtobufKind';
interface SharedConfigParams<KindsPicked extends UserConfigKind> extends MessageParams {
seqno: Long;
data: Uint8Array;
kind: KindsPicked;
}
export abstract class SharedConfigMessage<
KindsPicked extends UserConfigKind,
> extends ContentMessage {
public readonly seqno: Long;
public readonly kind: KindsPicked;
public readonly data: Uint8Array;
constructor(params: SharedConfigParams<KindsPicked>) {
super({ timestamp: params.timestamp, identifier: params.identifier });
this.data = params.data;
this.kind = params.kind;
this.seqno = params.seqno;
}
public contentProto(): SignalService.Content {
return new SignalService.Content({
sharedConfigMessage: this.sharedConfigProto(),
});
}
public ttl(): number {
return TTL_DEFAULT.TTL_CONFIG;
}
protected sharedConfigProto(): SignalService.SharedConfigMessage {
return new SignalService.SharedConfigMessage({
data: this.data,
kind: this.kind,
seqno: this.seqno,
});
}
}
export class SharedUserConfigMessage extends SharedConfigMessage<UserConfigKind> {
constructor(params: SharedConfigParams<UserConfigKind>) {
super({
timestamp: params.timestamp,
identifier: params.identifier,
data: params.data,
kind: params.kind,
seqno: params.seqno,
});
}
}

@ -31,7 +31,6 @@ import {
SnodeNamespacesUser,
} from '../apis/snode_api/namespaces';
import { CallMessage } from '../messages/outgoing/controlMessage/CallMessage';
import { SharedConfigMessage } from '../messages/outgoing/controlMessage/SharedConfigMessage';
import { UnsendMessage } from '../messages/outgoing/controlMessage/UnsendMessage';
import { OpenGroupVisibleMessage } from '../messages/outgoing/visibleMessage/OpenGroupVisibleMessage';
@ -234,7 +233,6 @@ export class MessageQueue {
if (
!(message instanceof ConfigurationMessage) &&
!(message instanceof UnsendMessage) &&
!(message instanceof SharedConfigMessage) &&
!(message as any)?.syncTarget
) {
throw new Error('Invalid message given to sendSyncMessage');

@ -2,8 +2,8 @@
import { AbortController } from 'abort-controller';
import ByteBuffer from 'bytebuffer';
import { GroupPubkeyType } from 'libsession_util_nodejs';
import _, { isEmpty, isNil, isString, sample, toNumber } from 'lodash';
import { GroupPubkeyType, PubkeyType } from 'libsession_util_nodejs';
import _, { isEmpty, isNil, sample, toNumber } from 'lodash';
import pRetry from 'p-retry';
import { Data } from '../../data/data';
import { SignalService } from '../../protobuf';
@ -17,7 +17,6 @@ import {
import {
NotEmptyArrayOfBatchResults,
StoreOnNodeData,
StoreOnNodeMessage,
StoreOnNodeParams,
StoreOnNodeParamsNoSig,
} from '../apis/snode_api/SnodeRequestTypes';
@ -31,7 +30,6 @@ import { MessageEncrypter } from '../crypto';
import { addMessagePadding } from '../crypto/BufferPadding';
import { ContentMessage } from '../messages/outgoing';
import { ConfigurationMessage } from '../messages/outgoing/controlMessage/ConfigurationMessage';
import { SharedConfigMessage } from '../messages/outgoing/controlMessage/SharedConfigMessage';
import { UnsendMessage } from '../messages/outgoing/controlMessage/UnsendMessage';
import { ClosedGroupNewMessage } from '../messages/outgoing/controlMessage/group/ClosedGroupNewMessage';
import { OpenGroupVisibleMessage } from '../messages/outgoing/visibleMessage/OpenGroupVisibleMessage';
@ -84,7 +82,6 @@ function isSyncMessage(message: ContentMessage) {
message instanceof ConfigurationMessage ||
message instanceof ClosedGroupNewMessage ||
message instanceof UnsendMessage ||
message instanceof SharedConfigMessage ||
(message as any).syncTarget?.length > 0
) {
return true;
@ -268,7 +265,7 @@ async function sendMessagesDataToSnode(
if (!isEmpty(storeResults)) {
window?.log?.info(
`sendMessagesToSnode - Successfully stored messages to ${ed25519Str(destination)} via ${
`sendMessagesDataToSnode - Successfully stored messages to ${ed25519Str(destination)} via ${
snode.ip
}:${snode.port} on namespaces: ${rightDestination.map(m => m.namespace).join(',')}`
);
@ -278,7 +275,7 @@ async function sendMessagesDataToSnode(
} catch (e) {
const snodeStr = snode ? `${snode.ip}:${snode.port}` : 'null';
window?.log?.warn(
`sendMessagesToSnode - "${e.code}:${e.message}" to ${destination} via snode:${snodeStr}`
`sendMessagesDataToSnode - "${e.code}:${e.message}" to ${destination} via snode:${snodeStr}`
);
throw e;
}
@ -354,110 +351,6 @@ async function encryptMessagesAndWrap(
return Promise.all(messages.map(encryptMessageAndWrap));
}
/**
* Send a list of messages to a single service node.
* Used currently only for sending SharedConfigMessage to multiple messages at a time.
*
* @param params the messages to deposit
* @param destination the pubkey we should deposit those message for
* @returns the hashes of successful deposit
*/
async function sendMessagesToSnode(
params: Array<StoreOnNodeMessage>,
destination: string,
messagesHashesToDelete: Set<string> | null
): Promise<NotEmptyArrayOfBatchResults | null> {
try {
const recipient = PubKey.cast(destination);
const encryptedAndWrapped = await encryptMessagesAndWrap(
params.map(m => ({
destination: m.pubkey,
plainTextBuffer: m.message.plainTextBuffer(),
namespace: m.namespace,
ttl: m.message.ttl(),
identifier: m.message.identifier,
isSyncMessage: MessageSender.isSyncMessage(m.message),
}))
);
// first update all the associated timestamps of our messages in DB, if the outgoing messages are associated with one.
await Promise.all(
encryptedAndWrapped.map(async (m, index) => {
// make sure to update the local sent_at timestamp, because sometimes, we will get the just pushed message in the receiver side
// before we return from the await below.
// and the isDuplicate messages relies on sent_at timestamp to be valid.
const found = await Data.getMessageById(m.identifier);
// make sure to not update the sent timestamp if this a currently syncing message
if (found && !found.get('sentSync')) {
found.set({ sent_at: encryptedAndWrapped[index].networkTimestamp });
await found.commit();
}
})
);
const batchResults = await pRetry(
async () => {
return MessageSender.sendMessagesDataToSnode(
encryptedAndWrapped.map(wrapped => ({
pubkey: recipient.key,
data64: wrapped.data64,
ttl: wrapped.ttl,
timestamp: wrapped.networkTimestamp,
namespace: wrapped.namespace,
})),
recipient.key,
messagesHashesToDelete,
messagesHashesToDelete?.size ? 'sequence' : 'batch'
);
},
{
retries: 2,
factor: 1,
minTimeout: MessageSender.getMinRetryTimeout(),
maxTimeout: 1000,
}
);
if (!batchResults || isEmpty(batchResults)) {
throw new Error('result is empty for sendMessagesToSnode');
}
const isDestinationClosedGroup = ConvoHub.use().get(recipient.key)?.isClosedGroup();
await Promise.all(
encryptedAndWrapped.map(async (message, index) => {
// If message also has a sync message, save that hash. Otherwise save the hash from the regular message send i.e. only closed groups in this case.
if (
message.identifier &&
(message.isSyncMessage || isDestinationClosedGroup) &&
batchResults[index] &&
!isEmpty(batchResults[index]) &&
isString(batchResults[index].body.hash)
) {
const hashFoundInResponse = batchResults[index].body.hash;
const foundMessage = await Data.getMessageById(message.identifier);
if (foundMessage) {
await foundMessage.updateMessageHash(hashFoundInResponse);
await foundMessage.commit();
window?.log?.info(
`updated message ${foundMessage.get('id')} with hash: ${foundMessage.get(
'messageHash'
)}`
);
}
}
})
);
return batchResults;
} catch (e) {
window.log.warn(`sendMessagesToSnode failed with ${e.message}`);
return null;
}
}
/**
* Send an array of preencrypted data to the corresponding swarm.
* Used currently only for sending libsession GroupInfo, GroupMembers and groupKeys config updates.
@ -468,7 +361,7 @@ async function sendMessagesToSnode(
*/
async function sendEncryptedDataToSnode(
encryptedData: Array<StoreOnNodeData>,
destination: GroupPubkeyType,
destination: GroupPubkeyType | PubkeyType,
messagesHashesToDelete: Set<string> | null
): Promise<NotEmptyArrayOfBatchResults | null> {
try {
@ -602,7 +495,6 @@ async function sendToOpenGroupV2BlindedRequest(
export const MessageSender = {
sendToOpenGroupV2BlindedRequest,
sendMessagesDataToSnode,
sendMessagesToSnode,
sendEncryptedDataToSnode,
getMinRetryTimeout,
sendToOpenGroupV2,

@ -40,6 +40,7 @@ export function getOurPubKeyStrFromCache(): string {
if (!ourNumber) {
throw new Error('ourNumber is not set');
}
return ourNumber;
}

@ -1,18 +1,26 @@
/* eslint-disable no-await-in-loop */
import { compact, isArray, isEmpty, isNumber, isString } from 'lodash';
import { PubkeyType } from 'libsession_util_nodejs';
import { isArray, isEmpty, isNumber, isString } from 'lodash';
import { v4 } from 'uuid';
import { UserUtils } from '../..';
import { ConfigDumpData } from '../../../../data/configDump/configDump';
import { ConfigurationSyncJobDone } from '../../../../shims/events';
import { ReleasedFeatures } from '../../../../util/releaseFeature';
import { isSignInByLinking } from '../../../../util/storage';
import { GenericWrapperActions } from '../../../../webworker/workers/browser/libsession_worker_interface';
import { NotEmptyArrayOfBatchResults } from '../../../apis/snode_api/SnodeRequestTypes';
import {
NotEmptyArrayOfBatchResults,
StoreOnNodeData,
} from '../../../apis/snode_api/SnodeRequestTypes';
import { GetNetworkTime } from '../../../apis/snode_api/getNetworkTime';
import { TTL_DEFAULT } from '../../../constants';
import { ConvoHub } from '../../../conversations';
import { SharedUserConfigMessage } from '../../../messages/outgoing/controlMessage/SharedConfigMessage';
import { MessageSender } from '../../../sending/MessageSender';
import { allowOnlyOneAtATime } from '../../Promise';
import { LibSessionUtil, OutgoingConfResult } from '../../libsession/libsession_utils';
import {
LibSessionUtil,
PendingChangesForUs,
UserSingleDestinationChanges,
} from '../../libsession/libsession_utils';
import { runners } from '../JobRunner';
import {
AddJobCheckReturn,
@ -20,7 +28,6 @@ import {
PersistedJob,
RunJobResult,
} from '../PersistedJob';
import { UserConfigKind } from '../../../../types/ProtobufKind';
const defaultMsBetweenRetries = 15000; // a long time between retries, to avoid running multiple jobs at the same time, when one was postponed at the same time as one already planned (5s)
const defaultMaxAttempts = 2;
@ -31,41 +38,19 @@ const defaultMaxAttempts = 2;
*/
let lastRunConfigSyncJobTimestamp: number | null = null;
type SingleDestinationChanges = {
messages: Array<OutgoingConfResult<UserConfigKind, SharedUserConfigMessage>>;
allOldHashes: Array<string>;
};
type SuccessfulChange = {
message: SharedUserConfigMessage;
type UserSuccessfulChange = {
pushed: PendingChangesForUs;
updatedHash: string;
};
/**
* Later in the syncing logic, we want to batch-send all the updates for a pubkey in a single batch call.
* To make this easier, this function prebuilds and merges together all the changes for each pubkey.
*/
async function retrieveSingleDestinationChanges(
destination: string
): Promise<SingleDestinationChanges> {
if (destination !== UserUtils.getOurPubKeyStrFromCache()) {
throw new Error('retrieveSingleDestinationChanges can only be us for ConfigurationSyncJob');
}
const outgoingConfResults = await LibSessionUtil.pendingChangesForUs();
const compactedHashes = compact(outgoingConfResults.map(m => m.oldMessageHashes)).flat();
return { messages: outgoingConfResults, allOldHashes: compactedHashes };
}
/**
* This function is run once we get the results from the multiple batch-send.
*/
function resultsToSuccessfulChange(
result: NotEmptyArrayOfBatchResults | null,
request: SingleDestinationChanges
): Array<SuccessfulChange> {
const successfulChanges: Array<SuccessfulChange> = [];
request: UserSingleDestinationChanges
): Array<UserSuccessfulChange> {
const successfulChanges: Array<UserSuccessfulChange> = [];
/**
* For each batch request, we get as result
@ -84,15 +69,11 @@ function resultsToSuccessfulChange(
const batchResult = result[j];
const messagePostedHashes = batchResult?.body?.hash;
if (
batchResult.code === 200 &&
isString(messagePostedHashes) &&
request.messages?.[j].message
) {
if (batchResult.code === 200 && isString(messagePostedHashes) && request.messages?.[j]) {
// the library keeps track of the hashes to push and pushed using the hashes now
successfulChanges.push({
updatedHash: messagePostedHashes,
message: request.messages?.[j].message,
pushed: request.messages?.[j],
});
}
}
@ -101,16 +82,16 @@ function resultsToSuccessfulChange(
}
async function buildAndSaveDumpsToDB(
changes: Array<SuccessfulChange>,
destination: string
changes: Array<UserSuccessfulChange>,
us: string
): Promise<void> {
for (let i = 0; i < changes.length; i++) {
const change = changes[i];
const variant = LibSessionUtil.userKindToVariant(change.message.kind);
const variant = LibSessionUtil.userNamespaceToVariant(change.pushed.namespace);
const needsDump = await LibSessionUtil.markAsPushed(
variant,
change.message.seqno.toNumber(),
change.pushed.seqno.toNumber(),
change.updatedHash
);
@ -120,13 +101,13 @@ async function buildAndSaveDumpsToDB(
const dump = await GenericWrapperActions.dump(variant);
await ConfigDumpData.saveConfigDump({
data: dump,
publicKey: destination,
publicKey: us,
variant,
});
}
}
async function saveDumpsNeededToDB(destination: string) {
async function saveDumpsNeededToDB(us: string) {
for (let i = 0; i < LibSessionUtil.requiredUserVariants.length; i++) {
const variant = LibSessionUtil.requiredUserVariants[i];
const needsDump = await GenericWrapperActions.needsDump(variant);
@ -137,88 +118,54 @@ async function saveDumpsNeededToDB(destination: string) {
const dump = await GenericWrapperActions.dump(variant);
await ConfigDumpData.saveConfigDump({
data: dump,
publicKey: destination,
publicKey: us,
variant,
});
}
}
class ConfigurationSyncJob extends PersistedJob<ConfigurationSyncPersistedData> {
constructor({
identifier,
nextAttemptTimestamp,
maxAttempts,
currentRetry,
}: Partial<
Pick<
ConfigurationSyncPersistedData,
'identifier' | 'nextAttemptTimestamp' | 'currentRetry' | 'maxAttempts'
>
>) {
super({
jobType: 'ConfigurationSyncJobType',
identifier: identifier || v4(),
delayBetweenRetries: defaultMsBetweenRetries,
maxAttempts: isNumber(maxAttempts) ? maxAttempts : defaultMaxAttempts,
currentRetry: isNumber(currentRetry) ? currentRetry : 0,
nextAttemptTimestamp: nextAttemptTimestamp || Date.now(),
});
}
public async run(): Promise<RunJobResult> {
const start = Date.now();
function triggerConfSyncJobDone() {
window.Whisper.events.trigger(ConfigurationSyncJobDone);
}
try {
window.log.debug(`ConfigurationSyncJob starting ${this.persistedData.identifier}`);
function isPubkey(us: string): us is PubkeyType {
return us.startsWith('05');
}
async function pushChangesToUserSwarmIfNeeded() {
const us = UserUtils.getOurPubKeyStrFromCache();
const ed25519Key = await UserUtils.getUserED25519KeyPairBytes();
const conversation = ConvoHub.use().get(us);
if (!us || !conversation || !ed25519Key) {
// we check for ed25519Key because it is needed for authenticated requests
window.log.warn('did not find our own conversation');
return RunJobResult.PermanentFailure;
if (!isPubkey(us)) {
throw new Error('invalid user pubkey, not right prefix');
}
// TODOLATER add a way to have a few configuration sync jobs running at the same time, but only a single one per pubkey
const thisJobDestination = us;
// save the dumps to DB even before trying to push them, so at least we have an up to date dumps in the DB in case of crash, no network etc
await saveDumpsNeededToDB(thisJobDestination);
const userConfigLibsession = await ReleasedFeatures.checkIsUserConfigFeatureReleased();
// if the feature flag is not enabled, we want to keep updating the dumps, but just not sync them.
if (!userConfigLibsession) {
this.triggerConfSyncJobDone();
return RunJobResult.Success;
}
const singleDestChanges = await retrieveSingleDestinationChanges(thisJobDestination);
await saveDumpsNeededToDB(us);
const singleDestChanges = await LibSessionUtil.pendingChangesForUs();
// If there are no pending changes then the job can just complete (next time something
// is updated we want to try and run immediately so don't schedule another run in this case)
if (isEmpty(singleDestChanges?.messages)) {
this.triggerConfSyncJobDone();
triggerConfSyncJobDone();
return RunJobResult.Success;
}
const oldHashesToDelete = new Set(singleDestChanges.allOldHashes);
const msgs = singleDestChanges.messages.map(item => {
const msgs: Array<StoreOnNodeData> = singleDestChanges.messages.map(item => {
return {
namespace: item.namespace,
pubkey: thisJobDestination,
timestamp: item.message.timestamp,
ttl: item.message.ttl(),
message: item.message,
pubkey: us,
networkTimestamp: GetNetworkTime.getNowWithNetworkOffset(),
ttl: TTL_DEFAULT.TTL_CONFIG,
data: item.ciphertext,
};
});
const result = await MessageSender.sendMessagesToSnode(
const result = await MessageSender.sendEncryptedDataToSnode(
msgs,
thisJobDestination,
oldHashesToDelete
us,
singleDestChanges.allOldHashes
);
const expectedReplyLength =
singleDestChanges.messages.length + (oldHashesToDelete.size ? 1 : 0);
singleDestChanges.messages.length + (singleDestChanges.allOldHashes.size ? 1 : 0);
// we do a sequence call here. If we do not have the right expected number of results, consider it a failure
if (!isArray(result) || result.length !== expectedReplyLength) {
window.log.info(
@ -235,9 +182,49 @@ class ConfigurationSyncJob extends PersistedJob<ConfigurationSyncPersistedData>
// Now that we have the successful changes, we need to mark them as pushed and
// generate any config dumps which need to be stored
await buildAndSaveDumpsToDB(changes, thisJobDestination);
this.triggerConfSyncJobDone();
await buildAndSaveDumpsToDB(changes, us);
triggerConfSyncJobDone();
return RunJobResult.Success;
}
class ConfigurationSyncJob extends PersistedJob<ConfigurationSyncPersistedData> {
constructor({
identifier,
nextAttemptTimestamp,
maxAttempts,
currentRetry,
}: Partial<
Pick<
ConfigurationSyncPersistedData,
'identifier' | 'nextAttemptTimestamp' | 'currentRetry' | 'maxAttempts'
>
>) {
super({
jobType: 'ConfigurationSyncJobType',
identifier: identifier || v4(),
delayBetweenRetries: defaultMsBetweenRetries,
maxAttempts: isNumber(maxAttempts) ? maxAttempts : defaultMaxAttempts,
currentRetry: isNumber(currentRetry) ? currentRetry : 0,
nextAttemptTimestamp: nextAttemptTimestamp || Date.now(),
});
}
public async run(): Promise<RunJobResult> {
const start = Date.now();
try {
window.log.debug(`ConfigurationSyncJob starting ${this.persistedData.identifier}`);
const us = UserUtils.getOurPubKeyStrFromCache();
const ed25519Key = await UserUtils.getUserED25519KeyPairBytes();
const conversation = ConvoHub.use().get(us);
if (!us || !conversation || !ed25519Key) {
// we check for ed25519Key because it is needed for authenticated requests
window.log.warn('did not find our own conversation');
return RunJobResult.PermanentFailure;
}
return await pushChangesToUserSwarmIfNeeded();
// eslint-disable-next-line no-useless-catch
} catch (e) {
throw e;
@ -275,10 +262,6 @@ class ConfigurationSyncJob extends PersistedJob<ConfigurationSyncPersistedData>
private updateLastTickTimestamp() {
lastRunConfigSyncJobTimestamp = Date.now();
}
private triggerConfSyncJobDone() {
window.Whisper.events.trigger(ConfigurationSyncJobDone);
}
}
/**

@ -124,7 +124,6 @@ async function pushChangesToGroupSwarmIfNeeded(groupPk: GroupPubkeyType): Promis
if (isEmpty(singleDestChanges?.messages)) {
return RunJobResult.Success;
}
const oldHashesToDelete = new Set(singleDestChanges.allOldHashes);
const msgs: Array<StoreOnNodeData> = singleDestChanges.messages.map(item => {
return {
@ -136,9 +135,14 @@ async function pushChangesToGroupSwarmIfNeeded(groupPk: GroupPubkeyType): Promis
};
});
const result = await MessageSender.sendEncryptedDataToSnode(msgs, groupPk, oldHashesToDelete);
const result = await MessageSender.sendEncryptedDataToSnode(
msgs,
groupPk,
singleDestChanges.allOldHashes
);
const expectedReplyLength = singleDestChanges.messages.length + (oldHashesToDelete.size ? 1 : 0);
const expectedReplyLength =
singleDestChanges.messages.length + (singleDestChanges.allOldHashes.size ? 1 : 0);
// we do a sequence call here. If we do not have the right expected number of results, consider it a failure
if (!isArray(result) || result.length !== expectedReplyLength) {

@ -6,8 +6,6 @@ import { compact, difference, omit } from 'lodash';
import Long from 'long';
import { UserUtils } from '..';
import { ConfigDumpData } from '../../../data/configDump/configDump';
import { SignalService } from '../../../protobuf';
import { UserConfigKind } from '../../../types/ProtobufKind';
import { assertUnreachable } from '../../../types/sqlSharedTypes';
import {
ConfigWrapperGroupDetailed,
@ -18,12 +16,7 @@ import {
GenericWrapperActions,
MetaGroupWrapperActions,
} from '../../../webworker/workers/browser/libsession_worker_interface';
import { GetNetworkTime } from '../../apis/snode_api/getNetworkTime';
import { SnodeNamespaces } from '../../apis/snode_api/namespaces';
import {
SharedConfigMessage,
SharedUserConfigMessage,
} from '../../messages/outgoing/controlMessage/SharedConfigMessage';
import { SnodeNamespaces, UserConfigNamespaces } from '../../apis/snode_api/namespaces';
import { ed25519Str } from '../../onions/onionPath';
import { PubKey } from '../../types';
import { ConfigurationSync } from '../job_runners/jobs/ConfigurationSyncJob';
@ -35,12 +28,6 @@ const requiredUserVariants: Array<ConfigWrapperUser> = [
'ConvoInfoVolatileConfig',
];
export type OutgoingConfResult<K extends UserConfigKind, T extends SharedConfigMessage<K>> = {
message: T;
namespace: SnodeNamespaces;
oldMessageHashes: Array<string>;
};
async function initializeLibSessionUtilWrappers() {
const keypair = await UserUtils.getUserED25519KeyPairBytes();
if (!keypair || !keypair.privKeyBytes) {
@ -108,17 +95,43 @@ async function initializeLibSessionUtilWrappers() {
// No need to load the meta group wrapper here. We will load them once the SessionInbox is loaded with a redux action
}
async function pendingChangesForUs(): Promise<
Array<OutgoingConfResult<UserConfigKind, SharedUserConfigMessage>>
> {
const us = UserUtils.getOurPubKeyStrFromCache();
export type PendingChangesForUs = {
ciphertext: Uint8Array;
seqno: Long;
namespace: UserConfigNamespaces;
};
type PendingChangesForGroupNonKey = {
data: Uint8Array;
seqno: Long;
namespace: SnodeNamespaces.ClosedGroupInfo | SnodeNamespaces.ClosedGroupMembers;
type: Extract<ConfigWrapperGroupDetailed, 'GroupInfo' | 'GroupMember'>;
};
type PendingChangesForGroupKey = {
data: Uint8Array;
namespace: SnodeNamespaces.ClosedGroupKeys;
type: Extract<ConfigWrapperGroupDetailed, 'GroupKeys'>;
};
export type PendingChangesForGroup = PendingChangesForGroupNonKey | PendingChangesForGroupKey;
type SingleDestinationChanges<T extends PendingChangesForGroup | PendingChangesForUs> = {
messages: Array<T>;
allOldHashes: Set<string>;
};
export type UserSingleDestinationChanges = SingleDestinationChanges<PendingChangesForUs>;
export type GroupSingleDestinationChanges = SingleDestinationChanges<PendingChangesForGroup>;
async function pendingChangesForUs(): Promise<UserSingleDestinationChanges> {
const us = UserUtils.getOurPubKeyStrFromCache();
const dumps = await ConfigDumpData.getAllDumpsWithoutDataFor(us);
// Ensure we always check the required user config types for changes even if there is no dump
// data yet (to deal with first launch cases)
LibSessionUtil.requiredUserVariants.forEach(requiredVariant => {
if (!dumps.find(m => m.publicKey === us && m.variant === requiredVariant)) {
if (!dumps.some(m => m.publicKey === us && m.variant === requiredVariant)) {
dumps.push({
publicKey: us,
variant: requiredVariant,
@ -126,7 +139,7 @@ async function pendingChangesForUs(): Promise<
}
});
const results: Array<OutgoingConfResult<UserConfigKind, SharedUserConfigMessage>> = [];
const results: UserSingleDestinationChanges = { messages: [], allOldHashes: new Set() };
const variantsNeedingPush = new Set<ConfigWrapperUser>();
for (let index = 0; index < dumps.length; index++) {
@ -137,7 +150,6 @@ async function pendingChangesForUs(): Promise<
continue;
}
const needsPush = await GenericWrapperActions.needsPush(variant);
if (!needsPush) {
continue;
}
@ -145,18 +157,15 @@ async function pendingChangesForUs(): Promise<
variantsNeedingPush.add(variant);
const { data, seqno, hashes, namespace } = await GenericWrapperActions.push(variant);
const kind = userVariantToUserKind(variant);
results.push({
message: new SharedUserConfigMessage({
data,
kind,
results.messages.push({
ciphertext: data,
seqno: Long.fromNumber(seqno),
timestamp: GetNetworkTime.getNowWithNetworkOffset(),
}),
oldMessageHashes: hashes,
namespace,
});
hashes.forEach(hash => {
results.allOldHashes.add(hash);
});
}
window.log.info(`those variants need push: "${[...variantsNeedingPush]}"`);
@ -165,28 +174,6 @@ async function pendingChangesForUs(): Promise<
// we link the namespace to the type of what each wrapper needs
type PendingChangesForGroupNonKey = {
data: Uint8Array;
seqno: Long;
timestamp: number;
namespace: SnodeNamespaces.ClosedGroupInfo | SnodeNamespaces.ClosedGroupMembers;
type: Extract<ConfigWrapperGroupDetailed, 'GroupInfo' | 'GroupMember'>;
};
type PendingChangesForGroupKey = {
data: Uint8Array;
timestamp: number;
namespace: SnodeNamespaces.ClosedGroupKeys;
type: Extract<ConfigWrapperGroupDetailed, 'GroupKeys'>;
};
export type PendingChangesForGroup = PendingChangesForGroupNonKey | PendingChangesForGroupKey;
export type GroupSingleDestinationChanges = {
messages: Array<PendingChangesForGroup>;
allOldHashes: Set<string>;
};
async function pendingChangesForGroup(
groupPk: GroupPubkeyType
): Promise<GroupSingleDestinationChanges> {
@ -210,7 +197,6 @@ async function pendingChangesForGroup(
type: 'GroupKeys',
data: groupKeys.data,
namespace: groupKeys.namespace,
timestamp: GetNetworkTime.getNowWithNetworkOffset(),
});
}
@ -219,7 +205,6 @@ async function pendingChangesForGroup(
type: 'GroupInfo',
data: groupInfo.data,
seqno: Long.fromNumber(groupInfo.seqno),
timestamp: GetNetworkTime.getNowWithNetworkOffset(),
namespace: groupInfo.namespace,
});
}
@ -228,7 +213,6 @@ async function pendingChangesForGroup(
type: 'GroupMember',
data: groupMember.data,
seqno: Long.fromNumber(groupMember.seqno),
timestamp: GetNetworkTime.getNowWithNetworkOffset(),
namespace: groupMember.namespace,
});
}
@ -243,35 +227,19 @@ async function pendingChangesForGroup(
return { messages: results, allOldHashes };
}
// eslint-disable-next-line consistent-return
function userKindToVariant(kind: UserConfigKind): ConfigWrapperUser {
switch (kind) {
case SignalService.SharedConfigMessage.Kind.USER_PROFILE:
function userNamespaceToVariant(namespace: UserConfigNamespaces) {
switch (namespace) {
case SnodeNamespaces.UserProfile:
return 'UserConfig';
case SignalService.SharedConfigMessage.Kind.CONTACTS:
case SnodeNamespaces.UserContacts:
return 'ContactsConfig';
case SignalService.SharedConfigMessage.Kind.USER_GROUPS:
case SnodeNamespaces.UserGroups:
return 'UserGroupsConfig';
case SignalService.SharedConfigMessage.Kind.CONVO_INFO_VOLATILE:
case SnodeNamespaces.ConvoInfoVolatile:
return 'ConvoInfoVolatileConfig';
default:
assertUnreachable(kind, `userKindToVariant: Unsupported variant: "${kind}"`);
}
}
// eslint-disable-next-line consistent-return
function userVariantToUserKind(variant: ConfigWrapperUser) {
switch (variant) {
case 'UserConfig':
return SignalService.SharedConfigMessage.Kind.USER_PROFILE;
case 'ContactsConfig':
return SignalService.SharedConfigMessage.Kind.CONTACTS;
case 'UserGroupsConfig':
return SignalService.SharedConfigMessage.Kind.USER_GROUPS;
case 'ConvoInfoVolatileConfig':
return SignalService.SharedConfigMessage.Kind.CONVO_INFO_VOLATILE;
default:
assertUnreachable(variant, `userVariantToKind: Unsupported kind: "${variant}"`);
assertUnreachable(namespace, `userNamespaceToVariant: Unsupported namespace: "${namespace}"`);
throw new Error('userNamespaceToVariant: Unsupported namespace:');
}
}
@ -304,11 +272,10 @@ async function saveMetaGroupDumpToDb(groupPk: GroupPubkeyType) {
export const LibSessionUtil = {
initializeLibSessionUtilWrappers,
userVariantToUserKind,
userNamespaceToVariant,
requiredUserVariants,
pendingChangesForUs,
pendingChangesForGroup,
userKindToVariant,
markAsPushed,
saveMetaGroupDumpToDb,
};

@ -21,7 +21,6 @@ import {
} from '../../messages/outgoing/controlMessage/ConfigurationMessage';
import { ExpirationTimerUpdateMessage } from '../../messages/outgoing/controlMessage/ExpirationTimerUpdateMessage';
import { MessageRequestResponse } from '../../messages/outgoing/controlMessage/MessageRequestResponse';
import { SharedUserConfigMessage } from '../../messages/outgoing/controlMessage/SharedConfigMessage';
import { UnsendMessage } from '../../messages/outgoing/controlMessage/UnsendMessage';
import {
AttachmentPointerWithUrl,
@ -351,8 +350,7 @@ export type SyncMessageType =
| ExpirationTimerUpdateMessage
| ConfigurationMessage
| MessageRequestResponse
| UnsendMessage
| SharedUserConfigMessage;
| UnsendMessage;
export const buildSyncMessage = (
identifier: string,

@ -145,7 +145,6 @@ describe('GroupSyncJob pendingChangesForGroup', () => {
type: 'GroupKeys',
data: new Uint8Array([3, 2, 1]),
namespace: 13,
timestamp: 1234,
});
// check for the info push content
expect(result.messages[1]).to.be.deep.eq({
@ -153,7 +152,6 @@ describe('GroupSyncJob pendingChangesForGroup', () => {
data: new Uint8Array([1, 2, 3]),
namespace: 12,
seqno: Long.fromInt(pushResults.groupInfo.seqno),
timestamp: 1234,
});
// check for the members push content
expect(result.messages[2]).to.be.deep.eq({
@ -161,7 +159,6 @@ describe('GroupSyncJob pendingChangesForGroup', () => {
data: new Uint8Array([1, 2]),
namespace: 14,
seqno: Long.fromInt(pushResults.groupMember.seqno),
timestamp: 1234,
});
});
@ -332,7 +329,7 @@ describe('GroupSyncJob resultsToSuccessfulChange', () => {
];
const request: GroupSingleDestinationChanges = {
allOldHashes: new Set(),
messages: [infoNoData as PendingChangesForGroup, member],
messages: [infoNoData as any as PendingChangesForGroup, member],
};
const results = GroupSync.resultsToSuccessfulChange(batchResults, request);
expect(results).to.be.deep.eq([

@ -1,20 +0,0 @@
import { SignalService } from '../protobuf';
import { PickEnum } from './Enums';
export type UserConfigKind = PickEnum<
SignalService.SharedConfigMessage.Kind,
| SignalService.SharedConfigMessage.Kind.USER_PROFILE
| SignalService.SharedConfigMessage.Kind.CONTACTS
| SignalService.SharedConfigMessage.Kind.USER_GROUPS
| SignalService.SharedConfigMessage.Kind.CONVO_INFO_VOLATILE
>;
export function isUserKind(kind: SignalService.SharedConfigMessage.Kind): kind is UserConfigKind {
const Kind = SignalService.SharedConfigMessage.Kind;
return (
kind === Kind.USER_PROFILE ||
kind === Kind.CONTACTS ||
kind === Kind.USER_GROUPS ||
kind === Kind.CONVO_INFO_VOLATILE
);
}

@ -55,6 +55,7 @@ type GenericWrapperActionsCalls = {
) => Promise<void>;
confirmPushed: GenericWrapperActionsCall<ConfigWrapperUser, 'confirmPushed'>;
dump: GenericWrapperActionsCall<ConfigWrapperUser, 'dump'>;
makeDump: GenericWrapperActionsCall<ConfigWrapperUser, 'makeDump'>;
merge: GenericWrapperActionsCall<ConfigWrapperUser, 'merge'>;
needsDump: GenericWrapperActionsCall<ConfigWrapperUser, 'needsDump'>;
needsPush: GenericWrapperActionsCall<ConfigWrapperUser, 'needsPush'>;
@ -77,6 +78,10 @@ export const GenericWrapperActions: GenericWrapperActionsCalls = {
>,
dump: async (wrapperId: ConfigWrapperUser) =>
callLibSessionWorker([wrapperId, 'dump']) as ReturnType<GenericWrapperActionsCalls['dump']>,
makeDump: async (wrapperId: ConfigWrapperUser) =>
callLibSessionWorker([wrapperId, 'makeDump']) as ReturnType<
GenericWrapperActionsCalls['makeDump']
>,
merge: async (wrapperId: ConfigWrapperUser, toMerge: Array<MergeSingle>) =>
callLibSessionWorker([wrapperId, 'merge', toMerge]) as ReturnType<
GenericWrapperActionsCalls['merge']
@ -97,18 +102,26 @@ export const GenericWrapperActions: GenericWrapperActionsCalls = {
>,
};
export const UserConfigWrapperActions: UserConfigWrapperActionsCalls = {
function createBaseActionsFor(wrapperType: ConfigWrapperUser) {
return {
/* Reuse the GenericWrapperActions with the UserConfig argument */
init: async (ed25519Key: Uint8Array, dump: Uint8Array | null) =>
GenericWrapperActions.init('UserConfig', ed25519Key, dump),
GenericWrapperActions.init(wrapperType, ed25519Key, dump),
confirmPushed: async (seqno: number, hash: string) =>
GenericWrapperActions.confirmPushed('UserConfig', seqno, hash),
dump: async () => GenericWrapperActions.dump('UserConfig'),
merge: async (toMerge: Array<MergeSingle>) => GenericWrapperActions.merge('UserConfig', toMerge),
needsDump: async () => GenericWrapperActions.needsDump('UserConfig'),
needsPush: async () => GenericWrapperActions.needsPush('UserConfig'),
push: async () => GenericWrapperActions.push('UserConfig'),
currentHashes: async () => GenericWrapperActions.currentHashes('UserConfig'),
GenericWrapperActions.confirmPushed(wrapperType, seqno, hash),
dump: async () => GenericWrapperActions.dump(wrapperType),
makeDump: async () => GenericWrapperActions.makeDump(wrapperType),
merge: async (toMerge: Array<MergeSingle>) => GenericWrapperActions.merge(wrapperType, toMerge),
needsDump: async () => GenericWrapperActions.needsDump(wrapperType),
needsPush: async () => GenericWrapperActions.needsPush(wrapperType),
push: async () => GenericWrapperActions.push(wrapperType),
currentHashes: async () => GenericWrapperActions.currentHashes(wrapperType),
};
}
export const UserConfigWrapperActions: UserConfigWrapperActionsCalls = {
/* Reuse the GenericWrapperActions with the UserConfig argument */
...createBaseActionsFor('UserConfig'),
/** UserConfig wrapper specific actions */
getUserInfo: async () =>
@ -144,17 +157,7 @@ export const UserConfigWrapperActions: UserConfigWrapperActionsCalls = {
export const ContactsWrapperActions: ContactsWrapperActionsCalls = {
/* Reuse the GenericWrapperActions with the ContactConfig argument */
init: async (ed25519Key: Uint8Array, dump: Uint8Array | null) =>
GenericWrapperActions.init('ContactsConfig', ed25519Key, dump),
confirmPushed: async (seqno: number, hash: string) =>
GenericWrapperActions.confirmPushed('ContactsConfig', seqno, hash),
dump: async () => GenericWrapperActions.dump('ContactsConfig'),
merge: async (toMerge: Array<MergeSingle>) =>
GenericWrapperActions.merge('ContactsConfig', toMerge),
needsDump: async () => GenericWrapperActions.needsDump('ContactsConfig'),
needsPush: async () => GenericWrapperActions.needsPush('ContactsConfig'),
push: async () => GenericWrapperActions.push('ContactsConfig'),
currentHashes: async () => GenericWrapperActions.currentHashes('ContactsConfig'),
...createBaseActionsFor('ContactsConfig'),
/** ContactsConfig wrapper specific actions */
get: async (pubkeyHex: string) =>
@ -178,18 +181,8 @@ export const ContactsWrapperActions: ContactsWrapperActionsCalls = {
};
export const UserGroupsWrapperActions: UserGroupsWrapperActionsCalls = {
/* Reuse the GenericWrapperActions with the ContactConfig argument */
init: async (ed25519Key: Uint8Array, dump: Uint8Array | null) =>
GenericWrapperActions.init('UserGroupsConfig', ed25519Key, dump),
confirmPushed: async (seqno: number, hash: string) =>
GenericWrapperActions.confirmPushed('UserGroupsConfig', seqno, hash),
dump: async () => GenericWrapperActions.dump('UserGroupsConfig'),
merge: async (toMerge: Array<MergeSingle>) =>
GenericWrapperActions.merge('UserGroupsConfig', toMerge),
needsDump: async () => GenericWrapperActions.needsDump('UserGroupsConfig'),
needsPush: async () => GenericWrapperActions.needsPush('UserGroupsConfig'),
push: async () => GenericWrapperActions.push('UserGroupsConfig'),
currentHashes: async () => GenericWrapperActions.currentHashes('UserGroupsConfig'),
/* Reuse the GenericWrapperActions with the UserGroupsConfig argument */
...createBaseActionsFor('UserGroupsConfig'),
/** UserGroups wrapper specific actions */
@ -275,18 +268,8 @@ export const UserGroupsWrapperActions: UserGroupsWrapperActionsCalls = {
};
export const ConvoInfoVolatileWrapperActions: ConvoInfoVolatileWrapperActionsCalls = {
/* Reuse the GenericWrapperActions with the ContactConfig argument */
init: async (ed25519Key: Uint8Array, dump: Uint8Array | null) =>
GenericWrapperActions.init('ConvoInfoVolatileConfig', ed25519Key, dump),
confirmPushed: async (seqno: number, hash: string) =>
GenericWrapperActions.confirmPushed('ConvoInfoVolatileConfig', seqno, hash),
dump: async () => GenericWrapperActions.dump('ConvoInfoVolatileConfig'),
merge: async (toMerge: Array<MergeSingle>) =>
GenericWrapperActions.merge('ConvoInfoVolatileConfig', toMerge),
needsDump: async () => GenericWrapperActions.needsDump('ConvoInfoVolatileConfig'),
needsPush: async () => GenericWrapperActions.needsPush('ConvoInfoVolatileConfig'),
push: async () => GenericWrapperActions.push('ConvoInfoVolatileConfig'),
currentHashes: async () => GenericWrapperActions.currentHashes('ConvoInfoVolatileConfig'),
/* Reuse the GenericWrapperActions with the ConvoInfoVolatileConfig argument */
...createBaseActionsFor('ConvoInfoVolatileConfig'),
/** ConvoInfoVolatile wrapper specific actions */
// 1o1
@ -389,9 +372,9 @@ export const MetaGroupWrapperActions: MetaGroupWrapperActionsCalls = {
callLibSessionWorker([`MetaGroupConfig-${groupPk}`, 'metaDump']) as Promise<
ReturnType<MetaGroupWrapperActionsCalls['metaDump']>
>,
metaDebugDump: async (groupPk: GroupPubkeyType) =>
callLibSessionWorker([`MetaGroupConfig-${groupPk}`, 'metaDebugDump']) as Promise<
ReturnType<MetaGroupWrapperActionsCalls['metaDebugDump']>
metaMakeDump: async (groupPk: GroupPubkeyType) =>
callLibSessionWorker([`MetaGroupConfig-${groupPk}`, 'metaMakeDump']) as Promise<
ReturnType<MetaGroupWrapperActionsCalls['metaMakeDump']>
>,
metaConfirmPushed: async (
groupPk: GroupPubkeyType,

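The final hunks also collapse four near-identical wrapper action objects into a single createBaseActionsFor factory. A simplified standalone sketch of that pattern follows; callWorker is a stand-in for the repo's callLibSessionWorker bridge, and only a subset of the shared actions is shown.

```ts
type WrapperId = 'UserConfig' | 'ContactsConfig' | 'UserGroupsConfig' | 'ConvoInfoVolatileConfig';

// Stand-in for the worker bridge; the real code posts [wrapperId, action, ...args]
// to the libsession web worker and awaits its reply.
async function callWorker(wrapperId: WrapperId, action: string, ...args: Array<unknown>) {
  return { wrapperId, action, args };
}

// One factory builds the actions shared by every user config wrapper, so each
// exported object only spreads it and adds its wrapper-specific methods.
function createBaseActionsFor(wrapperId: WrapperId) {
  return {
    needsDump: async () => callWorker(wrapperId, 'needsDump'),
    needsPush: async () => callWorker(wrapperId, 'needsPush'),
    dump: async () => callWorker(wrapperId, 'dump'),
    makeDump: async () => callWorker(wrapperId, 'makeDump'),
    push: async () => callWorker(wrapperId, 'push'),
    currentHashes: async () => callWorker(wrapperId, 'currentHashes'),
  };
}

const ContactsWrapperActions = {
  ...createBaseActionsFor('ContactsConfig'),
  // ContactsConfig-specific actions (get, set, erase, ...) would be added here
};
```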