feat: add GroupInviteJob

pull/2963/head
Audric Ackermann 2 years ago
parent b8876ebbfe
commit f17beaf852

@ -1,32 +1,32 @@
import _ from 'lodash';
import ReactDOM from 'react-dom';
import Backbone from 'backbone'; import Backbone from 'backbone';
import _, { toPairs } from 'lodash';
import ReactDOM from 'react-dom';
import React from 'react';
import nativeEmojiData from '@emoji-mart/data'; import nativeEmojiData from '@emoji-mart/data';
import React from 'react';
import { MessageModel } from '../models/message';
import { isMacOS } from '../OS'; import { isMacOS } from '../OS';
import { SessionInboxView } from '../components/SessionInboxView';
import { SessionRegistrationView } from '../components/registration/SessionRegistrationView';
import { Data } from '../data/data';
import { OpenGroupData } from '../data/opengroups';
import { SettingsKey } from '../data/settings-key';
import { MessageModel } from '../models/message';
import { deleteAllLogs } from '../node/logs';
import { queueAllCached } from '../receiver/receiver'; import { queueAllCached } from '../receiver/receiver';
import { loadKnownBlindedKeys } from '../session/apis/open_group_api/sogsv3/knownBlindedkeys';
import { ConvoHub } from '../session/conversations'; import { ConvoHub } from '../session/conversations';
import { AttachmentDownloads, ToastUtils } from '../session/utils'; import { AttachmentDownloads, ToastUtils } from '../session/utils';
import { getOurPubKeyStrFromCache } from '../session/utils/User'; import { getOurPubKeyStrFromCache } from '../session/utils/User';
import { runners } from '../session/utils/job_runners/JobRunner';
import { LibSessionUtil } from '../session/utils/libsession/libsession_utils';
import { switchPrimaryColorTo } from '../themes/switchPrimaryColor';
import { BlockedNumberController } from '../util'; import { BlockedNumberController } from '../util';
import { initialiseEmojiData } from '../util/emoji';
import { ExpirationTimerOptions } from '../util/expiringMessages'; import { ExpirationTimerOptions } from '../util/expiringMessages';
import { Notifications } from '../util/notifications'; import { Notifications } from '../util/notifications';
import { Registration } from '../util/registration'; import { Registration } from '../util/registration';
import { isSignInByLinking, Storage } from '../util/storage'; import { Storage, isSignInByLinking } from '../util/storage';
import { Data } from '../data/data';
import { SessionRegistrationView } from '../components/registration/SessionRegistrationView';
import { SessionInboxView } from '../components/SessionInboxView';
import { deleteAllLogs } from '../node/logs';
import { OpenGroupData } from '../data/opengroups';
import { loadKnownBlindedKeys } from '../session/apis/open_group_api/sogsv3/knownBlindedkeys';
import { initialiseEmojiData } from '../util/emoji';
import { switchPrimaryColorTo } from '../themes/switchPrimaryColor';
import { LibSessionUtil } from '../session/utils/libsession/libsession_utils';
import { runners } from '../session/utils/job_runners/JobRunner';
import { SettingsKey } from '../data/settings-key';
// Globally disable drag and drop // Globally disable drag and drop
document.body.addEventListener( document.body.addEventListener(
@ -112,12 +112,13 @@ function mapOldThemeToNew(theme: string) {
async function startJobRunners() { async function startJobRunners() {
// start the job runners // start the job runners
await runners.avatarDownloadRunner.loadJobsFromDb(); const pairs = toPairs(runners);
runners.avatarDownloadRunner.startProcessing(); for (let index = 0; index < pairs.length; index++) {
await runners.userSyncRunner.loadJobsFromDb(); const runner = pairs[index][1];
runners.userSyncRunner.startProcessing(); // eslint-disable-next-line no-await-in-loop
await runners.groupSyncRunner.loadJobsFromDb(); await runner.loadJobsFromDb();
runners.groupSyncRunner.startProcessing(); runner.startProcessing();
}
} }
// We need this 'first' check because we don't want to start the app up any other time // We need this 'first' check because we don't want to start the app up any other time

@ -1,11 +1,11 @@
import { import {
GroupMemberGet,
GroupPubkeyType, GroupPubkeyType,
PubkeyType,
Uint8ArrayLen100, Uint8ArrayLen100,
Uint8ArrayLen64, Uint8ArrayLen64,
UserGroupsGet, UserGroupsGet,
} from 'libsession_util_nodejs'; } from 'libsession_util_nodejs';
import { compact, isEmpty } from 'lodash'; import { isEmpty } from 'lodash';
import { MetaGroupWrapperActions } from '../../../../webworker/workers/browser/libsession_worker_interface'; import { MetaGroupWrapperActions } from '../../../../webworker/workers/browser/libsession_worker_interface';
import { getSodiumRenderer } from '../../../crypto/MessageEncrypter'; import { getSodiumRenderer } from '../../../crypto/MessageEncrypter';
import { GroupUpdateInviteMessage } from '../../../messages/outgoing/controlMessage/group_v2/to_user/GroupUpdateInviteMessage'; import { GroupUpdateInviteMessage } from '../../../messages/outgoing/controlMessage/group_v2/to_user/GroupUpdateInviteMessage';
@ -18,13 +18,13 @@ import { WithMessagesHashes, WithShortenOrExtend, WithTimestamp } from '../types
import { SignatureShared } from './signatureShared'; import { SignatureShared } from './signatureShared';
import { SnodeSignatureResult } from './snodeSignatures'; import { SnodeSignatureResult } from './snodeSignatures';
async function getGroupInvitesMessages({ async function getGroupInviteMessage({
groupName, groupName,
membersFromWrapper, member,
secretKey, secretKey,
groupPk, groupPk,
}: { }: {
membersFromWrapper: Array<GroupMemberGet>; member: PubkeyType;
groupName: string; groupName: string;
secretKey: Uint8ArrayLen64; // len 64 secretKey: Uint8ArrayLen64; // len 64
groupPk: GroupPubkeyType; groupPk: GroupPubkeyType;
@ -32,11 +32,8 @@ async function getGroupInvitesMessages({
const sodium = await getSodiumRenderer(); const sodium = await getSodiumRenderer();
const timestamp = GetNetworkTime.getNowWithNetworkOffset(); const timestamp = GetNetworkTime.getNowWithNetworkOffset();
const inviteDetails = compact(
await Promise.all(
membersFromWrapper.map(async ({ pubkeyHex: member }) => {
if (UserUtils.isUsFromCache(member)) { if (UserUtils.isUsFromCache(member)) {
return null; throw new Error('getGroupInviteMessage: we cannot invite ourselves');
} }
const tosign = `INVITE${member}${timestamp}`; const tosign = `INVITE${member}${timestamp}`;
@ -51,12 +48,7 @@ async function getGroupInvitesMessages({
adminSignature, adminSignature,
memberAuthData, memberAuthData,
}); });
return invite;
return { member, invite };
})
)
);
return inviteDetails;
} }
type ParamsShared = { type ParamsShared = {
@ -220,6 +212,6 @@ async function generateUpdateExpiryGroupSignature({
export const SnodeGroupSignature = { export const SnodeGroupSignature = {
generateUpdateExpiryGroupSignature, generateUpdateExpiryGroupSignature,
getGroupInvitesMessages, getGroupInviteMessage,
getSnodeGroupSignature, getSnodeGroupSignature,
}; };

@ -5,6 +5,7 @@ import { timeout } from '../Promise';
import { persistedJobFromData } from './JobDeserialization'; import { persistedJobFromData } from './JobDeserialization';
import { import {
AvatarDownloadPersistedData, AvatarDownloadPersistedData,
GroupInvitePersistedData,
GroupSyncPersistedData, GroupSyncPersistedData,
PersistedJob, PersistedJob,
RunJobResult, RunJobResult,
@ -358,9 +359,14 @@ const avatarDownloadRunner = new PersistedJobRunner<AvatarDownloadPersistedData>
'AvatarDownloadJob', 'AvatarDownloadJob',
null null
); );
const groupInviteJobRunner = new PersistedJobRunner<GroupInvitePersistedData>(
'GroupInviteJob',
null
);
export const runners = { export const runners = {
userSyncRunner, userSyncRunner,
groupSyncRunner, groupSyncRunner,
avatarDownloadRunner, avatarDownloadRunner,
groupInviteJobRunner,
}; };

@ -1,9 +1,11 @@
import { GroupPubkeyType, PubkeyType } from 'libsession_util_nodejs';
import { cloneDeep, isEmpty } from 'lodash'; import { cloneDeep, isEmpty } from 'lodash';
export type PersistedJobType = export type PersistedJobType =
| 'UserSyncJobType' | 'UserSyncJobType'
| 'GroupSyncJobType' | 'GroupSyncJobType'
| 'AvatarDownloadJobType' | 'AvatarDownloadJobType'
| 'GroupInviteJobType'
| 'FakeSleepForJobType' | 'FakeSleepForJobType'
| 'FakeSleepForJobMultiType'; | 'FakeSleepForJobMultiType';
@ -32,6 +34,12 @@ export interface AvatarDownloadPersistedData extends PersistedJobData {
conversationId: string; conversationId: string;
} }
export interface GroupInvitePersistedData extends PersistedJobData {
jobType: 'GroupInviteJobType';
groupPk: GroupPubkeyType;
member: PubkeyType;
}
export interface UserSyncPersistedData extends PersistedJobData { export interface UserSyncPersistedData extends PersistedJobData {
jobType: 'UserSyncJobType'; jobType: 'UserSyncJobType';
} }
@ -44,7 +52,8 @@ export type TypeOfPersistedData =
| AvatarDownloadPersistedData | AvatarDownloadPersistedData
| FakeSleepJobData | FakeSleepJobData
| FakeSleepForMultiJobData | FakeSleepForMultiJobData
| GroupSyncPersistedData; | GroupSyncPersistedData
| GroupInvitePersistedData;
export type AddJobCheckReturn = 'skipAddSameJobPresent' | 'sameJobDataAlreadyInQueue' | null; export type AddJobCheckReturn = 'skipAddSameJobPresent' | 'sameJobDataAlreadyInQueue' | null;

@ -0,0 +1,144 @@
import { GroupPubkeyType, PubkeyType } from 'libsession_util_nodejs';
import { isNumber } from 'lodash';
import { v4 } from 'uuid';
import { UserUtils } from '../..';
import { UserGroupsWrapperActions } from '../../../../webworker/workers/browser/libsession_worker_interface';
import { SnodeNamespaces } from '../../../apis/snode_api/namespaces';
import { SnodeGroupSignature } from '../../../apis/snode_api/signature/groupSignature';
import { getMessageQueue } from '../../../sending';
import { PubKey } from '../../../types';
import { runners } from '../JobRunner';
import {
AddJobCheckReturn,
GroupInvitePersistedData,
PersistedJob,
RunJobResult,
} from '../PersistedJob';
const defaultMsBetweenRetries = 10000;
const defaultMaxAttemps = 1;
type JobExtraArgs = {
groupPk: GroupPubkeyType;
member: PubkeyType;
};
export function shouldAddGroupInviteJob(args: JobExtraArgs) {
if (UserUtils.isUsFromCache(args.member)) {
return false;
}
return true;
}
async function addGroupInviteJob({ groupPk, member }: JobExtraArgs) {
if (shouldAddGroupInviteJob({ groupPk, member })) {
const groupInviteJob = new GroupInviteJob({
groupPk,
member,
nextAttemptTimestamp: Date.now(),
});
window.log.debug(`addGroupInviteJob: adding group invite for ${groupPk}:${member} `);
await runners.groupInviteJobRunner.addJob(groupInviteJob);
}
}
class GroupInviteJob extends PersistedJob<GroupInvitePersistedData> {
constructor({
groupPk,
member,
nextAttemptTimestamp,
maxAttempts,
currentRetry,
identifier,
}: Pick<GroupInvitePersistedData, 'groupPk' | 'member'> &
Partial<
Pick<
GroupInvitePersistedData,
| 'nextAttemptTimestamp'
| 'identifier'
| 'maxAttempts'
| 'delayBetweenRetries'
| 'currentRetry'
>
>) {
super({
jobType: 'GroupInviteJobType',
identifier: identifier || v4(),
member,
groupPk,
delayBetweenRetries: defaultMsBetweenRetries,
maxAttempts: isNumber(maxAttempts) ? maxAttempts : defaultMaxAttemps,
nextAttemptTimestamp: nextAttemptTimestamp || Date.now() + defaultMsBetweenRetries,
currentRetry: isNumber(currentRetry) ? currentRetry : 0,
});
}
public async run(): Promise<RunJobResult> {
const { groupPk, member } = this.persistedData;
window.log.info(
`running job ${this.persistedData.jobType} with groupPk:"${groupPk}" member: ${member} id:"${this.persistedData.identifier}" `
);
const group = await UserGroupsWrapperActions.getGroup(this.persistedData.groupPk);
if (!group || !group.secretKey || !group.name) {
window.log.warn(`GroupInviteJob: Did not find group in wrapper or no valid info in wrapper`);
return RunJobResult.PermanentFailure;
}
if (UserUtils.isUsFromCache(member)) {
return RunJobResult.Success; // nothing to do for us, we get the update from our user's libsession wrappers
}
const inviteDetails = await SnodeGroupSignature.getGroupInviteMessage({
groupName: group.name,
member,
secretKey: group.secretKey,
groupPk,
});
if (!inviteDetails) {
window.log.warn(`GroupInviteJob: Did not find group in wrapper or no valid info in wrapper`);
return RunJobResult.PermanentFailure;
}
await getMessageQueue().sendToPubKeyNonDurably({
message: inviteDetails,
namespace: SnodeNamespaces.Default,
pubkey: PubKey.cast(member),
});
// return true so this job is marked as a success and we don't need to retry it
return RunJobResult.Success;
}
public serializeJob(): GroupInvitePersistedData {
return super.serializeBase();
}
public nonRunningJobsToRemove(_jobs: Array<GroupInvitePersistedData>) {
return [];
}
public addJobCheck(jobs: Array<GroupInvitePersistedData>): AddJobCheckReturn {
// avoid adding the same job if the exact same one is already planned
const hasSameJob = jobs.some(j => {
return j.groupPk === this.persistedData.groupPk && j.member === this.persistedData.member;
});
if (hasSameJob) {
return 'skipAddSameJobPresent';
}
return null;
}
public getJobTimeoutMs(): number {
return 15000;
}
}
export const GroupInvite = {
GroupInviteJob,
addGroupInviteJob,
};

@ -3,4 +3,5 @@ export type JobRunnerType =
| 'GroupSyncJob' | 'GroupSyncJob'
| 'FakeSleepForJob' | 'FakeSleepForJob'
| 'FakeSleepForMultiJob' | 'FakeSleepForMultiJob'
| 'AvatarDownloadJob'; | 'AvatarDownloadJob'
| 'GroupInviteJob';

@ -12,15 +12,13 @@ import { ConfigDumpData } from '../../data/configDump/configDump';
import { ConversationTypeEnum } from '../../models/conversationAttributes'; import { ConversationTypeEnum } from '../../models/conversationAttributes';
import { HexString } from '../../node/hexStrings'; import { HexString } from '../../node/hexStrings';
import { getSwarmPollingInstance } from '../../session/apis/snode_api'; import { getSwarmPollingInstance } from '../../session/apis/snode_api';
import { SnodeNamespaces } from '../../session/apis/snode_api/namespaces';
import { SnodeGroupSignature } from '../../session/apis/snode_api/signature/groupSignature';
import { ConvoHub } from '../../session/conversations'; import { ConvoHub } from '../../session/conversations';
import { getMessageQueue } from '../../session/sending';
import { PubKey } from '../../session/types'; import { PubKey } from '../../session/types';
import { UserUtils } from '../../session/utils'; import { UserUtils } from '../../session/utils';
import { getUserED25519KeyPairBytes } from '../../session/utils/User'; import { getUserED25519KeyPairBytes } from '../../session/utils/User';
import { PreConditionFailed } from '../../session/utils/errors'; import { PreConditionFailed } from '../../session/utils/errors';
import { RunJobResult } from '../../session/utils/job_runners/PersistedJob'; import { RunJobResult } from '../../session/utils/job_runners/PersistedJob';
import { GroupInvite } from '../../session/utils/job_runners/jobs/GroupInviteJob';
import { GroupSync } from '../../session/utils/job_runners/jobs/GroupSyncJob'; import { GroupSync } from '../../session/utils/job_runners/jobs/GroupSyncJob';
import { stringify, toFixedUint8ArrayOfLength } from '../../types/sqlSharedTypes'; import { stringify, toFixedUint8ArrayOfLength } from '../../types/sqlSharedTypes';
import { import {
@ -134,7 +132,6 @@ const initNewGroupInWrapper = createAsyncThunk(
// to include them and marks the corresponding wrappers as dirty // to include them and marks the corresponding wrappers as dirty
await MetaGroupWrapperActions.keyRekey(groupPk); await MetaGroupWrapperActions.keyRekey(groupPk);
const convo = await ConvoHub.use().getOrCreateAndWait(groupPk, ConversationTypeEnum.GROUPV2); const convo = await ConvoHub.use().getOrCreateAndWait(groupPk, ConversationTypeEnum.GROUPV2);
await convo.setIsApproved(true, false); await convo.setIsApproved(true, false);
const result = await GroupSync.pushChangesToGroupSwarmIfNeeded(groupPk); const result = await GroupSync.pushChangesToGroupSwarmIfNeeded(groupPk);
@ -148,23 +145,17 @@ const initNewGroupInWrapper = createAsyncThunk(
await convo.commit(); await convo.commit();
convo.updateLastMessage(); convo.updateLastMessage();
dispatch(resetOverlayMode()); dispatch(resetOverlayMode());
await openConversationWithMessages({ conversationKey: groupPk, messageId: null });
// everything is setup for this group, we now need to send the invites to every members, privately and asynchronously, and gracefully handle errors with toasts.
const inviteDetails = await SnodeGroupSignature.getGroupInvitesMessages({ // Everything is setup for this group, we now need to send the invites to each members,
groupName, // privately and asynchronously, and gracefully handle errors with toasts.
membersFromWrapper, // Let's do all of this part of a job to handle app crashes and make sure we
secretKey: groupSecretKey, // can update the groupwrapper with a failed state if a message fails to be sent.
groupPk, for (let index = 0; index < membersFromWrapper.length; index++) {
}); const member = membersFromWrapper[index];
await GroupInvite.addGroupInviteJob({ member: member.pubkeyHex, groupPk });
}
void inviteDetails.map(async detail => { await openConversationWithMessages({ conversationKey: groupPk, messageId: null });
await getMessageQueue().sendToPubKeyNonDurably({
message: detail.invite,
namespace: SnodeNamespaces.Default,
pubkey: PubKey.cast(detail.member),
});
});
return { groupPk: newGroup.pubkeyHex, infos, members: membersFromWrapper }; return { groupPk: newGroup.pubkeyHex, infos, members: membersFromWrapper };
} catch (e) { } catch (e) {

Loading…
Cancel
Save