diff --git a/ts/components/leftpane/ActionsPanel.tsx b/ts/components/leftpane/ActionsPanel.tsx index 247128de9..1bb547ee3 100644 --- a/ts/components/leftpane/ActionsPanel.tsx +++ b/ts/components/leftpane/ActionsPanel.tsx @@ -48,6 +48,8 @@ import { initializeLibSessionUtilWrappers } from '../../session/utils/libsession import { isDarkTheme } from '../../state/selectors/theme'; import { ThemeStateType } from '../../themes/constants/colors'; import { switchThemeTo } from '../../themes/switchTheme'; +import { AvatarDownloadJob } from '../../session/utils/job_runners/jobs/AvatarDownloadJob'; +import { runners } from '../../session/utils/job_runners/JobRunner'; const Section = (props: { type: SectionType }) => { const ourNumber = useSelector(getOurNumber); @@ -62,6 +64,20 @@ const Section = (props: { type: SectionType }) => { const handleClick = async () => { /* tslint:disable:no-void-expression */ if (type === SectionType.Profile) { + const us = UserUtils.getOurPubKeyStrFromCache(); + const ourConvo = getConversationController().get(us); + + const job = new AvatarDownloadJob({ + conversationId: us, + currentRetry: 0, + delayBetweenRetries: 3000, + maxAttempts: 3, + profileKeyHex: ourConvo.get('profileKey') || null, + profilePictureUrl: ourConvo.get('avatarPointer') || null, + }); + await runners.avatarDownloadRunner.loadJobsFromDb(); + runners.avatarDownloadRunner.startProcessing(); + await runners.avatarDownloadRunner.addJob(job); dispatch(editProfileModal({})); } else if (type === SectionType.ColorMode) { const currentTheme = String(window.Events.getThemeSetting()); diff --git a/ts/receiver/configMessage.ts b/ts/receiver/configMessage.ts index a612c13f4..69f965454 100644 --- a/ts/receiver/configMessage.ts +++ b/ts/receiver/configMessage.ts @@ -11,6 +11,7 @@ import { import { getOpenGroupV2ConversationId } from '../session/apis/open_group_api/utils/OpenGroupUtils'; import { getConversationController } from '../session/conversations'; import { IncomingMessage } from '../session/messages/incoming/IncomingMessage'; +import { ProfileManager } from '../session/profile_manager/ProfileManager'; import { UserUtils } from '../session/utils'; import { toHex } from '../session/utils/String'; import { configurationMessageReceived, trigger } from '../shims/events'; @@ -21,7 +22,6 @@ import { callLibSessionWorker } from '../webworker/workers/browser/libsession_wo import { removeFromCache } from './cache'; import { handleNewClosedGroup } from './closedGroups'; import { EnvelopePlus } from './types'; -import { appendFetchAvatarAndProfileJob, updateOurProfileSync } from './userProfileImageUpdates'; type IncomingConfResult = { needsPush: boolean; @@ -101,10 +101,9 @@ async function handleUserProfileUpdate(result: IncomingConfResult) { const picUpdate = !isEmpty(updatedProfilePicture.key) && !isEmpty(updatedProfilePicture.url); - // trigger an update of our profileName and picture if there is one. - // this call checks for differences between updating anything - void updateOurProfileSync( - { displayName: updatedUserName, profilePicture: picUpdate ? updatedProfilePicture.url : null }, + await ProfileManager.updateOurProfileSync( + updatedUserName, + picUpdate ? updatedProfilePicture.url : null, picUpdate ? updatedProfilePicture.key : null ); } @@ -157,18 +156,16 @@ async function handleContactsUpdate(result: IncomingConfResult) { await existingConvo.setNickname(wrapperConvo.nickname || null, false); changes = true; } - // make sure to write the changes to the database now as the `appendFetchAvatarAndProfileJob` call below might take some time before getting run + // make sure to write the changes to the database now as the `AvatarDownloadJob` below might take some time before getting run if (changes) { await existingConvo.commit(); } - // we still need to handle the the `name` and the `profilePicture` but those are currently made asynchronously - void appendFetchAvatarAndProfileJob( + // we still need to handle the the `name` (sync) and the `profilePicture` (asynchronous) + await ProfileManager.updateProfileOfContact( existingConvo.id, - { - displayName: wrapperConvo.name, - profilePicture: wrapperConvo.profilePicture?.url || null, - }, + wrapperConvo.name, + wrapperConvo.profilePicture?.url || null, wrapperConvo.profilePicture?.key || null ); } @@ -243,11 +240,8 @@ async function handleOurProfileUpdate( ); const { profileKey, profilePicture, displayName } = configMessage; - const lokiProfile = { - displayName, - profilePicture, - }; - await updateOurProfileSync(lokiProfile, profileKey); + await ProfileManager.updateOurProfileSync(displayName, profilePicture, profileKey); + await setLastProfileUpdateTimestamp(_.toNumber(sentAt)); // do not trigger a signin by linking if the display name is empty if (displayName) { @@ -398,10 +392,11 @@ const handleContactFromConfig = async ( await BlockedNumberController.unblock(contactConvo.id); } - void appendFetchAvatarAndProfileJob( + await ProfileManager.updateProfileOfContact( contactConvo.id, - profileInDataMessage, - contactReceived.profileKey + profileInDataMessage.displayName || undefined, + profileInDataMessage.profilePicture || null, + contactReceived.profileKey || null ); } catch (e) { window?.log?.warn('failed to handle a new closed group from configuration message'); diff --git a/ts/receiver/contentMessage.ts b/ts/receiver/contentMessage.ts index 04958329d..876890d5b 100644 --- a/ts/receiver/contentMessage.ts +++ b/ts/receiver/contentMessage.ts @@ -27,8 +27,8 @@ import { } from '../interactions/conversations/unsendingInteractions'; import { ConversationTypeEnum } from '../models/conversationAttributes'; import { findCachedBlindedMatchOrLookupOnAllServers } from '../session/apis/open_group_api/sogsv3/knownBlindedkeys'; -import { appendFetchAvatarAndProfileJob } from './userProfileImageUpdates'; import { IncomingMessage } from '../session/messages/incoming/IncomingMessage'; +import { ProfileManager } from '../session/profile_manager/ProfileManager'; export async function handleSwarmContentMessage(envelope: EnvelopePlus, messageHash: string) { try { @@ -700,9 +700,10 @@ async function handleMessageRequestResponse( } if (messageRequestResponse.profile && !isEmpty(messageRequestResponse.profile)) { - void appendFetchAvatarAndProfileJob( + await ProfileManager.updateProfileOfContact( conversationToApprove.id, - messageRequestResponse.profile, + messageRequestResponse.profile.displayName, + messageRequestResponse.profile.profilePicture, messageRequestResponse.profileKey ); } diff --git a/ts/receiver/dataMessage.ts b/ts/receiver/dataMessage.ts index ae5394e82..a6e142916 100644 --- a/ts/receiver/dataMessage.ts +++ b/ts/receiver/dataMessage.ts @@ -18,11 +18,11 @@ import { } from '../models/messageFactory'; import { MessageModel } from '../models/message'; import { isUsFromCache } from '../session/utils/User'; -import { appendFetchAvatarAndProfileJob } from './userProfileImageUpdates'; import { toLogFormat } from '../types/attachments/Errors'; import { ConversationTypeEnum } from '../models/conversationAttributes'; import { Reactions } from '../util/reactions'; import { Action, Reaction } from '../types/Reaction'; +import { ProfileManager } from '../session/profile_manager/ProfileManager'; function cleanAttachment(attachment: any) { return { @@ -216,10 +216,10 @@ export async function handleSwarmDataMessage( cleanDataMessage.profile && cleanDataMessage.profileKey?.length ) { - // do not await this - void appendFetchAvatarAndProfileJob( + await ProfileManager.updateProfileOfContact( senderConversationModel.id, - cleanDataMessage.profile, + cleanDataMessage.profile.displayName, + cleanDataMessage.profile.profilePicture, cleanDataMessage.profileKey ); } diff --git a/ts/receiver/queuedJob.ts b/ts/receiver/queuedJob.ts index b5bb6f505..39aebf1ab 100644 --- a/ts/receiver/queuedJob.ts +++ b/ts/receiver/queuedJob.ts @@ -13,9 +13,9 @@ import { showMessageRequestBanner } from '../state/ducks/userConfig'; import { MessageDirection } from '../models/messageType'; import { LinkPreviews } from '../util/linkPreviews'; import { GoogleChrome } from '../util'; -import { appendFetchAvatarAndProfileJob } from './userProfileImageUpdates'; import { ConversationTypeEnum } from '../models/conversationAttributes'; import { getUsBlindedInThatServer } from '../session/apis/open_group_api/sogsv3/knownBlindedkeys'; +import { ProfileManager } from '../session/profile_manager/ProfileManager'; function contentTypeSupported(type: string): boolean { const Chrome = GoogleChrome; @@ -393,9 +393,10 @@ export async function handleMessageJob( // the only profile we don't update with what is coming here is ours, // as our profile is shared accross our devices with a ConfigurationMessage if (messageModel.isIncoming() && regularDataMessage.profile) { - void appendFetchAvatarAndProfileJob( + await ProfileManager.updateProfileOfContact( sendingDeviceConversation.id, - regularDataMessage.profile, + regularDataMessage.profile.displayName, + regularDataMessage.profile.profilePicture, regularDataMessage.profileKey ); } diff --git a/ts/receiver/userProfileImageUpdates.ts b/ts/receiver/userProfileImageUpdates.ts deleted file mode 100644 index 34e8d4f98..000000000 --- a/ts/receiver/userProfileImageUpdates.ts +++ /dev/null @@ -1,164 +0,0 @@ -import Queue from 'queue-promise'; -import ByteBuffer from 'bytebuffer'; -import _ from 'lodash'; - -import { downloadAttachment } from './attachments'; - -import { allowOnlyOneAtATime, hasAlreadyOneAtaTimeMatching } from '../session/utils/Promise'; -import { toHex } from '../session/utils/String'; -import { processNewAttachment } from '../types/MessageAttachment'; -import { MIME } from '../types'; -import { autoScaleForIncomingAvatar } from '../util/attachmentsUtil'; -import { decryptProfile } from '../util/crypto/profileEncrypter'; -import { SignalService } from '../protobuf'; -import { getConversationController } from '../session/conversations'; -import { UserUtils } from '../session/utils'; - -const queue = new Queue({ - concurrent: 1, - interval: 500, -}); - -queue.on('reject', error => { - window.log.warn('[profileupdate] task profile image update failed with', error); -}); - -export async function appendFetchAvatarAndProfileJob( - conversationId: string, - profileInDataMessage: SignalService.DataMessage.ILokiProfile, - profileKey?: Uint8Array | null -) { - if (!conversationId) { - window?.log?.warn('[profileupdate] Cannot update profile with empty convoid'); - return; - } - const oneAtaTimeStr = `appendFetchAvatarAndProfileJob:${conversationId}`; - - if (hasAlreadyOneAtaTimeMatching(oneAtaTimeStr)) { - return; - } - const task = allowOnlyOneAtATime(oneAtaTimeStr, async () => { - return createOrUpdateProfile(conversationId, profileInDataMessage, profileKey); - }); - - queue.enqueue(async () => task); -} - -/** - * This function should be used only when we have to do a sync update to our conversation with a new profile/avatar image or display name - * It tries to fetch the profile image, scale it, save it, and update the conversationModel - */ -export async function updateOurProfileSync( - profileInDataMessage: SignalService.DataMessage.ILokiProfile, - profileKey?: Uint8Array | null -) { - const ourConvo = getConversationController().get(UserUtils.getOurPubKeyStrFromCache()); - if (!ourConvo?.id) { - window?.log?.warn('[profileupdate] Cannot update our profile with empty convoid'); - return; - } - - const oneAtaTimeStr = `appendFetchAvatarAndProfileJob:${ourConvo.id}`; - return allowOnlyOneAtATime(oneAtaTimeStr, async () => { - return createOrUpdateProfile(ourConvo.id, profileInDataMessage, profileKey); - }); -} - -/** - * Creates a new profile from the profile provided. Creates the profile if it doesn't exist. - */ -async function createOrUpdateProfile( - conversationId: string, - profileInDataMessage: SignalService.DataMessage.ILokiProfile, - profileKey?: Uint8Array | null -) { - const conversation = getConversationController().get(conversationId); - if (!conversation) { - return; - } - if (!conversation.isPrivate()) { - window.log.warn('createOrUpdateProfile can only be used for private convos'); - return; - } - - const existingDisplayName = conversation.get('displayNameInProfile'); - const newDisplayName = profileInDataMessage.displayName; - - let changes = false; - if (existingDisplayName !== newDisplayName) { - changes = true; - conversation.set('displayNameInProfile', newDisplayName || undefined); - } - - if (profileInDataMessage.profilePicture && profileKey) { - const prevPointer = conversation.get('avatarPointer'); - const needsUpdate = - !prevPointer || !_.isEqual(prevPointer, profileInDataMessage.profilePicture); - - if (needsUpdate) { - try { - window.log.debug(`[profileupdate] starting downloading task for ${conversation.id}`); - const downloaded = await downloadAttachment({ - url: profileInDataMessage.profilePicture, - isRaw: true, - }); - - // null => use placeholder with color and first letter - let path = null; - if (profileKey) { - // Convert profileKey to ArrayBuffer, if needed - const encoding = typeof profileKey === 'string' ? 'base64' : undefined; - try { - const profileKeyArrayBuffer = ByteBuffer.wrap(profileKey, encoding).toArrayBuffer(); - const decryptedData = await decryptProfile(downloaded.data, profileKeyArrayBuffer); - window.log.info( - `[profileupdate] about to auto scale avatar for convo ${conversation.id}` - ); - - const scaledData = await autoScaleForIncomingAvatar(decryptedData); - const upgraded = await processNewAttachment({ - data: await scaledData.blob.arrayBuffer(), - contentType: MIME.IMAGE_UNKNOWN, // contentType is mostly used to generate previews and screenshot. We do not care for those in this case. - }); - // Only update the convo if the download and decrypt is a success - conversation.set('avatarPointer', profileInDataMessage.profilePicture); - conversation.set('profileKey', toHex(profileKey)); - ({ path } = upgraded); - } catch (e) { - window?.log?.error(`[profileupdate] Could not decrypt profile image: ${e}`); - } - } - conversation.set({ avatarInProfile: path || undefined }); - - changes = true; - } catch (e) { - window.log.warn( - `[profileupdate] Failed to download attachment at ${profileInDataMessage.profilePicture}. Maybe it expired? ${e.message}` - ); - // do not return here, we still want to update the display name even if the avatar failed to download - } - } - } else if (profileKey) { - conversation.set({ avatarInProfile: undefined }); - } - - if (conversation.id === UserUtils.getOurPubKeyStrFromCache()) { - // make sure the settings which should already set to `true` are - if ( - !conversation.get('isTrustedForAttachmentDownload') || - !conversation.get('isApproved') || - !conversation.get('didApproveMe') - ) { - conversation.set({ - isTrustedForAttachmentDownload: true, - isApproved: true, - didApproveMe: true, - }); - changes = true; - } - } - - if (changes) { - await conversation.commit(); - } -} diff --git a/ts/session/profile_manager/ProfileManager.ts b/ts/session/profile_manager/ProfileManager.ts new file mode 100644 index 000000000..e428d557f --- /dev/null +++ b/ts/session/profile_manager/ProfileManager.ts @@ -0,0 +1,73 @@ +import { to_hex } from 'libsodium-wrappers-sumo'; +import { isEmpty } from 'lodash'; +import { getConversationController } from '../conversations'; +import { UserUtils } from '../utils'; +import { runners } from '../utils/job_runners/JobRunner'; +import { + AvatarDownloadJob, + shouldAddAvatarDownloadJob, +} from '../utils/job_runners/jobs/AvatarDownloadJob'; + +/** + * This can be used to update our conversation display name with the given name right away, and plan an AvatarDownloadJob to retrieve the new avatar if needed to download it + */ +async function updateOurProfileSync( + displayName: string | undefined, + profileUrl: string | null, + profileKey: Uint8Array | null +) { + const ourConvo = getConversationController().get(UserUtils.getOurPubKeyStrFromCache()); + if (!ourConvo?.id) { + window?.log?.warn('[profileupdate] Cannot update our profile with empty convoid'); + return; + } + + return updateProfileOfContact( + UserUtils.getOurPubKeyStrFromCache(), + displayName, + profileUrl, + profileKey + ); +} + +/** + * This can be used to update the display name of the given pubkey right away, and plan an AvatarDownloadJob to retrieve the new avatar if needed to download it. + */ +async function updateProfileOfContact( + pubkey: string, + displayName: string | null | undefined, + profileUrl: string | null | undefined, + profileKey: Uint8Array | null | undefined +) { + const conversation = getConversationController().get(pubkey); + + if (!conversation || !conversation.isPrivate()) { + window.log.warn('updateProfileOfContact can only be used for existing and private convos'); + return; + } + + const existingDisplayName = conversation.get('displayNameInProfile'); + + // avoid setting the display name to an invalid value + if (existingDisplayName !== displayName && !isEmpty(displayName)) { + conversation.set('displayNameInProfile', displayName || undefined); + await conversation.commit(); + } + // add an avatar download job only if needed + + const profileKeyHex = !profileKey || isEmpty(profileKey) ? null : to_hex(profileKey); + if (shouldAddAvatarDownloadJob({ pubkey, profileUrl, profileKeyHex })) { + const avatarDownloadJob = new AvatarDownloadJob({ + conversationId: pubkey, + profileKeyHex, + profilePictureUrl: profileUrl || null, + }); + + await runners.avatarDownloadRunner.addJob(avatarDownloadJob); + } +} + +export const ProfileManager = { + updateOurProfileSync, + updateProfileOfContact, +}; diff --git a/ts/session/utils/job_runners/JobDeserialization.ts b/ts/session/utils/job_runners/JobDeserialization.ts index 0e800613e..ed991b578 100644 --- a/ts/session/utils/job_runners/JobDeserialization.ts +++ b/ts/session/utils/job_runners/JobDeserialization.ts @@ -3,40 +3,29 @@ import { FakeSleepForJob, FakeSleepForMultiJob, } from '../../../test/session/unit/utils/job_runner/FakeSleepForJob'; +import { AvatarDownloadJob } from './jobs/AvatarDownloadJob'; import { ConfigurationSyncJob } from './jobs/ConfigurationSyncJob'; -import { Persistedjob, PersistedJobType, SerializedPersistedJob } from './PersistedJob'; +import { PersistedJob, TypeOfPersistedData } from './PersistedJob'; -export function persistedJobFromData(data: SerializedPersistedJob): Persistedjob | null { +export function persistedJobFromData( + data: T +): PersistedJob | null { if (!data || isEmpty(data.jobType) || !isString(data?.jobType)) { return null; } - const jobType: PersistedJobType = data.jobType as PersistedJobType; - switch (jobType) { + + switch (data.jobType) { case 'ConfigurationSyncJobType': - return new ConfigurationSyncJob({ - maxAttempts: data.maxAttempts, - identifier: data.identifier, - nextAttemptTimestamp: data.nextAttemptTimestamp, - currentRetry: data.currentRetry, - }); + return (new ConfigurationSyncJob(data) as unknown) as PersistedJob; + + case 'AvatarDownloadJobType': + return (new AvatarDownloadJob(data) as unknown) as PersistedJob; case 'FakeSleepForJobType': - return new FakeSleepForJob({ - maxAttempts: data.maxAttempts, - identifier: data.identifier, - nextAttemptTimestamp: data.nextAttemptTimestamp, - currentRetry: data.currentRetry, - }); + return (new FakeSleepForJob(data) as unknown) as PersistedJob; case 'FakeSleepForJobMultiType': - return new FakeSleepForMultiJob({ - maxAttempts: data.maxAttempts, - identifier: data.identifier, - nextAttemptTimestamp: data.nextAttemptTimestamp, - currentRetry: data.currentRetry, - returnResult: data.returnResult, - sleepDuration: data.sleepDuration, - }); + return (new FakeSleepForMultiJob(data) as unknown) as PersistedJob; default: - console.warn('unknown persisted job type:', jobType); + console.warn('unknown persisted job type:', (data as any).jobType); return null; } } diff --git a/ts/session/utils/job_runners/JobRunner.ts b/ts/session/utils/job_runners/JobRunner.ts index 6e305f409..aa7838143 100644 --- a/ts/session/utils/job_runners/JobRunner.ts +++ b/ts/session/utils/job_runners/JobRunner.ts @@ -2,7 +2,12 @@ import { cloneDeep, compact, isArray, isString } from 'lodash'; import { Data } from '../../../data/data'; import { persistedJobFromData } from './JobDeserialization'; import { JobRunnerType } from './jobs/JobRunnerType'; -import { Persistedjob, SerializedPersistedJob } from './PersistedJob'; +import { + AvatarDownloadPersistedData, + ConfigurationSyncPersistedData, + PersistedJob, + TypeOfPersistedData, +} from './PersistedJob'; /** * 'job_in_progress' if there is already a job in progress @@ -15,10 +20,10 @@ export type StartProcessingResult = 'job_in_progress' | 'job_deferred' | 'job_st export type AddJobResult = 'job_deferred' | 'job_started'; export type JobEventListener = { - onJobSuccess: (job: SerializedPersistedJob) => void; - onJobDeferred: (job: SerializedPersistedJob) => void; - onJobError: (job: SerializedPersistedJob) => void; - onJobStarted: (job: SerializedPersistedJob) => void; + onJobSuccess: (job: TypeOfPersistedData) => void; + onJobDeferred: (job: TypeOfPersistedData) => void; + onJobError: (job: TypeOfPersistedData) => void; + onJobStarted: (job: TypeOfPersistedData) => void; }; /** @@ -31,26 +36,26 @@ export type JobEventListener = { * * */ -export class PersistedJobRunner { +export class PersistedJobRunner { private isInit = false; - private jobsScheduled: Array = []; + private jobsScheduled: Array> = []; private isStarted = false; private readonly jobRunnerType: JobRunnerType; private nextJobStartTimer: NodeJS.Timeout | null = null; - private currentJob: Persistedjob | null = null; + private currentJob: PersistedJob | null = null; private readonly jobEventsListener: JobEventListener | null; constructor(jobRunnerType: JobRunnerType, jobEventsListener: JobEventListener | null) { this.jobRunnerType = jobRunnerType; this.jobEventsListener = jobEventsListener; - window.log.warn('new runner'); + window?.log?.warn(`new runner of type ${jobRunnerType} built`); } public async loadJobsFromDb() { if (this.isInit) { - throw new Error('job runner already init'); + return; } - let jobsArray: Array = []; + let jobsArray: Array = []; const found = await Data.getItemById(this.getJobRunnerItemId()); if (found && found.value && isString(found.value)) { const asStr = found.value; @@ -67,7 +72,7 @@ export class PersistedJobRunner { jobsArray = []; } } - const jobs: Array = compact(jobsArray.map(persistedJobFromData)); + const jobs: Array> = compact(jobsArray.map(persistedJobFromData)); this.jobsScheduled = cloneDeep(jobs); // make sure the list is sorted this.sortJobsList(); @@ -75,29 +80,42 @@ export class PersistedJobRunner { } public async addJob( - job: Persistedjob + job: PersistedJob ): Promise<'type_exists' | 'identifier_exists' | AddJobResult> { this.assertIsInitialized(); - if (job.singleJobInQueue) { - // make sure there is no job with that same type already scheduled. - if (this.jobsScheduled.find(j => j.jobType === job.jobType)) { - console.info( - `job runner has already a job with type:"${job.jobType}" planned so not adding another one` - ); - return 'type_exists'; - } - return this.addJobUnchecked(job); + if (this.jobsScheduled.find(j => j.persistedData.identifier === job.persistedData.identifier)) { + window.log.info( + `job runner has already a job with id:"${job.persistedData.identifier}" planned so not adding another one` + ); + return 'identifier_exists'; } - // make sure there is no job with that same identifier already . - if (this.jobsScheduled.find(j => j.identifier === job.identifier)) { - console.info( - `job runner has already a job with id:"${job.identifier}" planned so not adding another one` + const serializedNonRunningJobs = this.jobsScheduled + .filter(j => j !== this.currentJob) + .map(k => k.serializeJob()); + + const addJobChecks = job.addJobCheck(serializedNonRunningJobs); + if (addJobChecks === 'skipAsJobTypeAlreadyPresent') { + window.log.warn( + `job runner has already a job with type:"${job.persistedData.jobType}" planned so not adding another one` ); - return 'identifier_exists'; + return 'type_exists'; + } + + // if addJobCheck returned 'removeJobsFromQueue it means that job logic estimates some jobs have to remove before adding that one. + // so let's grab the jobs to remove, remove them, and then add that new job nevertheless + if (addJobChecks === 'removeJobsFromQueue') { + // fetch all the jobs which we should remove and remove them + const toRemove = job.nonRunningJobsToRemove(serializedNonRunningJobs); + this.deleteJobsByIdentifier(toRemove.map(m => m.identifier)); + this.sortJobsList(); + await this.writeJobsToDB(); } - console.info(`job runner adding type :"${job.jobType}" `); + + // make sure there is no job with that same identifier already . + + window.log.info(`job runner adding type :"${job.persistedData.jobType}" `); return this.addJobUnchecked(job); } @@ -145,14 +163,16 @@ export class PersistedJobRunner { public startProcessing(): StartProcessingResult { if (this.isStarted) { - throw new Error('startProcessing already called'); + return this.planNextJob(); } this.isStarted = true; return this.planNextJob(); } private sortJobsList() { - this.jobsScheduled.sort((a, b) => a.nextAttemptTimestamp - b.nextAttemptTimestamp); + this.jobsScheduled.sort( + (a, b) => a.persistedData.nextAttemptTimestamp - b.persistedData.nextAttemptTimestamp + ); } private async writeJobsToDB() { @@ -164,7 +184,8 @@ export class PersistedJobRunner { }); } - private async addJobUnchecked(job: Persistedjob) { + private async addJobUnchecked(job: PersistedJob) { + console.warn('job', job); this.jobsScheduled.push(cloneDeep(job)); this.sortJobsList(); await this.writeJobsToDB(); @@ -202,6 +223,7 @@ export class PersistedJobRunner { return 'no_job'; } } + if (this.currentJob) { return 'job_in_progress'; } @@ -211,7 +233,7 @@ export class PersistedJobRunner { return 'no_job'; } - if (nextJob.nextAttemptTimestamp <= Date.now()) { + if (nextJob.persistedData.nextAttemptTimestamp <= Date.now()) { if (this.nextJobStartTimer) { global.clearTimeout(this.nextJobStartTimer); this.nextJobStartTimer = null; @@ -233,18 +255,20 @@ export class PersistedJobRunner { this.nextJobStartTimer = null; } void this.runNextJob(); - }, Math.max(nextJob.nextAttemptTimestamp - Date.now(), 1)); + }, Math.max(nextJob.persistedData.nextAttemptTimestamp - Date.now(), 1)); return 'job_deferred'; } - private deleteJobByIdentifier(identifier: string) { - const jobIndex = this.jobsScheduled.findIndex(f => f.identifier === identifier); - console.info('deleteJobByIdentifier job', identifier, ' index', jobIndex); + private deleteJobsByIdentifier(identifiers: Array) { + identifiers.forEach(identifier => { + const jobIndex = this.jobsScheduled.findIndex(f => f.persistedData.identifier === identifier); + window.log.info('deleteJobsByIdentifier job', identifier, ' index', jobIndex); - if (jobIndex >= 0) { - this.jobsScheduled.splice(jobIndex, 1); - } + if (jobIndex >= 0) { + this.jobsScheduled.splice(jobIndex, 1); + } + }); } private async runNextJob() { @@ -256,10 +280,10 @@ export class PersistedJobRunner { const nextJob = this.jobsScheduled[0]; // if the time is 101, and that task is to be run at t=101, we need to start it right away. - if (nextJob.nextAttemptTimestamp > Date.now()) { + if (nextJob.persistedData.nextAttemptTimestamp > Date.now()) { window.log.warn( 'next job is not due to be run just yet. Going idle.', - nextJob.nextAttemptTimestamp - Date.now() + nextJob.persistedData.nextAttemptTimestamp - Date.now() ); this.planNextJob(); return; @@ -274,25 +298,26 @@ export class PersistedJobRunner { this.jobEventsListener?.onJobStarted(this.currentJob.serializeJob()); const success = await this.currentJob.runJob(); + if (!success) { - throw new Error(`job ${nextJob.identifier} failed`); + throw new Error(`job ${nextJob.persistedData.identifier} failed`); } // here the job did not throw and didn't return false. Consider it OK then and remove it from the list of jobs to run. - this.deleteJobByIdentifier(this.currentJob.identifier); + this.deleteJobsByIdentifier([this.currentJob.persistedData.identifier]); await this.writeJobsToDB(); } catch (e) { - // either the job throw or didn't return 'OK' - if (nextJob.currentRetry >= nextJob.maxAttempts - 1) { + if (nextJob.persistedData.currentRetry >= nextJob.persistedData.maxAttempts - 1) { // we cannot restart this job anymore. Remove the entry completely - this.deleteJobByIdentifier(nextJob.identifier); + this.deleteJobsByIdentifier([nextJob.persistedData.identifier]); if (this.jobEventsListener && this.currentJob) { this.jobEventsListener.onJobError(this.currentJob.serializeJob()); } } else { - nextJob.currentRetry = nextJob.currentRetry + 1; + nextJob.persistedData.currentRetry = nextJob.persistedData.currentRetry + 1; // that job can be restarted. Plan a retry later with the already defined retry - nextJob.nextAttemptTimestamp = Date.now() + nextJob.delayBetweenRetries; + nextJob.persistedData.nextAttemptTimestamp = + Date.now() + nextJob.persistedData.delayBetweenRetries; if (this.jobEventsListener && this.currentJob) { this.jobEventsListener.onJobDeferred(this.currentJob.serializeJob()); } @@ -322,8 +347,16 @@ export class PersistedJobRunner { } } -const configurationSyncRunner = new PersistedJobRunner('ConfigurationSyncJob', null); +const configurationSyncRunner = new PersistedJobRunner( + 'ConfigurationSyncJob', + null +); +const avatarDownloadRunner = new PersistedJobRunner( + 'AvatarDownloadJob', + null +); export const runners = { configurationSyncRunner, + avatarDownloadRunner, }; diff --git a/ts/session/utils/job_runners/PersistedJob.ts b/ts/session/utils/job_runners/PersistedJob.ts index 632429b4f..b9f3350c6 100644 --- a/ts/session/utils/job_runners/PersistedJob.ts +++ b/ts/session/utils/job_runners/PersistedJob.ts @@ -1,75 +1,84 @@ -import { isEmpty } from 'lodash'; +import { cloneDeep, isEmpty } from 'lodash'; export type PersistedJobType = | 'ConfigurationSyncJobType' + | 'AvatarDownloadJobType' | 'FakeSleepForJobType' | 'FakeSleepForJobMultiType'; -export type SerializedPersistedJob = { - // we need at least those as they are needed to do lookups of the list of jobs. - jobType: string; +interface PersistedJobData { + jobType: PersistedJobType; identifier: string; nextAttemptTimestamp: number; - maxAttempts: number; // to run try to run it twice, set this to 2. - currentRetry: number; // - // then we can have other details on a specific type of job case - [key: string]: any; -}; - -export abstract class Persistedjob { - public readonly identifier: string; - public readonly singleJobInQueue: boolean; - public readonly delayBetweenRetries: number; - public readonly maxAttempts: number; - public readonly jobType: PersistedJobType; - public currentRetry: number; - public nextAttemptTimestamp: number; + delayBetweenRetries: number; + maxAttempts: number; // to try to run this job twice, set this to 2. + currentRetry: number; +} + +export interface FakeSleepJobData extends PersistedJobData { + jobType: 'FakeSleepForJobType'; + returnResult: boolean; + sleepDuration: number; +} +export interface FakeSleepForMultiJobData extends PersistedJobData { + jobType: 'FakeSleepForJobMultiType'; + returnResult: boolean; + sleepDuration: number; +} + +export interface AvatarDownloadPersistedData extends PersistedJobData { + jobType: 'AvatarDownloadJobType'; + conversationId: string; + profileKeyHex: string | null; + profilePictureUrl: string | null; +} + +export interface ConfigurationSyncPersistedData extends PersistedJobData { + jobType: 'ConfigurationSyncJobType'; +} + +export type TypeOfPersistedData = + | ConfigurationSyncPersistedData + | AvatarDownloadPersistedData + | FakeSleepJobData + | FakeSleepForMultiJobData; + +/** + * This class can be used to save and run jobs from the database. + * Every child class must take the minimum amount of arguments, and make sure they are unlikely to change. + * For instance, don't have the attachments to downloads as arguments, just the messageId and the index. + * Don't have the new profileImage url for an avatar download job, just the conversationId. + * + * It is the role of the job to fetch the latest data, and decide if a process is needed or not + * If the job throws or returns false, it will be retried by the corresponding job runner. + */ +export abstract class PersistedJob { + public persistedData: T; private runningPromise: Promise | null = null; - public constructor({ - maxAttempts, - delayBetweenRetries, - identifier, - singleJobInQueue, - jobType, - nextAttemptTimestamp, - }: { - identifier: string; - maxAttempts: number; - delayBetweenRetries: number; - singleJobInQueue: boolean; - jobType: PersistedJobType; - nextAttemptTimestamp: number; - currentRetry: number; - }) { - this.identifier = identifier; - this.jobType = jobType; - this.delayBetweenRetries = delayBetweenRetries; - this.maxAttempts = maxAttempts; - this.currentRetry = 0; - this.singleJobInQueue = singleJobInQueue; - this.nextAttemptTimestamp = nextAttemptTimestamp; - - if (maxAttempts < 1) { + public constructor(data: T) { + if (data.maxAttempts < 1) { throw new Error('maxAttempts must be >= 1'); } - if (isEmpty(identifier)) { + if (isEmpty(data.identifier)) { throw new Error('identifier must be not empty'); } - if (isEmpty(jobType)) { - throw new Error('identifier must be not empty'); + if (isEmpty(data.jobType)) { + throw new Error('jobType must be not empty'); } - if (delayBetweenRetries <= 0) { + if (data.delayBetweenRetries <= 0) { throw new Error('delayBetweenRetries must be at least > 0'); } - if (nextAttemptTimestamp <= 0) { + if (data.nextAttemptTimestamp <= 0) { throw new Error('nextAttemptTimestamp must be set and > 0'); } + + this.persistedData = data; } public async runJob() { @@ -86,30 +95,40 @@ export abstract class Persistedjob { public async waitForCurrentTry() { try { // tslint:disable-next-line: no-promise-as-boolean - return this.runningPromise || Promise.resolve(); + return this.runningPromise || Promise.resolve(true); } catch (e) { window.log.warn('waitForCurrentTry got an error: ', e.message); - return Promise.resolve(); + return Promise.resolve(true); } } /** * This one must be reimplemented in the child class, and must first call `super.serializeBase()` */ - public abstract serializeJob(): SerializedPersistedJob; - - protected abstract run(): Promise; // must return true if that job is a success and doesn't need to be retried - - protected serializeBase(): SerializedPersistedJob { - return { - // those are mandatory - jobType: this.jobType, - identifier: this.identifier, - nextAttemptTimestamp: this.nextAttemptTimestamp, - maxAttempts: this.maxAttempts, - currentRetry: this.currentRetry, - delayBetweenRetries: this.delayBetweenRetries, - singleJobInQueue: this.singleJobInQueue, - }; + public abstract serializeJob(): T; + + public abstract nonRunningJobsToRemove(jobs: Array): Array; + + public abstract addJobCheck( + jobs: Array + ): 'skipAsJobTypeAlreadyPresent' | 'removeJobsFromQueue' | null; + + public addJobCheckSameTypePresent(jobs: Array): 'skipAsJobTypeAlreadyPresent' | null { + return jobs.some(j => j.jobType === this.persistedData.jobType) + ? 'skipAsJobTypeAlreadyPresent' + : null; + } + + /** + * This function will be called by the runner do run the logic of that job. + * It **must** return true if that job is a success and doesn't need to be retried. + * If it returns false, or throws, it will be retried (if not reach the retries limit yet). + * + * Note: you should check the this.isAborted() to know if you should cancel the current processing of your logic. + */ + protected abstract run(): Promise; + + protected serializeBase(): T { + return cloneDeep(this.persistedData); } } diff --git a/ts/session/utils/job_runners/jobs/AvatarDownloadJob.ts b/ts/session/utils/job_runners/jobs/AvatarDownloadJob.ts new file mode 100644 index 000000000..7fa074011 --- /dev/null +++ b/ts/session/utils/job_runners/jobs/AvatarDownloadJob.ts @@ -0,0 +1,233 @@ +import { isEmpty, isEqual, isNumber } from 'lodash'; +import { v4 } from 'uuid'; +import { UserUtils } from '../..'; +import { downloadAttachment } from '../../../../receiver/attachments'; +import { MIME } from '../../../../types'; +import { processNewAttachment } from '../../../../types/MessageAttachment'; +import { autoScaleForIncomingAvatar } from '../../../../util/attachmentsUtil'; +import { decryptProfile } from '../../../../util/crypto/profileEncrypter'; +import { getConversationController } from '../../../conversations'; +import { fromHexToArray } from '../../String'; +import { AvatarDownloadPersistedData, PersistedJob } from '../PersistedJob'; + +const defaultMsBetweenRetries = 10000; +const defaultMaxAttemps = 3; + +/** + * Returns true if given those details we should add an Avatar Download Job to the list of jobs to run + */ +export function shouldAddAvatarDownloadJob({ + profileKeyHex, + profileUrl, + pubkey, +}: { + pubkey: string; + profileUrl: string | null | undefined; + profileKeyHex: string | null | undefined; +}) { + const conversation = getConversationController().get(pubkey); + if (!conversation) { + // return true so we do not retry this task. + window.log.warn('shouldAddAvatarDownloadJob did not corresponding conversation'); + + return false; + } + if (!conversation.isPrivate()) { + window.log.warn('shouldAddAvatarDownloadJob can only be used for private convos currently'); + return false; + } + if (profileUrl && !isEmpty(profileKeyHex)) { + const prevPointer = conversation.get('avatarPointer'); + const needsUpdate = !prevPointer || !isEqual(prevPointer, profileUrl); + + return needsUpdate; + } + return false; +} + +/** + * This job can be used to add the downloading of the avatar of a conversation to the list of jobs to be run. + * The conversationId is used as identifier so we can only have a single job per conversation. + * When the jobRunners starts this job, the job first checks if a download is required or not (avatarPointer changed and wasn't already downloaded). + * If yes, it downloads the new avatar, decrypt it and store it before updating the conversation with the new url,profilekey and local file storage. + */ +export class AvatarDownloadJob extends PersistedJob { + constructor({ + conversationId, + nextAttemptTimestamp, + maxAttempts, + currentRetry, + profileKeyHex, + profilePictureUrl, + identifier, + }: Pick & { + conversationId: string; + } & Partial< + Pick< + AvatarDownloadPersistedData, + | 'nextAttemptTimestamp' + | 'identifier' + | 'maxAttempts' + | 'delayBetweenRetries' + | 'currentRetry' + > + >) { + super({ + jobType: 'AvatarDownloadJobType', + identifier: identifier || v4(), + conversationId, + delayBetweenRetries: defaultMsBetweenRetries, + maxAttempts: isNumber(maxAttempts) ? maxAttempts : defaultMaxAttemps, + nextAttemptTimestamp: nextAttemptTimestamp || Date.now() + defaultMsBetweenRetries, + currentRetry: isNumber(currentRetry) ? currentRetry : 0, + profileKeyHex, + profilePictureUrl, + }); + } + + // tslint:disable-next-line: cyclomatic-complexity + public async run(): Promise { + const convoId = this.persistedData.conversationId; + + window.log.warn( + `running job ${this.persistedData.jobType} with conversationId:"${convoId}" id:"${this.persistedData.identifier}" ` + ); + + if (!this.persistedData.identifier || !convoId) { + // return true so we do not retry this task. + return true; + } + + let conversation = getConversationController().get(convoId); + if (!conversation) { + // return true so we do not retry this task. + window.log.warn('AvatarDownloadJob did not corresponding conversation'); + + return true; + } + if (!conversation.isPrivate()) { + window.log.warn('AvatarDownloadJob can only be used for private convos currently'); + return true; + } + let changes = false; + + const shouldRunJob = shouldAddAvatarDownloadJob({ + pubkey: convoId, + profileKeyHex: this.persistedData.profileKeyHex, + profileUrl: this.persistedData.profilePictureUrl, + }); + if (!shouldRunJob) { + // return true so we do not retry this task. + window.log.warn('AvatarDownloadJob shouldAddAvatarDownloadJob said no'); + + return true; + } + + if (this.persistedData.profilePictureUrl && this.persistedData.profileKeyHex) { + const prevPointer = conversation.get('avatarPointer'); + const needsUpdate = + !prevPointer || !isEqual(prevPointer, this.persistedData.profilePictureUrl); + + if (needsUpdate) { + try { + window.log.debug(`[profileupdate] starting downloading task for ${conversation.id}`); + const downloaded = await downloadAttachment({ + url: this.persistedData.profilePictureUrl, + isRaw: true, + }); + conversation = getConversationController().getOrThrow(convoId); + + // null => use placeholder with color and first letter + let path = null; + + try { + const profileKeyArrayBuffer = fromHexToArray(this.persistedData.profileKeyHex); + const decryptedData = await decryptProfile(downloaded.data, profileKeyArrayBuffer); + + window.log.info( + `[profileupdate] about to auto scale avatar for convo ${conversation.id}` + ); + + const scaledData = await autoScaleForIncomingAvatar(decryptedData); + + const upgraded = await processNewAttachment({ + data: await scaledData.blob.arrayBuffer(), + contentType: MIME.IMAGE_UNKNOWN, // contentType is mostly used to generate previews and screenshot. We do not care for those in this case. + }); + conversation = getConversationController().getOrThrow(convoId); + + // Only update the convo if the download and decrypt is a success + conversation.set('avatarPointer', this.persistedData.profilePictureUrl); + conversation.set('profileKey', this.persistedData.profileKeyHex || undefined); + ({ path } = upgraded); + } catch (e) { + window?.log?.error(`[profileupdate] Could not decrypt profile image: ${e}`); + } + + conversation.set({ avatarInProfile: path || undefined }); + + changes = true; + } catch (e) { + window.log.warn( + `[profileupdate] Failed to download attachment at ${this.persistedData.profilePictureUrl}. Maybe it expired? ${e.message}` + ); + // do not return here, we still want to update the display name even if the avatar failed to download + } + } + } else { + if ( + conversation.get('avatarInProfile') || + conversation.get('avatarPointer') || + conversation.get('profileKey') + ) { + changes = true; + conversation.set({ + avatarInProfile: undefined, + avatarPointer: undefined, + profileKey: undefined, + }); + } + } + + if (conversation.id === UserUtils.getOurPubKeyStrFromCache()) { + // make sure the settings which should already set to `true` are + if ( + !conversation.get('isTrustedForAttachmentDownload') || + !conversation.get('isApproved') || + !conversation.get('didApproveMe') + ) { + conversation.set({ + isTrustedForAttachmentDownload: true, + isApproved: true, + didApproveMe: true, + }); + changes = true; + } + } + + if (changes) { + await conversation.commit(); + } + + // return true so this job is marked as a success + return true; + } + + public serializeJob(): AvatarDownloadPersistedData { + return super.serializeBase(); + } + + public nonRunningJobsToRemove(jobs: Array) { + // for an avatar download job, we want to remove any job matching the same conversationID. + return jobs.filter(j => j.conversationId === this.persistedData.conversationId); + } + + public addJobCheck( + jobs: Array + ): 'skipAsJobTypeAlreadyPresent' | 'removeJobsFromQueue' | null { + if (this.nonRunningJobsToRemove(jobs).length) { + return 'removeJobsFromQueue'; + } + return null; + } +} diff --git a/ts/session/utils/job_runners/jobs/ConfigurationSyncJob.ts b/ts/session/utils/job_runners/jobs/ConfigurationSyncJob.ts index 88ce0c650..cb5b8778d 100644 --- a/ts/session/utils/job_runners/jobs/ConfigurationSyncJob.ts +++ b/ts/session/utils/job_runners/jobs/ConfigurationSyncJob.ts @@ -1,45 +1,58 @@ -import { isNumber } from 'lodash'; import { v4 } from 'uuid'; import { sleepFor } from '../../Promise'; -import { Persistedjob, SerializedPersistedJob } from '../PersistedJob'; +import { ConfigurationSyncPersistedData, PersistedJob } from '../PersistedJob'; -export class ConfigurationSyncJob extends Persistedjob { +const defaultMsBetweenRetries = 3000; + +export class ConfigurationSyncJob extends PersistedJob { constructor({ identifier, nextAttemptTimestamp, maxAttempts, currentRetry, - }: { - identifier: string | null; - nextAttemptTimestamp: number | null; - maxAttempts: number | null; - currentRetry: number; - }) { + }: Pick & + Partial>) { super({ jobType: 'ConfigurationSyncJobType', identifier: identifier || v4(), - delayBetweenRetries: 3000, - maxAttempts: isNumber(maxAttempts) ? maxAttempts : 3, - nextAttemptTimestamp: nextAttemptTimestamp || Date.now() + 3000, - singleJobInQueue: true, + delayBetweenRetries: defaultMsBetweenRetries, + maxAttempts: maxAttempts, + nextAttemptTimestamp: nextAttemptTimestamp || Date.now() + defaultMsBetweenRetries, currentRetry, }); } public async run() { // blablha do everything from the notion page, and if success, return true. - window.log.warn(`running job ${this.jobType} with id:"${this.identifier}" `); + window.log.warn( + `running job ${this.persistedData.jobType} with id:"${this.persistedData.identifier}" ` + ); await sleepFor(5000); window.log.warn( - `running job ${this.jobType} with id:"${this.identifier}" done and returning failed ` + `running job ${this.persistedData.jobType} with id:"${this.persistedData.identifier}" done and returning failed ` ); return false; } - public serializeJob(): SerializedPersistedJob { + public serializeJob(): ConfigurationSyncPersistedData { const fromParent = super.serializeBase(); return fromParent; } + + public addJobCheck( + jobs: Array + ): 'skipAsJobTypeAlreadyPresent' | 'removeJobsFromQueue' | null { + return this.addJobCheckSameTypePresent(jobs); + } + + /** + * For the SharedConfig job, we do not care about the jobs already in the list. + * We never want to add a new sync configuration job if there is already one in the queue. + * This is done by the `addJobCheck` method above + */ + public nonRunningJobsToRemove(_jobs: Array) { + return []; + } } diff --git a/ts/session/utils/job_runners/jobs/JobRunnerType.ts b/ts/session/utils/job_runners/jobs/JobRunnerType.ts index 1b448416b..d24a2dc78 100644 --- a/ts/session/utils/job_runners/jobs/JobRunnerType.ts +++ b/ts/session/utils/job_runners/jobs/JobRunnerType.ts @@ -1 +1,5 @@ -export type JobRunnerType = 'ConfigurationSyncJob' | 'FakeSleepForJob'; +export type JobRunnerType = + | 'ConfigurationSyncJob' + | 'FakeSleepForJob' + | 'FakeSleepForMultiJob' + | 'AvatarDownloadJob'; diff --git a/ts/test/session/unit/utils/job_runner/FakeSleepForJob.ts b/ts/test/session/unit/utils/job_runner/FakeSleepForJob.ts index c96abbe9b..cedab8f53 100644 --- a/ts/test/session/unit/utils/job_runner/FakeSleepForJob.ts +++ b/ts/test/session/unit/utils/job_runner/FakeSleepForJob.ts @@ -2,14 +2,12 @@ import { isNumber } from 'lodash'; import { v4 } from 'uuid'; import { sleepFor } from '../../../../../session/utils/Promise'; import { - Persistedjob, - SerializedPersistedJob, + FakeSleepForMultiJobData, + FakeSleepJobData, + PersistedJob, } from '../../../../../session/utils/job_runners/PersistedJob'; -export class FakeSleepForMultiJob extends Persistedjob { - private readonly sleepDuration: number; - private readonly returnResult: boolean; - +export class FakeSleepForMultiJob extends PersistedJob { constructor({ identifier, nextAttemptTimestamp, @@ -17,25 +15,20 @@ export class FakeSleepForMultiJob extends Persistedjob { currentRetry, returnResult, sleepDuration, - }: { - identifier: string | null; - nextAttemptTimestamp: number | null; - maxAttempts: number | null; - currentRetry: number; - sleepDuration: number; - returnResult: boolean; - }) { + }: Pick & + Partial< + Pick + >) { super({ jobType: 'FakeSleepForJobMultiType', identifier: identifier || v4(), delayBetweenRetries: 10000, maxAttempts: isNumber(maxAttempts) ? maxAttempts : 3, nextAttemptTimestamp: nextAttemptTimestamp || Date.now() + 3000, - singleJobInQueue: false, currentRetry, + returnResult, + sleepDuration, }); - this.returnResult = returnResult; - this.sleepDuration = sleepDuration; if (process.env.NODE_APP_INSTANCE !== undefined) { throw new Error('FakeSleepForJobMultiType are only meant for testing purposes'); } @@ -43,41 +36,53 @@ export class FakeSleepForMultiJob extends Persistedjob { public async run() { window.log.warn( - `running job ${this.jobType} with id:"${this.identifier}". sleeping for ${this.sleepDuration} & returning ${this.returnResult} ` + `running job ${this.persistedData.jobType} with id:"${this.persistedData.identifier}". sleeping for ${this.persistedData.sleepDuration} & returning ${this.persistedData.returnResult} ` + ); + await sleepFor(this.persistedData.sleepDuration); + window.log.warn( + `${this.persistedData.jobType} with id:"${this.persistedData.identifier}" done. returning success ` ); - await sleepFor(this.sleepDuration); - window.log.warn(`${this.jobType} with id:"${this.identifier}" done. returning success `); - return this.returnResult; + return this.persistedData.returnResult; + } + + public serializeJob(): FakeSleepForMultiJobData { + return super.serializeBase(); + } + + /** + * For the fakesleep for multi, we want to allow as many job as we want, so this returns null + */ + public addJobCheck( + _jobs: Array + ): 'skipAsJobTypeAlreadyPresent' | 'removeJobsFromQueue' | null { + return null; } - public serializeJob(): SerializedPersistedJob { - const fromParent = super.serializeBase(); - fromParent.sleepDuration = this.sleepDuration; - fromParent.returnResult = this.returnResult; - return fromParent; + /** + * For the MultiFakeSleep job, there are no jobs to remove if we try to add a new one of the same type. + */ + public nonRunningJobsToRemove(_jobs: Array) { + return []; } } -export class FakeSleepForJob extends Persistedjob { +export class FakeSleepForJob extends PersistedJob { constructor({ identifier, nextAttemptTimestamp, maxAttempts, currentRetry, - }: { - identifier: string | null; - nextAttemptTimestamp: number | null; - maxAttempts: number | null; - currentRetry: number; - }) { + }: Pick & + Partial>) { super({ jobType: 'FakeSleepForJobType', identifier: identifier || v4(), delayBetweenRetries: 10000, - maxAttempts: isNumber(maxAttempts) ? maxAttempts : 3, + maxAttempts, nextAttemptTimestamp: nextAttemptTimestamp || Date.now() + 3000, - singleJobInQueue: true, currentRetry, + returnResult: false, + sleepDuration: 5000, }); if (process.env.NODE_APP_INSTANCE !== undefined) { throw new Error('FakeSleepForJob are only meant for testing purposes'); @@ -85,14 +90,32 @@ export class FakeSleepForJob extends Persistedjob { } public async run() { - window.log.warn(`running job ${this.jobType} with id:"${this.identifier}" `); - await sleepFor(5000); - window.log.warn(`${this.jobType} with id:"${this.identifier}" done. returning failed `); + window.log.warn( + `running job ${this.persistedData.jobType} with id:"${this.persistedData.identifier}" ` + ); + await sleepFor(this.persistedData.sleepDuration); + window.log.warn( + `${this.persistedData.jobType} with id:"${this.persistedData.identifier}" done. returning failed ` + ); return false; } - public serializeJob(): SerializedPersistedJob { - const fromParent = super.serializeBase(); - return fromParent; + public serializeJob(): FakeSleepJobData { + return super.serializeBase(); + } + + public addJobCheck( + jobs: Array + ): 'skipAsJobTypeAlreadyPresent' | 'removeJobsFromQueue' | null { + return this.addJobCheckSameTypePresent(jobs); + } + + /** + * For the FakeSleep job, we do not care about the jobs already in the list. + * We just never want to add a new job of that type if there is already one in the queue. + * This is done by the `addJobCheck` method above + */ + public nonRunningJobsToRemove(_jobs: Array) { + return []; } } diff --git a/ts/test/session/unit/utils/job_runner/JobRunner_test.ts b/ts/test/session/unit/utils/job_runner/JobRunner_test.ts index 5299f3738..4592596d5 100644 --- a/ts/test/session/unit/utils/job_runner/JobRunner_test.ts +++ b/ts/test/session/unit/utils/job_runner/JobRunner_test.ts @@ -7,7 +7,11 @@ import { PersistedJobRunner, } from '../../../../../session/utils/job_runners/JobRunner'; import { FakeSleepForJob, FakeSleepForMultiJob } from './FakeSleepForJob'; -import { SerializedPersistedJob } from '../../../../../session/utils/job_runners/PersistedJob'; +import { + FakeSleepForMultiJobData, + FakeSleepJobData, + TypeOfPersistedData, +} from '../../../../../session/utils/job_runners/PersistedJob'; import { sleepFor } from '../../../../../session/utils/Promise'; import { stubData } from '../../../../test-utils/utils'; @@ -21,7 +25,7 @@ function getFakeSleepForJob(timestamp: number): FakeSleepForJob { return job; } -function getFakeSleepForJobPersisted(timestamp: number): SerializedPersistedJob { +function getFakeSleepForJobPersisted(timestamp: number): FakeSleepJobData { return getFakeSleepForJob(timestamp).serializeJob(); } @@ -49,7 +53,8 @@ function getFakeSleepForMultiJob({ describe('JobRunner', () => { let getItemById: Sinon.SinonStub; let clock: Sinon.SinonFakeTimers; - let runner: PersistedJobRunner; + let runner: PersistedJobRunner; + let runnerMulti: PersistedJobRunner; let jobEventsListener: JobEventListener; beforeEach(() => { @@ -57,31 +62,40 @@ describe('JobRunner', () => { stubData('createOrUpdateItem'); clock = Sinon.useFakeTimers({ shouldAdvanceTime: true }); jobEventsListener = { - onJobDeferred: (_job: SerializedPersistedJob) => { + onJobDeferred: (_job: TypeOfPersistedData) => { // window.log.warn('listener got deferred for job ', job); }, - onJobSuccess: (_job: SerializedPersistedJob) => { + onJobSuccess: (_job: TypeOfPersistedData) => { // window.log.warn('listener got success for job ', job); }, - onJobError: (_job: SerializedPersistedJob) => { + onJobError: (_job: TypeOfPersistedData) => { // window.log.warn('listener got error for job ', job); }, - onJobStarted: (_job: SerializedPersistedJob) => { + onJobStarted: (_job: TypeOfPersistedData) => { // window.log.warn('listener got started for job ', job); }, }; - runner = new PersistedJobRunner('FakeSleepForJob', jobEventsListener); + runner = new PersistedJobRunner('FakeSleepForJob', jobEventsListener); + runnerMulti = new PersistedJobRunner( + 'FakeSleepForMultiJob', + jobEventsListener + ); }); afterEach(() => { Sinon.restore(); runner.resetForTesting(); + runnerMulti.resetForTesting(); }); describe('loadJobsFromDb', () => { - it('throw if already loaded', async () => { - await runner.loadJobsFromDb(); + it('throw if not loaded', async () => { try { + getItemById.resolves({ + id: '', + value: JSON.stringify([]), + }); + await runner.loadJobsFromDb(); throw new Error('PLOP'); // the line above should throw something else } catch (e) { @@ -154,21 +168,21 @@ describe('JobRunner', () => { }); it('can add a FakeSleepForJobMulti (sorted) even if one is already there', async () => { - await runner.loadJobsFromDb(); + await runnerMulti.loadJobsFromDb(); const job = getFakeSleepForMultiJob({ timestamp: 1234 }); const job2 = getFakeSleepForMultiJob({ timestamp: 123 }); const job3 = getFakeSleepForMultiJob({ timestamp: 1 }); - let result = await runner.addJob(job); + let result = await runnerMulti.addJob(job); expect(result).to.eq('job_deferred'); - result = await runner.addJob(job2); + result = await runnerMulti.addJob(job2); expect(result).to.eq('job_deferred'); - result = await runner.addJob(job3); + result = await runnerMulti.addJob(job3); expect(result).to.eq('job_deferred'); - expect(runner.getJobList()).to.deep.eq([ + expect(runnerMulti.getJobList()).to.deep.eq([ job3.serializeJob(), job2.serializeJob(), job.serializeJob(), @@ -176,87 +190,90 @@ describe('JobRunner', () => { }); it('cannot add a FakeSleepForJobMulti with an id already existing', async () => { - await runner.loadJobsFromDb(); + await runnerMulti.loadJobsFromDb(); const job = getFakeSleepForMultiJob({ timestamp: 1234 }); - const job2 = getFakeSleepForMultiJob({ timestamp: 123, identifier: job.identifier }); - let result = await runner.addJob(job); + const job2 = getFakeSleepForMultiJob({ + timestamp: 123, + identifier: job.persistedData.identifier, + }); + let result = await runnerMulti.addJob(job); expect(result).to.be.eq('job_deferred'); - result = await runner.addJob(job2); + result = await runnerMulti.addJob(job2); expect(result).to.be.eq('identifier_exists'); - expect(runner.getJobList()).to.deep.eq([job.serializeJob()]); + expect(runnerMulti.getJobList()).to.deep.eq([job.serializeJob()]); }); it('two jobs are running sequentially', async () => { - await runner.loadJobsFromDb(); + await runnerMulti.loadJobsFromDb(); const job = getFakeSleepForMultiJob({ timestamp: 100 }); const job2 = getFakeSleepForMultiJob({ timestamp: 200 }); - runner.startProcessing(); + runnerMulti.startProcessing(); clock.tick(110); // job should be started right away - let result = await runner.addJob(job); + let result = await runnerMulti.addJob(job); expect(result).to.eq('job_started'); - result = await runner.addJob(job2); + result = await runnerMulti.addJob(job2); expect(result).to.eq('job_deferred'); - expect(runner.getJobList()).to.deep.eq([job.serializeJob(), job2.serializeJob()]); - expect(runner.getJobList()).to.deep.eq([job.serializeJob(), job2.serializeJob()]); + expect(runnerMulti.getJobList()).to.deep.eq([job.serializeJob(), job2.serializeJob()]); + expect(runnerMulti.getJobList()).to.deep.eq([job.serializeJob(), job2.serializeJob()]); // each job takes 5s to finish, so let's tick once the first one should be done clock.tick(5010); - await runner.waitCurrentJob(); + await runnerMulti.waitCurrentJob(); clock.tick(5010); - await runner.waitCurrentJob(); + await runnerMulti.waitCurrentJob(); - expect(runner.getJobList()).to.deep.eq([job2.serializeJob()]); + expect(runnerMulti.getJobList()).to.deep.eq([job2.serializeJob()]); clock.tick(5000); - await runner.waitCurrentJob(); + await runnerMulti.waitCurrentJob(); - expect(runner.getJobList()).to.deep.eq([]); + expect(runnerMulti.getJobList()).to.deep.eq([]); }); it('adding one job after the first is done starts it', async () => { - await runner.loadJobsFromDb(); + await runnerMulti.loadJobsFromDb(); const job = getFakeSleepForMultiJob({ timestamp: 100 }); const job2 = getFakeSleepForMultiJob({ timestamp: 120 }); - runner.startProcessing(); + runnerMulti.startProcessing(); clock.tick(110); // job should be started right away - let result = await runner.addJob(job); - expect(runner.getJobList()).to.deep.eq([job.serializeJob()]); + let result = await runnerMulti.addJob(job); + expect(runnerMulti.getJobList()).to.deep.eq([job.serializeJob()]); expect(result).to.eq('job_started'); clock.tick(5010); - await runner.waitCurrentJob(); + await runnerMulti.waitCurrentJob(); clock.tick(5010); - // just give some time for the runner to pick up a new job + // just give some time for the runnerMulti to pick up a new job await sleepFor(100); // the first job should already be finished now - result = await runner.addJob(job2); + result = await runnerMulti.addJob(job2); expect(result).to.eq('job_started'); - expect(runner.getJobList()).to.deep.eq([job2.serializeJob()]); + expect(runnerMulti.getJobList()).to.deep.eq([job2.serializeJob()]); // each job takes 5s to finish, so let's tick once the first one should be done clock.tick(5010); - await runner.waitCurrentJob(); + await runnerMulti.waitCurrentJob(); - expect(runner.getJobList()).to.deep.eq([]); + expect(runnerMulti.getJobList()).to.deep.eq([]); }); it('adding one job after the first is done schedules it', async () => { - await runner.loadJobsFromDb(); + await runnerMulti.loadJobsFromDb(); const job = getFakeSleepForMultiJob({ timestamp: 100 }); - runner.startProcessing(); + runnerMulti.startProcessing(); clock.tick(110); // job should be started right away - let result = await runner.addJob(job); - expect(runner.getJobList()).to.deep.eq([job.serializeJob()]); + let result = await runnerMulti.addJob(job); + expect(runnerMulti.getJobList()).to.deep.eq([job.serializeJob()]); expect(result).to.eq('job_started'); clock.tick(5010); - await runner.waitCurrentJob(); + await runnerMulti.waitCurrentJob(); clock.tick(5010); // just give some time for the runner to pick up a new job @@ -265,10 +282,10 @@ describe('JobRunner', () => { const job2 = getFakeSleepForMultiJob({ timestamp: clock.now + 100 }); // job should already be finished now - result = await runner.addJob(job2); + result = await runnerMulti.addJob(job2); // new job should be deferred as timestamp is not in the past expect(result).to.eq('job_deferred'); - expect(runner.getJobList()).to.deep.eq([job2.serializeJob()]); + expect(runnerMulti.getJobList()).to.deep.eq([job2.serializeJob()]); // tick enough for the job to need to be started clock.tick(100); @@ -278,9 +295,9 @@ describe('JobRunner', () => { clock.tick(5000); await job2.waitForCurrentTry(); - await runner.waitCurrentJob(); + await runnerMulti.waitCurrentJob(); - expect(runner.getJobList()).to.deep.eq([]); + expect(runnerMulti.getJobList()).to.deep.eq([]); }); }); @@ -352,27 +369,27 @@ describe('JobRunner', () => { }); it('does await if there are jobs and one is started', async () => { - await runner.loadJobsFromDb(); + await runnerMulti.loadJobsFromDb(); const job = getFakeSleepForMultiJob({ timestamp: 100, returnResult: false }); // this job keeps failing - runner.startProcessing(); + runnerMulti.startProcessing(); clock.tick(110); // job should be started right away - const result = await runner.addJob(job); - expect(runner.getJobList()).to.deep.eq([job.serializeJob()]); + const result = await runnerMulti.addJob(job); + expect(runnerMulti.getJobList()).to.deep.eq([job.serializeJob()]); expect(result).to.eq('job_started'); clock.tick(5010); - await runner.waitCurrentJob(); + await runnerMulti.waitCurrentJob(); const jobUpdated = { ...job.serializeJob(), nextAttemptTimestamp: clock.now + 10000, currentRetry: 1, }; - // just give time for the runner to pick up a new job + // just give time for the runnerMulti to pick up a new job await sleepFor(10); // the job failed, so the job should still be there - expect(runner.getJobList()).to.deep.eq([jobUpdated]); + expect(runnerMulti.getJobList()).to.deep.eq([jobUpdated]); // that job should be retried now clock.tick(11000); @@ -384,16 +401,16 @@ describe('JobRunner', () => { }; await sleepFor(10); - await runner.waitCurrentJob(); - expect(runner.getJobList()).to.deep.eq([jobUpdated2]); + await runnerMulti.waitCurrentJob(); + expect(runnerMulti.getJobList()).to.deep.eq([jobUpdated2]); // that job should be retried one more time and then removed from the list of jobs to be run clock.tick(11000); - await runner.waitCurrentJob(); + await runnerMulti.waitCurrentJob(); await sleepFor(10); - await runner.waitCurrentJob(); - expect(runner.getJobList()).to.deep.eq([]); + await runnerMulti.waitCurrentJob(); + expect(runnerMulti.getJobList()).to.deep.eq([]); }); }); });