/* eslint-disable no-await-in-loop */
import { GroupPubkeyType } from 'libsession_util_nodejs';
import { isArray, isEmpty, isNumber } from 'lodash';
import { UserUtils } from '../..';
import { assertUnreachable } from '../../../../types/sqlSharedTypes';
import { isSignInByLinking } from '../../../../util/storage';
import { MetaGroupWrapperActions } from '../../../../webworker/workers/browser/libsession_worker_interface';
import { StoreOnNodeData } from '../../../apis/snode_api/SnodeRequestTypes';
import { GetNetworkTime } from '../../../apis/snode_api/getNetworkTime';
import { SnodeNamespaces } from '../../../apis/snode_api/namespaces';
import { TTL_DEFAULT } from '../../../constants';
import { ConvoHub } from '../../../conversations';
import { MessageSender } from '../../../sending/MessageSender';
import { PubKey } from '../../../types';
import { allowOnlyOneAtATime } from '../../Promise';
import { GroupSuccessfulChange, LibSessionUtil } from '../../libsession/libsession_utils';
import { runners } from '../JobRunner';
import {
  AddJobCheckReturn,
  GroupSyncPersistedData,
  PersistedJob,
  RunJobResult,
} from '../PersistedJob';

const defaultMsBetweenRetries = 15000; // a long delay between retries, to avoid running multiple jobs at the same time when one was postponed while another was already planned
const defaultMaxAttempts = 2;

/**
 * We want to run each of those jobs at least 3 seconds apart.
 * So every time one of those jobs finishes, we update this timestamp so that, when adding a new job, we know the earliest date at which it can run.
 */
const lastRunConfigSyncJobTimestamps = new Map<string, number | null>();

async function confirmPushedAndDump(
  changes: Array<GroupSuccessfulChange>,
  groupPk: GroupPubkeyType
): Promise<void> {
  const toConfirm: Parameters<typeof MetaGroupWrapperActions.metaConfirmPushed> = [
    groupPk,
    { groupInfo: null, groupMember: null },
  ];
  for (let i = 0; i < changes.length; i++) {
    const change = changes[i];
    const namespace = change.pushed.namespace;
    switch (namespace) {
      case SnodeNamespaces.ClosedGroupInfo: {
        if (change.pushed.seqno) {
          toConfirm[1].groupInfo = [change.pushed.seqno.toNumber(), change.updatedHash];
        }
        break;
      }
      case SnodeNamespaces.ClosedGroupMembers: {
        toConfirm[1].groupMember = [change.pushed.seqno.toNumber(), change.updatedHash];
        break;
      }
      case SnodeNamespaces.ClosedGroupKeys: {
        break;
      }
      default:
        assertUnreachable(namespace, 'buildAndSaveDumpsToDB assertUnreachable');
    }
  }

  await MetaGroupWrapperActions.metaConfirmPushed(...toConfirm);
  return LibSessionUtil.saveDumpsToDb(groupPk);
}

async function pushChangesToGroupSwarmIfNeeded(groupPk: GroupPubkeyType): Promise<RunJobResult> {
  // save the dumps to the DB even before trying to push them, so we at least have up-to-date dumps in the DB in case of a crash, no network, etc.
  await LibSessionUtil.saveDumpsToDb(groupPk);
  const changesToPush = await LibSessionUtil.pendingChangesForGroup(groupPk);
  // If there are no pending changes then the job can just complete (next time something
  // is updated we want to try and run immediately, so don't schedule another run in this case)
  if (isEmpty(changesToPush?.messages)) {
    return RunJobResult.Success;
  }

  const msgs: Array<StoreOnNodeData> = changesToPush.messages.map(item => {
    return {
      namespace: item.namespace,
      pubkey: groupPk,
      networkTimestamp: GetNetworkTime.getNowWithNetworkOffset(),
      ttl: TTL_DEFAULT.TTL_CONFIG,
      data: item.ciphertext,
    };
  });

  const result = await MessageSender.sendEncryptedDataToSnode(
    msgs,
    groupPk,
    changesToPush.allOldHashes
  );

  const expectedReplyLength =
    changesToPush.messages.length + (changesToPush.allOldHashes.size ? 1 : 0);

  // we do a sequence call here. If we do not get the expected number of results, consider it a failure
  if (!isArray(result) || result.length !== expectedReplyLength) {
    window.log.info(
      `GroupSyncJob: unexpected result length: expected ${expectedReplyLength} but got ${result?.length}`
    );

    // this might be a 421 error (already handled), so let's retry this request a little bit later
    return RunJobResult.RetryJobIfPossible;
  }

  const changes = LibSessionUtil.batchResultsToGroupSuccessfulChange(result, changesToPush);
  if (isEmpty(changes)) {
    return RunJobResult.RetryJobIfPossible;
  }
  // Now that we have the successful changes, we need to mark them as pushed and
  // generate any config dumps which need to be stored

  await confirmPushedAndDump(changes, groupPk);
  return RunJobResult.Success;
}
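
// Illustrative note (not from the original source): with the arithmetic in
// pushChangesToGroupSwarmIfNeeded above, pushing e.g. two config messages while also
// asking the swarm to delete old hashes gives an expectedReplyLength of 3
// (two store results plus one delete result); any other reply length is logged
// and the job is retried.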

class GroupSyncJob extends PersistedJob<GroupSyncPersistedData> {
  constructor({
    identifier, // this has to be the pubkey of the group we push changes to
    nextAttemptTimestamp,
    maxAttempts,
    currentRetry,
  }: Pick<GroupSyncPersistedData, 'identifier'> &
    Partial<
      Pick<GroupSyncPersistedData, 'nextAttemptTimestamp' | 'currentRetry' | 'maxAttempts'>
    >) {
    super({
      jobType: 'GroupSyncJobType',
      identifier,
      delayBetweenRetries: defaultMsBetweenRetries,
      maxAttempts: isNumber(maxAttempts) ? maxAttempts : defaultMaxAttempts,
      currentRetry: isNumber(currentRetry) ? currentRetry : 0,
      nextAttemptTimestamp: nextAttemptTimestamp || Date.now(),
    });
  }

  public async run(): Promise<RunJobResult> {
    const start = Date.now();

    try {
      const thisJobDestination = this.persistedData.identifier;
      if (!PubKey.isClosedGroupV2(thisJobDestination)) {
        return RunJobResult.PermanentFailure;
      }

      window.log.debug(`GroupSyncJob starting ${thisJobDestination}`);

      const us = UserUtils.getOurPubKeyStrFromCache();
      const ed25519Key = await UserUtils.getUserED25519KeyPairBytes();
      const conversation = ConvoHub.use().get(us);
      if (!us || !conversation || !ed25519Key) {
        // we check for ed25519Key because it is needed for authenticated requests
        window.log.warn('did not find our own conversation');
        return RunJobResult.PermanentFailure;
      }

      // return await so we catch exceptions in here
      return await GroupSync.pushChangesToGroupSwarmIfNeeded(thisJobDestination);

      // eslint-disable-next-line no-useless-catch
    } catch (e) {
      throw e;
    } finally {
      window.log.debug(`GroupSyncJob run() took ${Date.now() - start}ms`);

      // this is a simple way to make sure that, whatever happens here, we update the latest timestamp
      // (a finally block is always executed, whether the try/catch block threw or returned)
      this.updateLastTickTimestamp();
    }
  }

  public serializeJob(): GroupSyncPersistedData {
    const fromParent = super.serializeBase();
    return fromParent;
  }

  public addJobCheck(jobs: Array<GroupSyncPersistedData>): AddJobCheckReturn {
    return this.addJobCheckSameTypeAndIdentifierPresent(jobs);
  }

  public nonRunningJobsToRemove(_jobs: Array<GroupSyncPersistedData>) {
    return [];
  }

  public getJobTimeoutMs(): number {
    return 20000;
  }

  private updateLastTickTimestamp() {
    lastRunConfigSyncJobTimestamps.set(this.persistedData.identifier, Date.now());
  }
}

/**
 * Queue a new GroupSyncJob if needed.
 * A GroupSyncJob can only be added if there is no job of the same type queued already.
 */
async function queueNewJobIfNeeded(groupPk: GroupPubkeyType) {
  if (isSignInByLinking()) {
    window.log.info(`NOT Scheduling GroupSyncJob for ${groupPk} as we are linking a device`);

    return;
  }
  const lastRunConfigSyncJobTimestamp = lastRunConfigSyncJobTimestamps.get(groupPk);
  if (
    !lastRunConfigSyncJobTimestamp ||
    lastRunConfigSyncJobTimestamp < Date.now() - defaultMsBetweenRetries
  ) {
    // window.log.debug('Scheduling GroupSyncJob: ASAP');
    // we postpone by 1000ms to make sure whoever is adding this job is done with what it needs to do first
    // this call will make sure that there is only one configuration sync job at all times
    await runners.groupSyncRunner.addJob(
      new GroupSyncJob({ identifier: groupPk, nextAttemptTimestamp: Date.now() + 1000 })
    );
    return;
  }

  // if we did run at t=100, and it is currently t=110, the difference is 10
  const diff = Math.max(Date.now() - lastRunConfigSyncJobTimestamp, 0);
  // but we want to run every 30, so what we need is actually `30 - 10` from now = 20
  const leftBeforeNextTick = Math.max(defaultMsBetweenRetries - diff, 1000);
  // window.log.debug('Scheduling GroupSyncJob: LATER');

  await runners.groupSyncRunner.addJob(
    new GroupSyncJob({
      identifier: groupPk,
      nextAttemptTimestamp: Date.now() + leftBeforeNextTick,
    })
  );
}

export const GroupSync = {
  GroupSyncJob,
  pushChangesToGroupSwarmIfNeeded,
  queueNewJobIfNeeded: (groupPk: GroupPubkeyType) =>
    allowOnlyOneAtATime(`GroupSyncJob-oneAtAtTime-${groupPk}`, () => queueNewJobIfNeeded(groupPk)),
};
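
// Usage sketch (illustrative only, not part of the original module): callers that have
// just mutated group state typically schedule a push with
//   void GroupSync.queueNewJobIfNeeded(groupPk);
// The allowOnlyOneAtATime wrapper above prevents concurrent callers for the same groupPk
// from racing to schedule duplicate jobs, and queueNewJobIfNeeded itself throttles
// scheduling to roughly one run every defaultMsBetweenRetries ms per group.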