mirror of https://github.com/oxen-io/session-ios
				
				
				
			
			You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
	
	
		
			1907 lines
		
	
	
		
			80 KiB
		
	
	
	
		
			Swift
		
	
			
		
		
	
	
			1907 lines
		
	
	
		
			80 KiB
		
	
	
	
		
			Swift
		
	
| // Copyright © 2022 Rangeproof Pty Ltd. All rights reserved.
 | |
| //
 | |
| // stringlint:disable
 | |
| 
 | |
| import Foundation
 | |
| import GRDB
 | |
| 
 | |
/// The interface used to configure a job runner, manage its lifecycle and schedule jobs on it.
///
/// `JobRunner` is the concrete implementation.
public protocol JobRunnerType {
    // MARK: - Configuration
    
    /// Registers the `executor` type which should be used to run jobs of the given `variant`
    func setExecutor(_ executor: JobExecutor.Type, for variant: Job.Variant)
    
    /// Indicates whether the given `queue` is currently allowed to start
    func canStart(queue: JobQueue?) -> Bool
    
    /// Triggers the `callback` once the blocking queue is no longer holding up the other queues
    /// (`JobRunner` calls it immediately when the blocking queue has started at least once and
    /// isn't currently running, otherwise it is stored until the blocking queue has drained)
    func afterBlockingQueue(callback: @escaping () -> ())
    
    /// Retrieves the `DispatchQueue` for the job queue which handles the given `variant` (if any)
    func queue(for variant: Job.Variant) -> DispatchQueue?
    
    // MARK: - State Management
    
    /// Retrieves info about the jobs known to the runner, keyed by job id
    ///
    /// - Parameters:
    ///   - jobs: When provided only these jobs are included in the result
    ///   - state: The state(s) a job needs to be in to be included
    ///   - variant: When provided only jobs of this variant are included
    func jobInfoFor(
        jobs: [Job]?,
        state: JobRunner.JobState,
        variant: Job.Variant?
    ) -> [Int64: JobRunner.JobInfo]
    
    /// Should be called once the app has finished launching
    func appDidFinishLaunching(using dependencies: Dependencies)
    
    /// Should be called whenever the app becomes active
    func appDidBecomeActive(using dependencies: Dependencies)
    
    /// Starts every queue other than the blocking queue
    func startNonBlockingQueues(using dependencies: Dependencies)
    
    /// Prevents the queues from starting and clears their pending jobs
    ///
    /// - Parameters:
    ///   - exceptForVariant: The queue handling this variant is left alone so it can finish its jobs
    ///   - dependencies: The dependencies to use
    ///   - onComplete: Triggered once the excluded queue has drained (`JobRunner` calls it
    ///   immediately when there is no such queue or it isn't running)
    func stopAndClearPendingJobs(
        exceptForVariant: Job.Variant?,
        using dependencies: Dependencies,
        onComplete: (() -> ())?
    )
    
    // MARK: - Job Scheduling
    
    /// Adds the `job` to the runner, optionally recording a dependency between it and `dependantJob`
    ///
    /// - Returns: The updated job, or `nil` if the job wasn't valid
    @discardableResult func add(
        _ db: Database,
        job: Job?,
        dependantJob: Job?,
        canStartJob: Bool,
        using dependencies: Dependencies
    ) -> Job?
    
    /// Adds the `job`, or updates it if it already exists
    @discardableResult func upsert(
        _ db: Database,
        job: Job?,
        canStartJob: Bool,
        using dependencies: Dependencies
    ) -> Job?
    
    /// Inserts the `job` before `otherJob`
    ///
    /// NOTE(review): the implementation isn't visible here - presumably the returned tuple holds
    /// the id of the inserted job along with the updated job, confirm against the implementation
    @discardableResult func insert(_ db: Database, job: Job?, before otherJob: Job) -> (Int64, Job)?
    
    /// NOTE(review): the implementation isn't visible here - presumably enqueues any jobs which
    /// the provided `jobs` depend on, confirm against the implementation
    func enqueueDependenciesIfNeeded(_ jobs: [Job], using dependencies: Dependencies)
    
    /// Triggers the `callback` with the result of the `job`
    ///
    /// NOTE(review): the implementation isn't visible here - presumably `state` restricts which
    /// jobs are observed, confirm against the implementation
    func afterJob(
        _ job: Job?,
        state: JobRunner.JobState,
        callback: @escaping (JobRunner.JobResult) -> ()
    )
    
    /// Removes the `job` from the pending jobs
    func removePendingJob(_ job: Job?)
}
 | |
| 
 | |
| // MARK: - JobRunnerType Convenience
 | |
| 
 | |
public extension JobRunnerType {
    // MARK: -- State Management
    
    /// Info for every pending or running job, keyed by job id
    func allJobInfo() -> [Int64: JobRunner.JobInfo] {
        jobInfoFor(jobs: nil, state: .any, variant: nil)
    }
    
    /// Info for the given `jobs` regardless of their state or variant
    func jobInfoFor(jobs: [Job]) -> [Int64: JobRunner.JobInfo] {
        jobInfoFor(jobs: jobs, state: .any, variant: nil)
    }
    
    /// Info for the given `jobs` which are in the given `state`
    func jobInfoFor(jobs: [Job], state: JobRunner.JobState) -> [Int64: JobRunner.JobInfo] {
        jobInfoFor(jobs: jobs, state: state, variant: nil)
    }
    
    /// Info for all jobs which are in the given `state`
    func jobInfoFor(state: JobRunner.JobState) -> [Int64: JobRunner.JobInfo] {
        jobInfoFor(jobs: nil, state: state, variant: nil)
    }
    
    /// Info for all jobs of the given `variant` which are in the given `state`
    func jobInfoFor(state: JobRunner.JobState, variant: Job.Variant) -> [Int64: JobRunner.JobInfo] {
        jobInfoFor(jobs: nil, state: state, variant: variant)
    }
    
    /// Info for all jobs of the given `variant` regardless of their state
    func jobInfoFor(variant: Job.Variant) -> [Int64: JobRunner.JobInfo] {
        jobInfoFor(jobs: nil, state: .any, variant: variant)
    }
    
    /// Indicates whether the given `job` is currently running (a `nil` job is never running)
    func isCurrentlyRunning(_ job: Job?) -> Bool {
        return job
            .map { !jobInfoFor(jobs: [$0], state: .running, variant: nil).isEmpty }
            ?? false
    }
    
    /// Indicates whether there is a job whose details match the encoded form of `jobDetails`
    ///
    /// - Parameters:
    ///   - variant: When provided only jobs of this variant are considered
    ///   - state: The state(s) a job needs to be in to be considered
    ///   - jobDetails: The details to look for
    /// - Returns: `true` if a matching job was found, `false` if there was no match or the
    /// `jobDetails` couldn't be encoded
    func hasJob<T: Encodable>(
        of variant: Job.Variant? = nil,
        inState state: JobRunner.JobState = .any,
        with jobDetails: T
    ) -> Bool {
        // The keys need to be sorted so the encoded data can be compared deterministically
        let encoder = JSONEncoder().with(outputFormatting: .sortedKeys)
        
        guard let encodedDetails: Data = try? encoder.encode(jobDetails) else { return false }
        
        let candidates: [Int64: JobRunner.JobInfo] = jobInfoFor(jobs: nil, state: state, variant: variant)
        
        return candidates.values.contains { info in info.detailsData == encodedDetails }
    }
    
    /// Convenience which provides default values for `exceptForVariant` and `onComplete` before
    /// forwarding the call on
    func stopAndClearPendingJobs(
        exceptForVariant: Job.Variant? = nil,
        using dependencies: Dependencies,
        onComplete: (() -> ())? = nil
    ) {
        stopAndClearPendingJobs(exceptForVariant: exceptForVariant, using: dependencies, onComplete: onComplete)
    }
    
    // MARK: -- Job Scheduling
    
    /// Adds the `job` without creating a dependency on another job
    @discardableResult func add(
        _ db: Database,
        job: Job?,
        canStartJob: Bool,
        using dependencies: Dependencies
    ) -> Job? {
        add(db, job: job, dependantJob: nil, canStartJob: canStartJob, using: dependencies)
    }
    
    /// Triggers the `callback` with the result of the `job` regardless of the state it is in
    func afterJob(_ job: Job?, callback: @escaping (JobRunner.JobResult) -> ()) {
        afterJob(job, state: .any, callback: callback)
    }
}
 | |
| 
 | |
| // MARK: - JobExecutor
 | |
| 
 | |
/// A type which knows how to run a specific kind of job, executors are registered against a
/// `Job.Variant` via `JobRunnerType.setExecutor(_:for:)`
public protocol JobExecutor {
    /// The maximum number of times the job can fail before it fails permanently
    ///
    /// **Note:** A value of `-1` means it will retry indefinitely
    static var maxFailureCount: Int { get }
    
    /// NOTE(review): presumably indicates that jobs run by this executor must have a `threadId`,
    /// confirm against the job validation logic
    static var requiresThreadId: Bool { get }
    
    /// NOTE(review): presumably indicates that jobs run by this executor must have an
    /// `interactionId`, confirm against the job validation logic
    static var requiresInteractionId: Bool { get }

    /// This method contains the logic needed to complete a job
    ///
    /// **Note:** The code in this method should run synchronously and the various
    /// "result" blocks should not be called within a database closure
    ///
    /// - Parameters:
    ///   - job: The job which is being run
    ///   - queue: A `DispatchQueue` provided to the executor (presumably the queue the job is
    ///   being run on - confirm against the calling `JobQueue`)
    ///   - success: The closure which is called when the job succeeds (with an
    ///   updated `job` and a flag indicating whether the job should forcibly stop running)
    ///   - failure: The closure which is called when the job fails (with an updated
    ///   `job`, an `Error` (if applicable) and a flag indicating whether it was a permanent
    ///   failure)
    ///   - deferred: The closure which is called when the job is deferred (with an
    ///   updated `job`)
    ///   - dependencies: The dependencies the executor should use while running the job
    static func run(
        _ job: Job,
        queue: DispatchQueue,
        success: @escaping (Job, Bool, Dependencies) -> (),
        failure: @escaping (Job, Error?, Bool, Dependencies) -> (),
        deferred: @escaping (Job, Dependencies) -> (),
        using dependencies: Dependencies
    )
}
 | |
| 
 | |
| // MARK: - JobRunner
 | |
| 
 | |
| public final class JobRunner: JobRunnerType {
 | |
|     public struct JobState: OptionSet, Hashable {
 | |
|         public let rawValue: UInt8
 | |
|         
 | |
|         public init(rawValue: UInt8) {
 | |
|             self.rawValue = rawValue
 | |
|         }
 | |
|         
 | |
|         public static let pending: JobState = JobState(rawValue: 1 << 0)
 | |
|         public static let running: JobState = JobState(rawValue: 1 << 1)
 | |
|         
 | |
|         public static let any: JobState = [ .pending, .running ]
 | |
|     }
 | |
|     
 | |
|     public enum JobResult: Equatable {
 | |
|         case succeeded
 | |
|         case failed(Error?, Bool)
 | |
|         case deferred
 | |
|         case notFound
 | |
|         
 | |
|         public static func == (lhs: JobRunner.JobResult, rhs: JobRunner.JobResult) -> Bool {
 | |
|             switch (lhs, rhs) {
 | |
|                 case (.succeeded, .succeeded): return true
 | |
|                 case (.failed(let lhsError, let lhsPermanent), .failed(let rhsError, let rhsPermanent)):
 | |
|                     return (
 | |
|                         // Not a perfect solution but should be good enough
 | |
|                         "\(lhsError ?? JobRunnerError.unknown)" == "\(rhsError ?? JobRunnerError.unknown)" &&
 | |
|                         lhsPermanent == rhsPermanent
 | |
|                     )
 | |
|                     
 | |
|                 case (.deferred, .deferred): return true
 | |
|                 default: return false
 | |
|             }
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     public struct JobInfo: Equatable, CustomDebugStringConvertible {
 | |
|         public let variant: Job.Variant
 | |
|         public let threadId: String?
 | |
|         public let interactionId: Int64?
 | |
|         public let detailsData: Data?
 | |
|         public let uniqueHashValue: Int?
 | |
|         
 | |
|         public var debugDescription: String {
 | |
|             let dataDescription: String = detailsData
 | |
|                 .map { data in "Data(hex: \(data.toHexString()), \(data.bytes.count) bytes" }
 | |
|                 .defaulting(to: "nil")
 | |
|             
 | |
|             return [
 | |
|                 "JobRunner.JobInfo(",
 | |
|                 "variant: \(variant),",
 | |
|                 " threadId: \(threadId ?? "nil"),",
 | |
|                 " interactionId: \(interactionId.map { "\($0)" } ?? "nil"),",
 | |
|                 " detailsData: \(dataDescription),",
 | |
|                 " uniqueHashValue: \(uniqueHashValue.map { "\($0)" } ?? "nil")",
 | |
|                 ")"
 | |
|             ].joined()
 | |
|         }
 | |
|     }
 | |
|     
 | |
|     private enum Validation {
 | |
|         case enqueueOnly
 | |
|         case persist
 | |
|     }
 | |
|     
 | |
|     // MARK: - Variables
 | |
|     
 | |
|     private let allowToExecuteJobs: Bool
 | |
|     private let blockingQueue: Atomic<JobQueue?>
 | |
|     private let queues: Atomic<[Job.Variant: JobQueue]>
 | |
|     private var blockingQueueDrainCallback: Atomic<[() -> ()]> = Atomic([])
 | |
|     
 | |
|     internal var appReadyToStartQueues: Atomic<Bool> = Atomic(false)
 | |
|     internal var appHasBecomeActive: Atomic<Bool> = Atomic(false)
 | |
|     internal var perSessionJobsCompleted: Atomic<Set<Int64>> = Atomic([])
 | |
|     internal var hasCompletedInitialBecomeActive: Atomic<Bool> = Atomic(false)
 | |
|     internal var shutdownBackgroundTask: Atomic<OWSBackgroundTask?> = Atomic(nil)
 | |
|     
 | |
|     private var canStartNonBlockingQueue: Bool {
 | |
|         blockingQueue.wrappedValue?.hasStartedAtLeastOnce.wrappedValue == true &&
 | |
|         blockingQueue.wrappedValue?.isRunning.wrappedValue != true &&
 | |
|         appHasBecomeActive.wrappedValue
 | |
|     }
 | |
|     
 | |
|     // MARK: - Initialization
 | |
|     
 | |
|     init(
 | |
|         isTestingJobRunner: Bool = false,
 | |
|         variantsToExclude: [Job.Variant] = [],
 | |
|         using dependencies: Dependencies = Dependencies()
 | |
|     ) {
 | |
|         var jobVariants: Set<Job.Variant> = Job.Variant.allCases
 | |
|             .filter { !variantsToExclude.contains($0) }
 | |
|             .asSet()
 | |
|         
 | |
|         self.allowToExecuteJobs = (
 | |
|             isTestingJobRunner || (
 | |
|                 Singleton.hasAppContext &&
 | |
|                 Singleton.appContext.isMainApp &&
 | |
|                 !SNUtilitiesKit.isRunningTests
 | |
|             )
 | |
|         )
 | |
|         self.blockingQueue = Atomic(
 | |
|             JobQueue(
 | |
|                 type: .blocking,
 | |
|                 executionType: .serial,
 | |
|                 qos: .default,
 | |
|                 isTestingJobRunner: isTestingJobRunner,
 | |
|                 jobVariants: []
 | |
|             )
 | |
|         )
 | |
|         self.queues = Atomic([
 | |
|             // MARK: -- Message Send Queue
 | |
|             
 | |
|             JobQueue(
 | |
|                 type: .messageSend,
 | |
|                 executionType: .concurrent, // Allow as many jobs to run at once as supported by the device
 | |
|                 qos: .default,
 | |
|                 isTestingJobRunner: isTestingJobRunner,
 | |
|                 jobVariants: [
 | |
|                     jobVariants.remove(.attachmentUpload),
 | |
|                     jobVariants.remove(.messageSend),
 | |
|                     jobVariants.remove(.notifyPushServer),
 | |
|                     jobVariants.remove(.sendReadReceipts),
 | |
|                     jobVariants.remove(.groupLeaving),
 | |
|                     jobVariants.remove(.configurationSync)
 | |
|                 ].compactMap { $0 }
 | |
|             ),
 | |
|             
 | |
|             // MARK: -- Message Receive Queue
 | |
|             
 | |
|             JobQueue(
 | |
|                 type: .messageReceive,
 | |
|                 // Explicitly serial as executing concurrently means message receives getting processed at
 | |
|                 // different speeds which can result in:
 | |
|                 // • Small batches of messages appearing in the UI before larger batches
 | |
|                 // • Closed group messages encrypted with updated keys could start parsing before it's key
 | |
|                 //   update message has been processed (ie. guaranteed to fail)
 | |
|                 executionType: .serial,
 | |
|                 qos: .default,
 | |
|                 isTestingJobRunner: isTestingJobRunner,
 | |
|                 jobVariants: [
 | |
|                     jobVariants.remove(.messageReceive),
 | |
|                     jobVariants.remove(.configMessageReceive)
 | |
|                 ].compactMap { $0 }
 | |
|             ),
 | |
|             
 | |
|             // MARK: -- Attachment Download Queue
 | |
|             
 | |
|             JobQueue(
 | |
|                 type: .attachmentDownload,
 | |
|                 executionType: .serial,
 | |
|                 qos: .utility,
 | |
|                 isTestingJobRunner: isTestingJobRunner,
 | |
|                 jobVariants: [
 | |
|                     jobVariants.remove(.attachmentDownload)
 | |
|                 ].compactMap { $0 }
 | |
|             ),
 | |
|             
 | |
|             // MARK: -- Expiration Update Queue
 | |
|             
 | |
|             JobQueue(
 | |
|                 type: .expirationUpdate,
 | |
|                 executionType: .concurrent, // Allow as many jobs to run at once as supported by the device
 | |
|                 qos: .default,
 | |
|                 isTestingJobRunner: isTestingJobRunner,
 | |
|                 jobVariants: [
 | |
|                     jobVariants.remove(.expirationUpdate),
 | |
|                     jobVariants.remove(.getExpiration),
 | |
|                     jobVariants.remove(.disappearingMessages),
 | |
|                     jobVariants.remove(.checkForAppUpdates) // Don't want this to block other jobs
 | |
|                 ].compactMap { $0 }
 | |
|             ),
 | |
|             
 | |
|             // MARK: -- General Queue
 | |
|             
 | |
|             JobQueue(
 | |
|                 type: .general(number: 0),
 | |
|                 executionType: .serial,
 | |
|                 qos: .utility,
 | |
|                 isTestingJobRunner: isTestingJobRunner,
 | |
|                 jobVariants: Array(jobVariants)
 | |
|             )
 | |
|         ].reduce(into: [:]) { prev, next in
 | |
|             next.jobVariants.forEach { variant in
 | |
|                 prev[variant] = next
 | |
|             }
 | |
|         })
 | |
|         
 | |
|         // Now that we've finished setting up the JobRunner, update the queue closures
 | |
|         self.blockingQueue.mutate {
 | |
|             $0?.canStart = { [weak self] queue -> Bool in (self?.canStart(queue: queue) == true) }
 | |
|             $0?.onQueueDrained = { [weak self] in
 | |
|                 // Once all blocking jobs have been completed we want to start running
 | |
|                 // the remaining job queues
 | |
|                 self?.startNonBlockingQueues(using: dependencies)
 | |
|                 
 | |
|                 self?.blockingQueueDrainCallback.mutate {
 | |
|                     $0.forEach { $0() }
 | |
|                     $0 = []
 | |
|                 }
 | |
|             }
 | |
|         }
 | |
|         
 | |
|         self.queues.mutate {
 | |
|             $0.values.forEach { queue in
 | |
|                 queue.canStart = { [weak self] targetQueue -> Bool in (self?.canStart(queue: targetQueue) == true) }
 | |
|             }
 | |
|         }
 | |
|     }
 | |
|     
 | |
|     // MARK: - Configuration
 | |
|     
 | |
|     public func setExecutor(_ executor: JobExecutor.Type, for variant: Job.Variant) {
 | |
|         blockingQueue.wrappedValue?.setExecutor(executor, for: variant) // The blocking queue can run any job
 | |
|         queues.wrappedValue[variant]?.setExecutor(executor, for: variant)
 | |
|     }
 | |
|     
 | |
|     public func canStart(queue: JobQueue?) -> Bool {
 | |
|         return (
 | |
|             allowToExecuteJobs &&
 | |
|             appReadyToStartQueues.wrappedValue && (
 | |
|                 queue?.type == .blocking ||
 | |
|                 canStartNonBlockingQueue
 | |
|             )
 | |
|         )
 | |
|     }
 | |
| 
 | |
|     public func afterBlockingQueue(callback: @escaping () -> ()) {
 | |
|         guard
 | |
|             (blockingQueue.wrappedValue?.hasStartedAtLeastOnce.wrappedValue != true) ||
 | |
|             (blockingQueue.wrappedValue?.isRunning.wrappedValue == true)
 | |
|         else { return callback() }
 | |
|     
 | |
|         blockingQueueDrainCallback.mutate { $0.append(callback) }
 | |
|     }
 | |
|     
 | |
|     public func queue(for variant: Job.Variant) -> DispatchQueue? {
 | |
|         return queues.wrappedValue[variant]?.targetQueue()
 | |
|     }
 | |
|     
 | |
|     // MARK: - State Management
 | |
| 
 | |
|     public func jobInfoFor(
 | |
|         jobs: [Job]?,
 | |
|         state: JobRunner.JobState,
 | |
|         variant: Job.Variant?
 | |
|     ) -> [Int64: JobRunner.JobInfo] {
 | |
|         var result: [(Int64, JobRunner.JobInfo)] = []
 | |
|         let targetKeys: [JobQueue.JobKey] = (jobs?.compactMap { JobQueue.JobKey($0) } ?? [])
 | |
|         let targetVariants: [Job.Variant] = (variant.map { [$0] } ?? jobs?.map { $0.variant })
 | |
|             .defaulting(to: [])
 | |
|         
 | |
|         // Insert the state of any pending jobs
 | |
|         if state.contains(.pending) {
 | |
|             func infoFor(queue: JobQueue?, variants: [Job.Variant]) -> [(Int64, JobRunner.JobInfo)] {
 | |
|                 return (queue?.pendingJobsQueue.wrappedValue
 | |
|                     .filter { variants.isEmpty || variants.contains($0.variant) }
 | |
|                     .compactMap { job -> (Int64, JobRunner.JobInfo)? in
 | |
|                         guard let jobKey: JobQueue.JobKey = JobQueue.JobKey(job) else { return nil }
 | |
|                         guard
 | |
|                             targetKeys.isEmpty ||
 | |
|                             targetKeys.contains(jobKey)
 | |
|                         else { return nil }
 | |
|                         
 | |
|                         return (
 | |
|                             jobKey.id,
 | |
|                             JobRunner.JobInfo(
 | |
|                                 variant: job.variant,
 | |
|                                 threadId: job.threadId,
 | |
|                                 interactionId: job.interactionId,
 | |
|                                 detailsData: job.details,
 | |
|                                 uniqueHashValue: job.uniqueHashValue
 | |
|                             )
 | |
|                         )
 | |
|                     })
 | |
|                     .defaulting(to: [])
 | |
|             }
 | |
|             
 | |
|             result.append(contentsOf: infoFor(queue: blockingQueue.wrappedValue, variants: targetVariants))
 | |
|             queues.wrappedValue
 | |
|                 .filter { key, _ -> Bool in targetVariants.isEmpty || targetVariants.contains(key) }
 | |
|                 .map { _, queue in queue }
 | |
|                 .asSet()
 | |
|                 .forEach { queue in result.append(contentsOf: infoFor(queue: queue, variants: targetVariants)) }
 | |
|         }
 | |
|         
 | |
|         // Insert the state of any running jobs
 | |
|         if state.contains(.running) {
 | |
|             func infoFor(queue: JobQueue?, variants: [Job.Variant]) -> [(Int64, JobRunner.JobInfo)] {
 | |
|                 return (queue?.infoForAllCurrentlyRunningJobs()
 | |
|                     .filter { variants.isEmpty || variants.contains($0.value.variant) }
 | |
|                     .compactMap { jobId, info -> (Int64, JobRunner.JobInfo)? in
 | |
|                         guard
 | |
|                             targetKeys.isEmpty ||
 | |
|                             targetKeys.contains(JobQueue.JobKey(id: jobId, variant: info.variant))
 | |
|                         else { return nil }
 | |
|                         
 | |
|                         return (jobId, info)
 | |
|                     })
 | |
|                     .defaulting(to: [])
 | |
|             }
 | |
|             
 | |
|             result.append(contentsOf: infoFor(queue: blockingQueue.wrappedValue, variants: targetVariants))
 | |
|             queues.wrappedValue
 | |
|                 .filter { key, _ -> Bool in targetVariants.isEmpty || targetVariants.contains(key) }
 | |
|                 .map { _, queue in queue }
 | |
|                 .asSet()
 | |
|                 .forEach { queue in result.append(contentsOf: infoFor(queue: queue, variants: targetVariants)) }
 | |
|         }
 | |
|         
 | |
|         return result
 | |
|             .reduce(into: [:]) { result, next in
 | |
|                 result[next.0] = next.1
 | |
|             }
 | |
|     }
 | |
|     
 | |
|     public func appDidFinishLaunching(using dependencies: Dependencies) {
 | |
|         // Flag that the JobRunner can start it's queues
 | |
|         appReadyToStartQueues.mutate { $0 = true }
 | |
|         
 | |
|         // Note: 'appDidBecomeActive' will run on first launch anyway so we can
 | |
|         // leave those jobs out and can wait until then to start the JobRunner
 | |
|         let jobsToRun: (blocking: [Job], nonBlocking: [Job]) = dependencies.storage
 | |
|             .read { db in
 | |
|                 let blockingJobs: [Job] = try Job
 | |
|                     .filter(
 | |
|                         [
 | |
|                             Job.Behaviour.recurringOnLaunch,
 | |
|                             Job.Behaviour.runOnceNextLaunch
 | |
|                         ].contains(Job.Columns.behaviour)
 | |
|                     )
 | |
|                     .filter(Job.Columns.shouldBlock == true)
 | |
|                     .order(
 | |
|                         Job.Columns.priority.desc,
 | |
|                         Job.Columns.id
 | |
|                     )
 | |
|                     .fetchAll(db)
 | |
|                 let nonblockingJobs: [Job] = try Job
 | |
|                     .filter(
 | |
|                         [
 | |
|                             Job.Behaviour.recurringOnLaunch,
 | |
|                             Job.Behaviour.runOnceNextLaunch
 | |
|                         ].contains(Job.Columns.behaviour)
 | |
|                     )
 | |
|                     .filter(Job.Columns.shouldBlock == false)
 | |
|                     .order(
 | |
|                         Job.Columns.priority.desc,
 | |
|                         Job.Columns.id
 | |
|                     )
 | |
|                     .fetchAll(db)
 | |
|                 
 | |
|                 return (blockingJobs, nonblockingJobs)
 | |
|             }
 | |
|             .defaulting(to: ([], []))
 | |
|         
 | |
|         // Add and start any blocking jobs
 | |
|         blockingQueue.wrappedValue?.appDidFinishLaunching(
 | |
|             with: jobsToRun.blocking,
 | |
|             canStart: true,
 | |
|             using: dependencies
 | |
|         )
 | |
|         
 | |
|         // Add any non-blocking jobs (we don't start these incase there are blocking "on active"
 | |
|         // jobs as well)
 | |
|         let jobsByVariant: [Job.Variant: [Job]] = jobsToRun.nonBlocking.grouped(by: \.variant)
 | |
|         let jobQueues: [Job.Variant: JobQueue] = queues.wrappedValue
 | |
|         
 | |
|         jobsByVariant.forEach { variant, jobs in
 | |
|             jobQueues[variant]?.appDidFinishLaunching(
 | |
|                 with: jobs,
 | |
|                 canStart: false,
 | |
|                 using: dependencies
 | |
|             )
 | |
|         }
 | |
|     }
 | |
|     
 | |
|     public func appDidBecomeActive(using dependencies: Dependencies) {
 | |
|         // Flag that the JobRunner can start it's queues and start queueing non-launch jobs
 | |
|         appReadyToStartQueues.mutate { $0 = true }
 | |
|         appHasBecomeActive.mutate { $0 = true }
 | |
|         
 | |
|         // If we have a running "sutdownBackgroundTask" then we want to cancel it as otherwise it
 | |
|         // can result in the database being suspended and us being unable to interact with it at all
 | |
|         shutdownBackgroundTask.mutate {
 | |
|             $0?.cancel()
 | |
|             $0 = nil
 | |
|         }
 | |
|         
 | |
|         // Retrieve any jobs which should run when becoming active
 | |
|         let hasCompletedInitialBecomeActive: Bool = self.hasCompletedInitialBecomeActive.wrappedValue
 | |
|         let jobsToRun: [Job] = dependencies.storage
 | |
|             .read { db in
 | |
|                 return try Job
 | |
|                     .filter(Job.Columns.behaviour == Job.Behaviour.recurringOnActive)
 | |
|                     .order(
 | |
|                         Job.Columns.priority.desc,
 | |
|                         Job.Columns.id
 | |
|                     )
 | |
|                     .fetchAll(db)
 | |
|             }
 | |
|             .defaulting(to: [])
 | |
|             .filter { hasCompletedInitialBecomeActive || !$0.shouldSkipLaunchBecomeActive }
 | |
|         
 | |
|         // Store the current queue state locally to avoid multiple atomic retrievals
 | |
|         let jobQueues: [Job.Variant: JobQueue] = queues.wrappedValue
 | |
|         let blockingQueueIsRunning: Bool = (blockingQueue.wrappedValue?.isRunning.wrappedValue == true)
 | |
|         
 | |
|         guard !jobsToRun.isEmpty else {
 | |
|             if !blockingQueueIsRunning {
 | |
|                 jobQueues.map { _, queue in queue }.asSet().forEach { $0.start(using: dependencies) }
 | |
|             }
 | |
|             return
 | |
|         }
 | |
|         
 | |
|         // Add and start any non-blocking jobs (if there are no blocking jobs)
 | |
|         //
 | |
|         // We only want to trigger the queue to start once so we need to consolidate the
 | |
|         // queues to list of jobs (as queues can handle multiple job variants), this means
 | |
|         // that 'onActive' jobs will be queued before any standard jobs
 | |
|         let jobsByVariant: [Job.Variant: [Job]] = jobsToRun.grouped(by: \.variant)
 | |
|         
 | |
|         jobQueues
 | |
|             .reduce(into: [:]) { result, variantAndQueue in
 | |
|                 result[variantAndQueue.value] = (result[variantAndQueue.value] ?? [])
 | |
|                     .appending(contentsOf: (jobsByVariant[variantAndQueue.key] ?? []))
 | |
|             }
 | |
|             .forEach { queue, jobs in
 | |
|                 queue.appDidBecomeActive(
 | |
|                     with: jobs,
 | |
|                     canStart: !blockingQueueIsRunning,
 | |
|                     using: dependencies
 | |
|                 )
 | |
|             }
 | |
|         
 | |
|         self.hasCompletedInitialBecomeActive.mutate { $0 = true }
 | |
|     }
 | |
|     
 | |
|     public func startNonBlockingQueues(using dependencies: Dependencies) {
 | |
|         queues.wrappedValue.map { _, queue in queue }.asSet().forEach { queue in
 | |
|             queue.start(using: dependencies)
 | |
|         }
 | |
|     }
 | |
|     
 | |
|     public func stopAndClearPendingJobs(
 | |
|         exceptForVariant: Job.Variant?,
 | |
|         using dependencies: Dependencies,
 | |
|         onComplete: (() -> ())?
 | |
|     ) {
 | |
|         // Inform the JobRunner that it can't start any queues (this is to prevent queues from
 | |
|         // rescheduling themselves while in the background, when the app restarts or becomes active
 | |
|         // the JobRunenr will update this flag)
 | |
|         appReadyToStartQueues.mutate { $0 = false }
 | |
|         appHasBecomeActive.mutate { $0 = false }
 | |
|         
 | |
|         // Stop all queues except for the one containing the `exceptForVariant`
 | |
|         queues.wrappedValue
 | |
|             .map { _, queue in queue }
 | |
|             .asSet()
 | |
|             .filter { queue -> Bool in
 | |
|                 guard let exceptForVariant: Job.Variant = exceptForVariant else { return true }
 | |
|                 
 | |
|                 return !queue.jobVariants.contains(exceptForVariant)
 | |
|             }
 | |
|             .forEach { $0.stopAndClearPendingJobs() }
 | |
|         
 | |
|         // Ensure the queue is actually running (if not the trigger the callback immediately)
 | |
|         guard
 | |
|             let exceptForVariant: Job.Variant = exceptForVariant,
 | |
|             let queue: JobQueue = queues.wrappedValue[exceptForVariant],
 | |
|             queue.isRunning.wrappedValue == true
 | |
|         else {
 | |
|             onComplete?()
 | |
|             return
 | |
|         }
 | |
|         
 | |
|         let oldQueueDrained: (() -> ())? = queue.onQueueDrained
 | |
|         
 | |
|         // Create a backgroundTask to give the queue the chance to properly be drained
 | |
|         shutdownBackgroundTask.mutate {
 | |
|             $0 = OWSBackgroundTask(labelStr: #function) { [weak queue] state in
 | |
|                 // If the background task didn't succeed then trigger the onComplete (and hope we have
 | |
|                 // enough time to complete it's logic)
 | |
|                 guard state != .cancelled else {
 | |
|                     queue?.onQueueDrained = oldQueueDrained
 | |
|                     return
 | |
|                 }
 | |
|                 guard state != .success else { return }
 | |
|                 
 | |
|                 onComplete?()
 | |
|                 queue?.onQueueDrained = oldQueueDrained
 | |
|                 queue?.stopAndClearPendingJobs()
 | |
|             }
 | |
|         }
 | |
|         
 | |
|         // Add a callback to be triggered once the queue is drained
 | |
|         queue.onQueueDrained = { [weak self, weak queue] in
 | |
|             oldQueueDrained?()
 | |
|             queue?.onQueueDrained = oldQueueDrained
 | |
|             onComplete?()
 | |
|             
 | |
|             self?.shutdownBackgroundTask.mutate { $0 = nil }
 | |
|         }
 | |
|     }
 | |
|     
 | |
|     // MARK: - Execution
 | |
|     
 | |
|     @discardableResult public func add(
 | |
|         _ db: Database,
 | |
|         job: Job?,
 | |
|         dependantJob: Job?,
 | |
|         canStartJob: Bool,
 | |
|         using dependencies: Dependencies
 | |
|     ) -> Job? {
 | |
|         guard let updatedJob: Job = validatedJob(db, job: job, validation: .persist) else { return nil }
 | |
|         
 | |
|         // If we are adding a job that's dependant on another job then create the dependency between them
 | |
|         if let jobId: Int64 = updatedJob.id, let dependantJobId: Int64 = dependantJob?.id {
 | |
|             try? JobDependencies(
 | |
|                 jobId: jobId,
 | |
|                 dependantId: dependantJobId
 | |
|             )
 | |
|             .insert(db)
 | |
|         }
 | |
|         
 | |
|         // Don't add to the queue if the JobRunner isn't ready (it's been saved to the db so it'll be loaded
 | |
|         // once the queue actually get started later)
 | |
|         guard canAddToQueue(updatedJob) else { return updatedJob }
 | |
|         
 | |
|         let jobQueue: JobQueue? = queues.wrappedValue[updatedJob.variant]
 | |
|         jobQueue?.add(db, job: updatedJob, canStartJob: canStartJob, using: dependencies)
 | |
|         
 | |
|         // Don't start the queue if the job can't be started
 | |
|         guard canStartJob else { return updatedJob }
 | |
|         
 | |
|         // Start the job runner if needed
 | |
|         db.afterNextTransactionNestedOnce(dedupeId: "JobRunner-Start: \(jobQueue?.queueContext ?? "N/A")") { _ in
 | |
|             jobQueue?.start(using: dependencies)
 | |
|         }
 | |
|         
 | |
|         return updatedJob
 | |
|     }
 | |
|     
 | |
|     public func upsert(
 | |
|         _ db: Database,
 | |
|         job: Job?,
 | |
|         canStartJob: Bool,
 | |
|         using dependencies: Dependencies
 | |
|     ) -> Job? {
 | |
|         guard let job: Job = job else { return nil }    // Ignore null jobs
 | |
|         guard job.id != nil else {
 | |
|             // When we upsert a job that should be unique we want to return the existing job (if it exists)
 | |
|             switch job.uniqueHashValue {
 | |
|                 case .none: return add(db, job: job, canStartJob: canStartJob, using: dependencies)
 | |
|                 case .some:
 | |
|                     let existingJob: Job? = try? Job
 | |
|                         .filter(Job.Columns.variant == job.variant)
 | |
|                         .filter(Job.Columns.uniqueHashValue == job.uniqueHashValue)
 | |
|                         .fetchOne(db)
 | |
|                     
 | |
|                     return (existingJob ?? add(db, job: job, canStartJob: canStartJob, using: dependencies))
 | |
|             }
 | |
|         }
 | |
|         guard let updatedJob: Job = validatedJob(db, job: job, validation: .enqueueOnly) else { return nil }
 | |
|         
 | |
|         // Don't add to the queue if the JobRunner isn't ready (it's been saved to the db so it'll be loaded
 | |
|         // once the queue actually get started later)
 | |
|         guard canAddToQueue(updatedJob) else { return updatedJob }
 | |
|         
 | |
|         let jobQueue: JobQueue? = queues.wrappedValue[updatedJob.variant]
 | |
|         jobQueue?.upsert(db, job: updatedJob, canStartJob: canStartJob, using: dependencies)
 | |
|         
 | |
|         // Don't start the queue if the job can't be started
 | |
|         guard canStartJob else { return updatedJob }
 | |
|         
 | |
|         // Start the job runner if needed
 | |
|         db.afterNextTransactionNestedOnce(dedupeId: "JobRunner-Start: \(jobQueue?.queueContext ?? "N/A")") { _ in
 | |
|             jobQueue?.start(using: dependencies)
 | |
|         }
 | |
|         
 | |
|         return updatedJob
 | |
|     }
 | |
|     
 | |
|     @discardableResult public func insert(
 | |
|         _ db: Database,
 | |
|         job: Job?,
 | |
|         before otherJob: Job
 | |
|     ) -> (Int64, Job)? {
 | |
|         switch job?.behaviour {
 | |
|             case .recurringOnActive, .recurringOnLaunch, .runOnceNextLaunch:
 | |
|                 SNLog("[JobRunner] Attempted to insert \(job?.variant) job before the current one even though it's behaviour is \(job?.behaviour)")
 | |
|                 return nil
 | |
|                 
 | |
|             default: break
 | |
|         }
 | |
|         
 | |
|         guard
 | |
|             let updatedJob: Job = validatedJob(db, job: job, validation: .persist),
 | |
|             let jobId: Int64 = updatedJob.id
 | |
|         else { return nil }
 | |
|         
 | |
|         queues.wrappedValue[updatedJob.variant]?.insert(updatedJob, before: otherJob)
 | |
|         
 | |
|         return (jobId, updatedJob)
 | |
|     }
 | |
|     
 | |
|     /// Job dependencies can be quite messy as they might already be running or scheduled on different queues from the related job, this could result
 | |
|     /// in some odd inter-dependencies between the JobQueues. Instead of this we want all jobs to run on their original assigned queues (so the
 | |
|     /// concurrency rules remain consistent and easy to reason with), the only downside to this approach is serial queues could potentially be blocked
 | |
|     /// waiting on unrelated dependencies to be run as this method will insert jobs at the start of the `pendingJobsQueue`
 | |
|     public func enqueueDependenciesIfNeeded(_ jobs: [Job], using dependencies: Dependencies) {
 | |
|         /// Do nothing if we weren't given any jobs
 | |
|         guard !jobs.isEmpty else { return }
 | |
|         
 | |
|         /// Ignore any dependencies which are already running or scheduled
 | |
|         let dependencyJobQueues: Set<JobQueue> = jobs
 | |
|             .compactMap { queues.wrappedValue[$0.variant] }
 | |
|             .asSet()
 | |
|         let allCurrentlyRunningJobIds: [Int64] = dependencyJobQueues
 | |
|             .flatMap { $0.currentlyRunningJobIds.wrappedValue }
 | |
|         let jobsToEnqueue: [JobQueue: [Job]] = jobs
 | |
|             .compactMap { job in job.id.map { ($0, job) } }
 | |
|             .filter { jobId, _ in !allCurrentlyRunningJobIds.contains(jobId) }
 | |
|             .compactMap { _, job in queues.wrappedValue[job.variant].map { (job, $0) } }
 | |
|             .grouped(by: { _, queue in queue })
 | |
|             .mapValues { data in data.map { job, _ in job } }
 | |
|         
 | |
|         /// Regardless of whether the jobs are dependant jobs or dependencies we want them to be moved to the start of the
 | |
|         /// `pendingJobsQueue` because at least one job in the job chain has been triggered so we want to try to complete
 | |
|         /// the entire job chain rather than worry about deadlocks between different job chains
 | |
|         ///
 | |
|         /// **Note:** If any of these `dependantJobs` have other dependencies then when they attempt to start they will be
 | |
|         /// removed from the queue, replaced by their dependencies
 | |
|         jobsToEnqueue.forEach { queue, jobs in
 | |
|             queue.pendingJobsQueue.mutate { pendingJobs in
 | |
|                 pendingJobs = pendingJobs
 | |
|                     .filter { !jobs.contains($0) }
 | |
|                     .inserting(contentsOf: jobs, at: 0)
 | |
|             }
 | |
|             
 | |
|             // Start the job queue if needed (might be a different queue from the currently executing one)
 | |
|             queue.start(using: dependencies)
 | |
|         }
 | |
|     }
 | |
|     
 | |
|     public func afterJob(_ job: Job?, state: JobRunner.JobState, callback: @escaping (JobResult) -> ()) {
 | |
|         guard let job: Job = job, let jobId: Int64 = job.id, let queue: JobQueue = queues.wrappedValue[job.variant] else {
 | |
|             callback(.notFound)
 | |
|             return
 | |
|         }
 | |
|         
 | |
|         queue.afterJob(jobId, state: state, callback: callback)
 | |
|     }
 | |
|     
 | |
|     public func removePendingJob(_ job: Job?) {
 | |
|         guard let job: Job = job, let jobId: Int64 = job.id else { return }
 | |
|         
 | |
|         queues.wrappedValue[job.variant]?.removePendingJob(jobId)
 | |
|     }
 | |
|     
 | |
|     // MARK: - Convenience
 | |
| 
 | |
|     fileprivate static func getRetryInterval(for job: Job) -> TimeInterval {
 | |
|         // Arbitrary backoff factor...
 | |
|         // try  1 delay: 0.5s
 | |
|         // try  2 delay: 1s
 | |
|         // ...
 | |
|         // try  5 delay: 16s
 | |
|         // ...
 | |
|         // try 11 delay: 512s
 | |
|         let maxBackoff: Double = 10 * 60 // 10 minutes
 | |
|         return 0.25 * min(maxBackoff, pow(2, Double(job.failureCount)))
 | |
|     }
 | |
|     
 | |
|     fileprivate func canAddToQueue(_ job: Job) -> Bool {
 | |
|         // We can only start the job if it's an "on launch" job or the app has become active
 | |
|         return (
 | |
|             job.behaviour == .runOnceNextLaunch ||
 | |
|             job.behaviour == .recurringOnLaunch ||
 | |
|             appHasBecomeActive.wrappedValue
 | |
|         )
 | |
|     }
 | |
|     
 | |
|     private func validatedJob(_ db: Database, job: Job?, validation: Validation) -> Job? {
 | |
|         guard let job: Job = job else { return nil }
 | |
|         
 | |
|         switch (validation, job.uniqueHashValue) {
 | |
|             case (.enqueueOnly, .none): return job
 | |
|             case (.enqueueOnly, .some(let uniqueHashValue)):
 | |
|                 // Nothing currently running or sitting in a JobQueue
 | |
|                 guard !allJobInfo().contains(where: { _, info -> Bool in info.uniqueHashValue == uniqueHashValue }) else {
 | |
|                     SNLog("[JobRunner] Unable to add \(job.variant) job due to unique constraint")
 | |
|                     return nil
 | |
|                 }
 | |
|                 
 | |
|                 return job
 | |
|                 
 | |
|             case (.persist, .some(let uniqueHashValue)):
 | |
|                 guard
 | |
|                     // Nothing currently running or sitting in a JobQueue
 | |
|                     !allJobInfo().contains(where: { _, info -> Bool in info.uniqueHashValue == uniqueHashValue }) &&
 | |
|                     // Nothing in the database
 | |
|                     !Job.filter(Job.Columns.uniqueHashValue == uniqueHashValue).isNotEmpty(db)
 | |
|                 else {
 | |
|                     SNLog("[JobRunner] Unable to add \(job.variant) job due to unique constraint")
 | |
|                     return nil
 | |
|                 }
 | |
|                 
 | |
|                 fallthrough // Validation passed so try to persist the job
 | |
|                 
 | |
|             case (.persist, .none):
 | |
|                 guard let updatedJob: Job = try? job.inserted(db), updatedJob.id != nil else {
 | |
|                     SNLog("[JobRunner] Unable to add \(job.variant) job\(job.id == nil ? " due to missing id" : "")")
 | |
|                     return nil
 | |
|                 }
 | |
|                 
 | |
|                 return updatedJob
 | |
|         }
 | |
|     }
 | |
| }
 | |
| 
 | |
| // MARK: - JobQueue
 | |
| 
 | |
| public final class JobQueue: Hashable {
 | |
|     fileprivate enum QueueType: Hashable {
 | |
|         case blocking
 | |
|         case general(number: Int)
 | |
|         case messageSend
 | |
|         case messageReceive
 | |
|         case attachmentDownload
 | |
|         case expirationUpdate
 | |
|         
 | |
|         var name: String {
 | |
|             switch self {
 | |
|                 case .blocking: return "Blocking"
 | |
|                 case .general(let number): return "General-\(number)"
 | |
|                 case .messageSend: return "MessageSend"
 | |
|                 case .messageReceive: return "MessageReceive"
 | |
|                 case .attachmentDownload: return "AttachmentDownload"
 | |
|                 case .expirationUpdate: return "ExpirationUpdate"
 | |
|             }
 | |
|         }
 | |
|     }
 | |
|     
 | |
    /// Describes how a `JobQueue` executes its jobs, this determines whether the underlying `DispatchQueue`
    /// is created with the `.concurrent` attribute.
    fileprivate enum ExecutionType {
        /// A serial queue will execute one job at a time until the queue is empty, then will load any new/deferred
        /// jobs and run those one at a time
        case serial
        
        /// A concurrent queue will execute as many jobs as the device supports at once until the queue is empty,
        /// then will load any new/deferred jobs and try to start them all
        case concurrent
    }
 | |
|     
 | |
|     private class Trigger {
 | |
|         private var timer: Timer?
 | |
|         fileprivate var fireTimestamp: TimeInterval = 0
 | |
|         
 | |
|         static func create(
 | |
|             queue: JobQueue,
 | |
|             timestamp: TimeInterval,
 | |
|             using dependencies: Dependencies
 | |
|         ) -> Trigger? {
 | |
|             /// Setup the trigger (wait at least 1 second before triggering)
 | |
|             ///
 | |
|             /// **Note:** We use the `Timer.scheduledTimerOnMainThread` method because running a timer
 | |
|             /// on our random queue threads results in the timer never firing, the `start` method will redirect itself to
 | |
|             /// the correct thread
 | |
|             let trigger: Trigger = Trigger()
 | |
|             trigger.fireTimestamp = max(1, (timestamp - dependencies.dateNow.timeIntervalSince1970))
 | |
|             trigger.timer = Timer.scheduledTimerOnMainThread(
 | |
|                 withTimeInterval: trigger.fireTimestamp,
 | |
|                 repeats: false,
 | |
|                 using: dependencies,
 | |
|                 block: { [weak queue] _ in
 | |
|                     queue?.start(forceWhenAlreadyRunning: (queue?.executionType == .concurrent), using: dependencies)
 | |
|                 }
 | |
|             )
 | |
|             return trigger
 | |
|         }
 | |
|         
 | |
|         func invalidate() {
 | |
|             // Need to do this to prevent a strong reference cycle
 | |
|             timer?.invalidate()
 | |
|             timer = nil
 | |
|         }
 | |
|     }
 | |
|     
 | |
|     fileprivate struct JobKey: Equatable, Hashable {
 | |
|         fileprivate let id: Int64
 | |
|         fileprivate let variant: Job.Variant
 | |
|         
 | |
|         fileprivate init(id: Int64, variant: Job.Variant) {
 | |
|             self.id = id
 | |
|             self.variant = variant
 | |
|         }
 | |
|         
 | |
|         fileprivate init?(_ job: Job?) {
 | |
|             guard let id: Int64 = job?.id, let variant: Job.Variant = job?.variant else { return nil }
 | |
|             
 | |
|             self.id = id
 | |
|             self.variant = variant
 | |
|         }
 | |
|     }
 | |
|     
 | |
    // NOTE(review): not referenced in the visible part of the file - presumably the number of rapid
    // deferrals after which a job is considered to be stuck in a deferral loop, confirm against usage
    private static let deferralLoopThreshold: Int = 3
    
    /// Unique identifier for the queue instance (the `Hashable`/`Equatable` conformance is based on this)
    private let id: UUID = UUID()
    fileprivate let type: QueueType
    private let executionType: ExecutionType
    private let qosClass: DispatchQoS
    
    /// Key used to tag `internalQueue` with the `queueContext` so `start`/`runNextJob` can detect whether
    /// they are already executing on it
    private let queueKey: DispatchSpecificKey = DispatchSpecificKey<String>()
    
    /// "JobQueue-" followed by the `QueueType` name, used as the dispatch queue label and in logs
    fileprivate let queueContext: String
    
    /// The job variants this queue is responsible for (used to filter the pending jobs loaded in `start`)
    fileprivate let jobVariants: [Job.Variant]
    
    /// The underlying dispatch queue (concurrent when the `executionType` is `.concurrent`), tagged with
    /// the `queueContext` via the `queueKey`
    private lazy var internalQueue: DispatchQueue = {
        let result: DispatchQueue = DispatchQueue(
            label: self.queueContext,
            qos: self.qosClass,
            attributes: (self.executionType == .concurrent ? [.concurrent] : []),
            autoreleaseFrequency: .inherit,
            target: nil
        )
        result.setSpecific(key: queueKey, value: queueContext)
        
        return result
    }()
    
    /// The executor registered for each job variant (populated via `setExecutor`)
    private var executorMap: Atomic<[Job.Variant: JobExecutor.Type]> = Atomic([:])
    
    /// Consulted at the beginning of `start`, the queue only starts when this returns `true`
    fileprivate var canStart: ((JobQueue?) -> Bool)?
    
    // NOTE(review): assigned by the JobRunner when stopping, presumably invoked once the queue has no
    // more jobs to run - the call site isn't visible here
    fileprivate var onQueueDrained: (() -> ())?
    
    /// Set to `true` the first time `start` gets past its initial checks
    fileprivate var hasStartedAtLeastOnce: Atomic<Bool> = Atomic(false)
    fileprivate var isRunning: Atomic<Bool> = Atomic(false)
    
    /// The jobs waiting to be run, `runNextJob` takes jobs from the front of this array
    fileprivate var pendingJobsQueue: Atomic<[Job]> = Atomic([])
    
    // NOTE(review): presumably the trigger scheduled to restart the queue for the next soonest job,
    // it isn't written to in the visible part of the file
    private var nextTrigger: Atomic<Trigger?> = Atomic(nil)
    
    /// Callbacks registered via `afterJob`, keyed by job id
    fileprivate var jobCallbacks: Atomic<[Int64: [(JobRunner.JobResult) -> ()]]> = Atomic([:])
    fileprivate var currentlyRunningJobIds: Atomic<Set<Int64>> = Atomic([])
    private var currentlyRunningJobInfo: Atomic<[Int64: JobRunner.JobInfo]> = Atomic([:])
    
    /// Per-job deferral tracking (cleared by `stopAndClearPendingJobs`)
    private var deferLoopTracker: Atomic<[Int64: (count: Int, times: [TimeInterval])]> = Atomic([:])
    
    /// `10` when created for a testing JobRunner, otherwise `1`
    private let maxDeferralsPerSecond: Int
    
    /// Whether there is at least one job in the `pendingJobsQueue`
    fileprivate var hasPendingJobs: Bool { !pendingJobsQueue.wrappedValue.isEmpty }
 | |
|     
 | |
|     // MARK: - Initialization
 | |
|     
 | |
|     fileprivate init(
 | |
|         type: QueueType,
 | |
|         executionType: ExecutionType,
 | |
|         qos: DispatchQoS,
 | |
|         isTestingJobRunner: Bool,
 | |
|         jobVariants: [Job.Variant]
 | |
|     ) {
 | |
|         self.type = type
 | |
|         self.executionType = executionType
 | |
|         self.queueContext = "JobQueue-\(type.name)"
 | |
|         self.qosClass = qos
 | |
|         self.maxDeferralsPerSecond = (isTestingJobRunner ? 10 : 1)  // Allow for tripping the defer loop in tests
 | |
|         self.jobVariants = jobVariants
 | |
|     }
 | |
|     
 | |
|     // MARK: - Hashable
 | |
|     
 | |
|     public func hash(into hasher: inout Hasher) {
 | |
|         id.hash(into: &hasher)
 | |
|     }
 | |
|     
 | |
|     public static func == (lhs: JobQueue, rhs: JobQueue) -> Bool {
 | |
|         return (lhs.id == rhs.id)
 | |
|     }
 | |
|     
 | |
|     // MARK: - Configuration
 | |
|     
 | |
|     fileprivate func setExecutor(_ executor: JobExecutor.Type, for variant: Job.Variant) {
 | |
|         executorMap.mutate { $0[variant] = executor }
 | |
|     }
 | |
|     
 | |
|     // MARK: - Execution
 | |
|     
 | |
|     fileprivate func targetQueue() -> DispatchQueue {
 | |
|         /// As it turns out Combine doesn't play too nicely with concurrent Dispatch Queues, in Combine events are dispatched asynchronously to
 | |
|         /// the queue which means an odd situation can occasionally occur where the `finished` event can actually run before the `output`
 | |
|         /// event - this can result in unexpected behaviours (for more information see https://github.com/groue/GRDB.swift/issues/1334)
 | |
|         ///
 | |
|         /// Due to this if a job is meant to run on a concurrent queue then we actually want to create a temporary serial queue just for the execution
 | |
|         /// of that job
 | |
|         guard executionType == .concurrent else { return internalQueue }
 | |
|         
 | |
|         return DispatchQueue(
 | |
|             label: "\(self.queueContext)-serial",
 | |
|             qos: self.qosClass,
 | |
|             attributes: [],
 | |
|             autoreleaseFrequency: .inherit,
 | |
|             target: nil
 | |
|         )
 | |
|     }
 | |
| 
 | |
|     fileprivate func add(
 | |
|         _ db: Database,
 | |
|         job: Job,
 | |
|         canStartJob: Bool,
 | |
|         using dependencies: Dependencies
 | |
|     ) {
 | |
|         // Check if the job should be added to the queue
 | |
|         guard
 | |
|             canStartJob,
 | |
|             job.behaviour != .runOnceNextLaunch,
 | |
|             job.nextRunTimestamp <= dependencies.dateNow.timeIntervalSince1970
 | |
|         else { return }
 | |
|         guard job.id != nil else {
 | |
|             SNLog("[JobRunner] Prevented attempt to add \(job.variant) job without id to queue")
 | |
|             return
 | |
|         }
 | |
|         
 | |
|         pendingJobsQueue.mutate { $0.append(job) }
 | |
|         
 | |
|         // If this is a concurrent queue then we should immediately start the next job
 | |
|         guard executionType == .concurrent else { return }
 | |
|         
 | |
|         // Ensure that the database commit has completed and then trigger the next job to run (need
 | |
|         // to ensure any interactions have been correctly inserted first)
 | |
|         db.afterNextTransactionNestedOnce(dedupeId: "JobRunner-Add: \(job.variant)") { [weak self] _ in
 | |
|             self?.runNextJob(using: dependencies)
 | |
|         }
 | |
|     }
 | |
|     
 | |
|     /// Upsert a job onto the queue, if the queue isn't currently running and 'canStartJob' is true then this will start
 | |
|     /// the JobRunner
 | |
|     ///
 | |
|     /// **Note:** If the job has a `behaviour` of `runOnceNextLaunch` or the `nextRunTimestamp`
 | |
|     /// is in the future then the job won't be started
 | |
|     fileprivate func upsert(
 | |
|         _ db: Database,
 | |
|         job: Job,
 | |
|         canStartJob: Bool,
 | |
|         using dependencies: Dependencies
 | |
|     ) {
 | |
|         guard let jobId: Int64 = job.id else {
 | |
|             SNLog("[JobRunner] Prevented attempt to upsert \(job.variant) job without id to queue")
 | |
|             return
 | |
|         }
 | |
|         
 | |
|         // Lock the pendingJobsQueue while checking the index and inserting to ensure we don't run into
 | |
|         // any multi-threading shenanigans
 | |
|         //
 | |
|         // Note: currently running jobs are removed from the pendingJobsQueue so we don't need to check
 | |
|         // the 'jobsCurrentlyRunning' set
 | |
|         var didUpdateExistingJob: Bool = false
 | |
|         
 | |
|         pendingJobsQueue.mutate { queue in
 | |
|             if let jobIndex: Array<Job>.Index = queue.firstIndex(where: { $0.id == jobId }) {
 | |
|                 queue[jobIndex] = job
 | |
|                 didUpdateExistingJob = true
 | |
|             }
 | |
|         }
 | |
|         
 | |
|         // If we didn't update an existing job then we need to add it to the pendingJobsQueue
 | |
|         guard !didUpdateExistingJob else { return }
 | |
|         
 | |
|         add(db, job: job, canStartJob: canStartJob, using: dependencies)
 | |
|     }
 | |
|     
 | |
|     fileprivate func insert(_ job: Job, before otherJob: Job) {
 | |
|         guard job.id != nil else {
 | |
|             SNLog("[JobRunner] Prevented attempt to insert \(job.variant) job without id to queue")
 | |
|             return
 | |
|         }
 | |
|         
 | |
|         // Insert the job before the current job (re-adding the current job to
 | |
|         // the start of the pendingJobsQueue if it's not in there) - this will mean the new
 | |
|         // job will run and then the otherJob will run (or run again) once it's
 | |
|         // done
 | |
|         pendingJobsQueue.mutate {
 | |
|             guard let otherJobIndex: Int = $0.firstIndex(of: otherJob) else {
 | |
|                 $0.insert(contentsOf: [job, otherJob], at: 0)
 | |
|                 return
 | |
|             }
 | |
|             
 | |
|             $0.insert(job, at: otherJobIndex)
 | |
|         }
 | |
|     }
 | |
|     
 | |
|     fileprivate func appDidFinishLaunching(
 | |
|         with jobs: [Job],
 | |
|         canStart: Bool,
 | |
|         using dependencies: Dependencies
 | |
|     ) {
 | |
|         pendingJobsQueue.mutate { $0.append(contentsOf: jobs) }
 | |
|         
 | |
|         // Start the job runner if needed
 | |
|         if canStart && !isRunning.wrappedValue {
 | |
|             start(using: dependencies)
 | |
|         }
 | |
|     }
 | |
|     
 | |
|     fileprivate func appDidBecomeActive(
 | |
|         with jobs: [Job],
 | |
|         canStart: Bool,
 | |
|         using dependencies: Dependencies
 | |
|     ) {
 | |
|         let currentlyRunningJobIds: Set<Int64> = currentlyRunningJobIds.wrappedValue
 | |
|         
 | |
|         pendingJobsQueue.mutate { queue in
 | |
|             // Avoid re-adding jobs to the queue that are already in it (this can
 | |
|             // happen if the user sends the app to the background before the 'onActive'
 | |
|             // jobs and then brings it back to the foreground)
 | |
|             let jobsNotAlreadyInQueue: [Job] = jobs
 | |
|                 .filter { job in
 | |
|                     !currentlyRunningJobIds.contains(job.id ?? -1) &&
 | |
|                     !queue.contains(where: { $0.id == job.id })
 | |
|                 }
 | |
|             
 | |
|             queue.append(contentsOf: jobsNotAlreadyInQueue)
 | |
|         }
 | |
|         
 | |
|         // Start the job runner if needed
 | |
|         if canStart && !isRunning.wrappedValue {
 | |
|             start(using: dependencies)
 | |
|         }
 | |
|     }
 | |
|     
 | |
|     fileprivate func infoForAllCurrentlyRunningJobs() -> [Int64: JobRunner.JobInfo] {
 | |
|         return currentlyRunningJobInfo.wrappedValue
 | |
|     }
 | |
|     
 | |
|     fileprivate func afterJob(_ jobId: Int64, state: JobRunner.JobState, callback: @escaping (JobRunner.JobResult) -> ()) {
 | |
|         /// Check if the current job state matches the requested state (if not then the job in the requested state can't be found so stop here)
 | |
|         switch (state, currentlyRunningJobIds.wrappedValue.contains(jobId)) {
 | |
|             case (.running, false): return callback(.notFound)
 | |
|             case (.pending, true): return callback(.notFound)
 | |
|             default: break
 | |
|         }
 | |
|         
 | |
|         jobCallbacks.mutate { jobCallbacks in
 | |
|             jobCallbacks[jobId] = (jobCallbacks[jobId] ?? []).appending(callback)
 | |
|         }
 | |
|     }
 | |
|     
 | |
|     fileprivate func hasPendingOrRunningJobWith(
 | |
|         threadId: String? = nil,
 | |
|         interactionId: Int64? = nil,
 | |
|         detailsData: Data? = nil
 | |
|     ) -> Bool {
 | |
|         let pendingJobs: [Job] = pendingJobsQueue.wrappedValue
 | |
|         let currentlyRunningJobInfo: [Int64: JobRunner.JobInfo] = currentlyRunningJobInfo.wrappedValue
 | |
|         var possibleJobIds: Set<Int64> = Set(currentlyRunningJobInfo.keys)
 | |
|             .inserting(contentsOf: pendingJobs.compactMap { $0.id }.asSet())
 | |
|         
 | |
|         // Remove any which don't have the matching threadId (if provided)
 | |
|         if let targetThreadId: String = threadId {
 | |
|             let pendingJobIdsWithWrongThreadId: Set<Int64> = pendingJobs
 | |
|                 .filter { $0.threadId != targetThreadId }
 | |
|                 .compactMap { $0.id }
 | |
|                 .asSet()
 | |
|             let runningJobIdsWithWrongThreadId: Set<Int64> = currentlyRunningJobInfo
 | |
|                 .filter { _, info -> Bool in info.threadId != targetThreadId }
 | |
|                 .map { key, _ in key }
 | |
|                 .asSet()
 | |
|             
 | |
|             possibleJobIds = possibleJobIds
 | |
|                 .subtracting(pendingJobIdsWithWrongThreadId)
 | |
|                 .subtracting(runningJobIdsWithWrongThreadId)
 | |
|         }
 | |
|         
 | |
|         // Remove any which don't have the matching interactionId (if provided)
 | |
|         if let targetInteractionId: Int64 = interactionId {
 | |
|             let pendingJobIdsWithWrongInteractionId: Set<Int64> = pendingJobs
 | |
|                 .filter { $0.interactionId != targetInteractionId }
 | |
|                 .compactMap { $0.id }
 | |
|                 .asSet()
 | |
|             let runningJobIdsWithWrongInteractionId: Set<Int64> = currentlyRunningJobInfo
 | |
|                 .filter { _, info -> Bool in info.interactionId != targetInteractionId }
 | |
|                 .map { key, _ in key }
 | |
|                 .asSet()
 | |
|             
 | |
|             possibleJobIds = possibleJobIds
 | |
|                 .subtracting(pendingJobIdsWithWrongInteractionId)
 | |
|                 .subtracting(runningJobIdsWithWrongInteractionId)
 | |
|         }
 | |
|         
 | |
|         // Remove any which don't have the matching details (if provided)
 | |
|         if let targetDetailsData: Data = detailsData {
 | |
|             let pendingJobIdsWithWrongDetailsData: Set<Int64> = pendingJobs
 | |
|                 .filter { $0.details != targetDetailsData }
 | |
|                 .compactMap { $0.id }
 | |
|                 .asSet()
 | |
|             let runningJobIdsWithWrongDetailsData: Set<Int64> = currentlyRunningJobInfo
 | |
|                 .filter { _, info -> Bool in info.detailsData != detailsData }
 | |
|                 .map { key, _ in key }
 | |
|                 .asSet()
 | |
|             
 | |
|             possibleJobIds = possibleJobIds
 | |
|                 .subtracting(pendingJobIdsWithWrongDetailsData)
 | |
|                 .subtracting(runningJobIdsWithWrongDetailsData)
 | |
|         }
 | |
|         
 | |
|         return !possibleJobIds.isEmpty
 | |
|     }
 | |
|     
 | |
|     fileprivate func removePendingJob(_ jobId: Int64) {
 | |
|         pendingJobsQueue.mutate { queue in
 | |
|             queue = queue.filter { $0.id != jobId }
 | |
|         }
 | |
|     }
 | |
|     
 | |
|     // MARK: - Job Running
 | |
|     
 | |
|     fileprivate func start(
 | |
|         forceWhenAlreadyRunning: Bool = false,
 | |
|         using dependencies: Dependencies
 | |
|     ) {
 | |
|         // Only start if the JobRunner is allowed to start the queue
 | |
|         guard canStart?(self) == true else { return }
 | |
|         guard forceWhenAlreadyRunning || !isRunning.wrappedValue else { return }
 | |
|         
 | |
|         // The JobRunner runs synchronously we need to ensure this doesn't start
 | |
|         // on the main thread (if it is on the main thread then swap to a different thread)
 | |
|         guard DispatchQueue.with(key: queueKey, matches: queueContext, using: dependencies) else {
 | |
|             internalQueue.async(using: dependencies) { [weak self] in
 | |
|                 self?.start(forceWhenAlreadyRunning: forceWhenAlreadyRunning, using: dependencies)
 | |
|             }
 | |
|             return
 | |
|         }
 | |
|         
 | |
|         // Flag the JobRunner as running (to prevent something else from trying to start it
 | |
|         // and messing with the execution behaviour)
 | |
|         var wasAlreadyRunning: Bool = false
 | |
|         isRunning.mutate { isRunning in
 | |
|             wasAlreadyRunning = isRunning
 | |
|             isRunning = true
 | |
|         }
 | |
|         hasStartedAtLeastOnce.mutate { $0 = true }
 | |
|         
 | |
|         // Get any pending jobs
 | |
|         
 | |
|         let jobVariants: [Job.Variant] = self.jobVariants
 | |
|         let jobIdsAlreadyRunning: Set<Int64> = currentlyRunningJobIds.wrappedValue
 | |
|         let jobsAlreadyInQueue: Set<Int64> = pendingJobsQueue.wrappedValue.compactMap { $0.id }.asSet()
 | |
|         let jobsToRun: [Job] = dependencies.storage.read(using: dependencies) { db in
 | |
|             try Job
 | |
|                 .filterPendingJobs(
 | |
|                     variants: jobVariants,
 | |
|                     excludeFutureJobs: true,
 | |
|                     includeJobsWithDependencies: false
 | |
|                 )
 | |
|                 .filter(!jobIdsAlreadyRunning.contains(Job.Columns.id)) // Exclude jobs already running
 | |
|                 .filter(!jobsAlreadyInQueue.contains(Job.Columns.id))   // Exclude jobs already in the queue
 | |
|                 .fetchAll(db)
 | |
|         }
 | |
|         .defaulting(to: [])
 | |
|         
 | |
|         // Determine the number of jobs to run
 | |
|         var jobCount: Int = 0
 | |
|         
 | |
|         pendingJobsQueue.mutate { queue in
 | |
|             queue.append(contentsOf: jobsToRun)
 | |
|             jobCount = queue.count
 | |
|         }
 | |
|         
 | |
|         // If there are no pending jobs and nothing in the queue then schedule the JobRunner
 | |
|         // to start again when the next scheduled job should start
 | |
|         guard jobCount > 0 else {
 | |
|             if jobIdsAlreadyRunning.isEmpty {
 | |
|                 isRunning.mutate { $0 = false }
 | |
|                 scheduleNextSoonestJob(using: dependencies)
 | |
|             }
 | |
|             return
 | |
|         }
 | |
|         
 | |
|         // Run the first job in the pendingJobsQueue
 | |
|         if !wasAlreadyRunning {
 | |
|             Log.info("[JobRunner] Starting \(queueContext) with (\(jobCount) job\(jobCount != 1 ? "s" : ""))", silenceForTests: true)
 | |
|         }
 | |
|         runNextJob(using: dependencies)
 | |
|     }
 | |
|     
 | |
/// Stops this queue and throws away its in-memory state: the running flag is lowered, every
/// pending job is dropped and the deferral-loop history is reset
///
/// **Note:** The set of currently running jobs is not modified by this function
fileprivate func stopAndClearPendingJobs() {
    isRunning.mutate { running in running = false }
    pendingJobsQueue.mutate { pending in pending = [] }
    deferLoopTracker.mutate { tracker in tracker = [:] }
}
 | |
|     
 | |
/// Takes the next job off the pending queue and hands it to its executor
///
/// Nothing happens when the queue has been stopped, and a call made from outside of the queue's
/// own `internalQueue` is re-dispatched onto it. A serial queue ignores the call while it still
/// has a job running. When the pending queue is empty the next soonest job gets scheduled instead.
///
/// A dequeued job is failed when it has no registered executor, lacks a `threadId` or
/// `interactionId` which its executor requires, has no `id`, or when some of its recorded
/// dependencies can no longer be found; it is deferred when its `nextRunTimestamp` is still in
/// the future or while it has dependencies which need to complete first
private func runNextJob(using dependencies: Dependencies) {
    // A stopped queue must not pick up any further work
    guard isRunning.wrappedValue else { return }
    
    // Hop over to the queue's own dispatch queue if we were called from somewhere else
    guard DispatchQueue.with(key: queueKey, matches: queueContext, using: dependencies) else {
        internalQueue.async(using: dependencies) { [weak self] in
            self?.runNextJob(using: dependencies)
        }
        return
    }
    
    // A serial queue only ever runs a single job at a time
    if executionType != .concurrent && !currentlyRunningJobIds.wrappedValue.isEmpty {
        SNLog("[JobRunner] \(queueContext) Ignoring 'runNextJob' due to currently running job in serial queue")
        return
    }
    
    // Remove the first pending job (capturing how many are left behind it at the same time)
    let dequeued: (Job, Int)? = pendingJobsQueue.mutate { queue in
        queue.popFirst().map { ($0, queue.count) }
    }
    
    guard let (nextJob, numJobsRemaining) = dequeued else {
        // Nothing left to start - a serial queue, or a concurrent queue without any running
        // jobs, is no longer considered to be running
        if executionType != .concurrent || currentlyRunningJobIds.wrappedValue.isEmpty {
            isRunning.mutate { running in running = false }
        }
        
        // Always try to schedule the next soonest job (otherwise, if enough jobs get started in
        // rapid succession, pending/failed jobs in the database may never get re-started in a
        // concurrent queue)
        scheduleNextSoonestJob(using: dependencies)
        return
    }
    
    // The job can't run without an executor registered for its variant
    guard let jobExecutor: JobExecutor.Type = executorMap.wrappedValue[nextJob.variant] else {
        SNLog("[JobRunner] \(queueContext) Unable to run \(nextJob.variant) job due to missing executor")
        handleJobFailed(
            nextJob,
            error: JobRunnerError.executorMissing,
            permanentFailure: true,
            using: dependencies
        )
        return
    }
    
    // Make sure the job carries the identifiers which the executor says it requires
    if jobExecutor.requiresThreadId && nextJob.threadId == nil {
        SNLog("[JobRunner] \(queueContext) Unable to run \(nextJob.variant) job due to missing required threadId")
        handleJobFailed(
            nextJob,
            error: JobRunnerError.requiredThreadIdMissing,
            permanentFailure: true,
            using: dependencies
        )
        return
    }
    
    if jobExecutor.requiresInteractionId && nextJob.interactionId == nil {
        SNLog("[JobRunner] \(queueContext) Unable to run \(nextJob.variant) job due to missing required interactionId")
        handleJobFailed(
            nextJob,
            error: JobRunnerError.requiredInteractionIdMissing,
            permanentFailure: true,
            using: dependencies
        )
        return
    }
    
    if nextJob.id == nil {
        SNLog("[JobRunner] \(queueContext) Unable to run \(nextJob.variant) job due to missing id")
        handleJobFailed(
            nextJob,
            error: JobRunnerError.jobIdMissing,
            permanentFailure: false,
            using: dependencies
        )
        return
    }
    
    // A job whose 'nextRunTimestamp' hasn't been reached yet gets deferred rather than run
    guard nextJob.nextRunTimestamp <= dependencies.dateNow.timeIntervalSince1970 else {
        handleJobDeferred(nextJob, using: dependencies)
        return
    }
    
    // Look up the dependency records for the job along with the jobs they point at
    let dependencyInfo: (expectedCount: Int, jobs: Set<Job>) = dependencies.storage
        .read(using: dependencies) { db in
            let dependencyRecords: Set<JobDependencies> = try JobDependencies
                .filter(JobDependencies.Columns.jobId == nextJob.id)
                .fetchSet(db)
            let outstandingJobs: Set<Job> = try Job
                .filter(ids: dependencyRecords.compactMap { $0.dependantId })
                .fetchSet(db)
            
            return (dependencyRecords.count, outstandingJobs)
        }
        .defaulting(to: (0, []))
    
    // If a dependency record points at a job which couldn't be fetched then the job is failed
    // permanently
    if dependencyInfo.jobs.count != dependencyInfo.expectedCount {
        SNLog("[JobRunner] \(queueContext) Removing \(nextJob.variant) job due to missing dependencies")
        handleJobFailed(
            nextJob,
            error: JobRunnerError.missingDependencies,
            permanentFailure: true,
            using: dependencies
        )
        return
    }
    
    // While there are dependencies left, enqueue them and defer this job
    if !dependencyInfo.jobs.isEmpty {
        SNLog("[JobRunner] \(queueContext) Deferring \(nextJob.variant) job until \(dependencyInfo.jobs.count) dependenc\(dependencyInfo.jobs.count == 1 ? "y" : "ies") are completed")
        
        dependencies.jobRunner.enqueueDependenciesIfNeeded(
            Array(dependencyInfo.jobs),
            using: dependencies
        )
        handleJobDeferred(nextJob, using: dependencies)
        return
    }
    
    // Record that this particular job is now running
    //
    // Note: The counts used by the log below are captured into their own local values (the
    // original note on this code says that 'SNLog' seems to dispatch to its own queue, which
    // ends up getting blocked by the JobRunner's queue because 'jobQueue' is Atomic)
    var numJobsRunning: Int = 0
    
    nextTrigger.mutate { pendingTrigger in
        pendingTrigger?.invalidate()   // Need to invalidate to prevent a memory leak
        pendingTrigger = nil
    }
    currentlyRunningJobIds.mutate { runningIds in
        runningIds = runningIds.inserting(nextJob.id)
        numJobsRunning = runningIds.count
    }
    currentlyRunningJobInfo.mutate { runningInfo in
        let info: JobRunner.JobInfo = JobRunner.JobInfo(
            variant: nextJob.variant,
            threadId: nextJob.threadId,
            interactionId: nextJob.interactionId,
            detailsData: nextJob.details,
            uniqueHashValue: nextJob.uniqueHashValue
        )
        
        runningInfo = runningInfo.setting(nextJob.id, info)
    }
    SNLog("[JobRunner] \(queueContext) started \(nextJob.variant) job (\(executionType == .concurrent ? "\(numJobsRunning) currently running, " : "")\(numJobsRemaining) remaining)")
    
    // Hand the job to its executor, the handlers below get called with the outcome
    jobExecutor.run(
        nextJob,
        queue: targetQueue(),
        success: handleJobSucceeded,
        failure: handleJobFailed,
        deferred: handleJobDeferred,
        using: dependencies
    )
    
    // A concurrent queue which still has pending jobs immediately tries to start the next one
    if executionType == .concurrent && numJobsRemaining > 0 {
        internalQueue.async(using: dependencies) { [weak self] in
            self?.runNextJob(using: dependencies)
        }
    }
}
 | |
|     
 | |
/// Works out when this queue should next do some work based on the pending jobs stored in the
/// database (ignoring jobs which are currently running)
///
/// - When there is no such job, or the queue isn't allowed to start, `onQueueDrained` is called
///   (for a concurrent queue only if nothing is running) and nothing is scheduled
/// - When the fetched `nextRunTimestamp` isn't in the future the queue is restarted straight away
/// - Otherwise a `Trigger` is created for that timestamp (a serial queue only does this when it
///   has no running jobs)
private func scheduleNextSoonestJob(using dependencies: Dependencies) {
    /// `true` for every serial queue, and for a concurrent queue which has no running jobs
    func isSerialOrIdle() -> Bool {
        return (executionType != .concurrent || currentlyRunningJobIds.wrappedValue.isEmpty)
    }
    
    // Fetch the timestamp of a pending job from the database
    let variants: [Job.Variant] = self.jobVariants
    let runningJobIds: Set<Int64> = currentlyRunningJobIds.wrappedValue
    let pendingTimestamp: TimeInterval? = dependencies.storage.read(using: dependencies) { db in
        try Job
            .filterPendingJobs(
                variants: variants,
                excludeFutureJobs: false,
                includeJobsWithDependencies: false
            )
            .select(.nextRunTimestamp)
            .filter(!runningJobIds.contains(Job.Columns.id)) // Exclude jobs already running
            .asRequest(of: TimeInterval.self)
            .fetchOne(db)
    }
    
    // Without a remaining job (or without permission to start queues) report the queue as
    // drained and stop here
    guard let nextJobTimestamp: TimeInterval = pendingTimestamp, canStart?(self) == true else {
        if isSerialOrIdle() {
            self.onQueueDrained?()
        }
        return
    }
    
    let secondsUntilNextJob: TimeInterval = (nextJobTimestamp - dependencies.dateNow.timeIntervalSince1970)
    
    // A job which isn't scheduled in the future means the queue should just restart immediately
    guard secondsUntilNextJob > 0 else {
        // Only log the restart if this queue had actually been about to stop
        if isSerialOrIdle() {
            let timingString: String
            
            if nextJobTimestamp == 0 {
                timingString = "that should be in the queue"
            }
            else {
                timingString = "scheduled \(.seconds(secondsUntilNextJob), unit: .s) ago"
            }
            
            SNLog("[JobRunner] Restarting \(queueContext) immediately for job \(timingString)")
        }
        
        // Call 'start' so any pending jobs which aren't in the queue yet get loaded (a
        // concurrent queue is forced to do this even when it is already running)
        internalQueue.async(using: dependencies) { [weak self] in
            self?.start(forceWhenAlreadyRunning: (self?.executionType == .concurrent), using: dependencies)
        }
        return
    }
    
    // A serial queue which still has a running job doesn't get a trigger
    if executionType != .concurrent && !currentlyRunningJobIds.wrappedValue.isEmpty { return }
    
    // Replace any existing trigger with one for the upcoming job
    SNLog("[JobRunner] Stopping \(queueContext) until next job in \(.seconds(secondsUntilNextJob), unit: .s)")
    nextTrigger.mutate { currentTrigger in
        currentTrigger?.invalidate()   // Need to invalidate the old trigger to prevent a memory leak
        currentTrigger = Trigger.create(queue: self, timestamp: nextJobTimestamp, using: dependencies)
    }
}
 | |
|     
 | |
|     // MARK: - Handling Results
 | |
| 
 | |
/// This function is called when a job succeeds
///
/// Depending on the job's `behaviour` the job is either deleted (along with the dependency
/// records pointing at it) or has its `failureCount`/`nextRunTimestamp` reset, after which the
/// jobs that depended on it are enqueued, the job is cleaned up and the next job is started
///
/// - Parameters:
///   - job: The job which completed
///   - shouldStop: When `true` a `recurring` job is deleted instead of being left to run again
///   - dependencies: The dependencies to use for storage, time and the job runner
private func handleJobSucceeded(
    _ job: Job,
    shouldStop: Bool,
    using dependencies: Dependencies
) {
    /// Deletes the `JobDependencies` records which have this job as their dependency (so jobs
    /// that were dependant on this one can run) and then deletes the job itself
    func deleteJobAndDependencyRecords() {
        dependencies.storage.write(using: dependencies) { db in
            _ = try JobDependencies
                .filter(JobDependencies.Columns.dependantId == job.id)
                .deleteAll(db)
            
            _ = try job.delete(db)
        }
    }
    
    /// The dependant jobs need to be fetched up front (the `JobDependecies` table has cascading
    /// deletion when the original `Job` is removed so they can't be retrieved afterwards)
    let dependantJobs: [Job] = dependencies.storage
        .read(using: dependencies) { db in try job.dependantJobs.fetchAll(db) }
        .defaulting(to: [])
    
    switch job.behaviour {
        /// Jobs which only run a single time are finished with
        case .runOnce, .runOnceNextLaunch: deleteJobAndDependencyRecords()
        
        /// A `recurring` job which has been asked to stop is finished with as well
        case .recurring where shouldStop: deleteJobAndDependencyRecords()
        
        /// A `recurring` job which has already run should automatically run again but at least
        /// 1 second should pass before it does - the job itself should really update its own
        /// `nextRunTimestamp` (this is just a safety net)
        case .recurring where job.nextRunTimestamp <= dependencies.dateNow.timeIntervalSince1970:
            if let jobId: Int64 = job.id {
                dependencies.storage.write(using: dependencies) { db in
                    _ = try Job
                        .filter(id: jobId)
                        .updateAll(
                            db,
                            Job.Columns.failureCount.set(to: 0),
                            Job.Columns.nextRunTimestamp.set(to: (dependencies.dateNow.timeIntervalSince1970 + 1))
                        )
                }
            }
        
        /// A `recurringOnLaunch/Active` job which previously failed needs its `failureCount`
        /// and `nextRunTimestamp` cleared to prevent it from endlessly running over and over
        case .recurringOnLaunch, .recurringOnActive:
            guard
                let jobId: Int64 = job.id,
                job.failureCount != 0,
                job.nextRunTimestamp > TimeInterval.leastNonzeroMagnitude
            else { break }
            
            dependencies.storage.write(using: dependencies) { db in
                _ = try Job
                    .filter(id: jobId)
                    .updateAll(
                        db,
                        Job.Columns.failureCount.set(to: 0),
                        Job.Columns.nextRunTimestamp.set(to: 0)
                    )
            }
        
        default: break
    }
    
    /// With the job completed the jobs which were waiting on it can be enqueued
    dependencies.jobRunner.enqueueDependenciesIfNeeded(
        dependantJobs,
        using: dependencies
    )
    
    // Clean up after the job and move on to the next one
    performCleanUp(for: job, result: .succeeded, using: dependencies)
    internalQueue.async(using: dependencies) { [weak self] in
        self?.runNextJob(using: dependencies)
    }
}
 | |
| 
 | |
/// This function is called when a job fails, if it wasn't a permanent failure then the 'failureCount' for the job will be
/// incremented and it'll be re-run after a retry interval has passed
///
/// The outcome depends on the state of the job:
/// - If the job no longer exists in the database it is treated as cancelled and just cleaned up
/// - If this is the blocking queue and the job is a blocking one then it is put back at the front of the queue to
///   be retried immediately (unless the failure was a possible deferral loop, in which case it isn't retried)
/// - If the failure was permanent, or the retries have been used up, the job and its dependant jobs are deleted
/// - Otherwise the job (and its dependant jobs) have their `failureCount` and `nextRunTimestamp` updated
///
/// - Parameters:
///   - job: The job which failed
///   - error: The error which caused the failure (logged as `JobRunnerError.unknown` when `nil`)
///   - permanentFailure: Whether the job should be removed rather than retried
///   - dependencies: The dependencies to use for storage and time
private func handleJobFailed(
    _ job: Job,
    error: Error?,
    permanentFailure: Bool,
    using dependencies: Dependencies
) {
    guard dependencies.storage.read(using: dependencies, { db in try Job.exists(db, id: job.id ?? -1) }) == true else {
        SNLog("[JobRunner] \(queueContext) \(job.variant) job canceled")
        performCleanUp(for: job, result: .failed(error, permanentFailure), using: dependencies)
        
        internalQueue.async(using: dependencies) { [weak self] in
            self?.runNextJob(using: dependencies)
        }
        return
    }
    
    // If this is the blocking queue and a "blocking" job failed then rerun it
    // immediately (in this case we don't trigger any job callbacks because the
    // job isn't actually done, it's going to try again immediately)
    if self.type == .blocking && job.shouldBlock {
        // If it was a possible deferral loop then we don't actually want to
        // retry the job (even if it's a blocking one, this gives a small chance
        // that the app could continue to function)
        let wasPossibleDeferralLoop: Bool = {
            if let error = error, case JobRunnerError.possibleDeferralLoop = error { return true }
            
            return false
        }()
        
        // Only claim that the job is being retried when that is what will actually happen
        if wasPossibleDeferralLoop {
            SNLog("[JobRunner] \(queueContext) \(job.variant) job failed due to error: \(error ?? JobRunnerError.unknown); not retrying")
        }
        else {
            SNLog("[JobRunner] \(queueContext) \(job.variant) job failed due to error: \(error ?? JobRunnerError.unknown); retrying immediately")
        }
        
        performCleanUp(
            for: job,
            result: .failed(error, permanentFailure),
            shouldTriggerCallbacks: wasPossibleDeferralLoop,
            using: dependencies
        )
        
        // Only add it back to the queue if it wasn't a deferral loop
        if !wasPossibleDeferralLoop {
            pendingJobsQueue.mutate { $0.insert(job, at: 0) }
        }
        
        internalQueue.async(using: dependencies) { [weak self] in
            self?.runNextJob(using: dependencies)
        }
        return
    }
    
    // Get the max failure count for the job (a value of '-1' means it will retry indefinitely)
    let maxFailureCount: Int = (executorMap.wrappedValue[job.variant]?.maxFailureCount ?? 0)
    let nextRunTimestamp: TimeInterval = (dependencies.dateNow.timeIntervalSince1970 + JobRunner.getRetryInterval(for: job))
    var dependantJobIds: [Int64] = []
    var failureText: String = "failed due to error: \(error ?? JobRunnerError.unknown)"
    
    dependencies.storage.write(using: dependencies) { db in
        /// Retrieve a list of dependant jobs so we can clear them from the queue
        dependantJobIds = try job.dependantJobs
            .select(.id)
            .asRequest(of: Int64.self)
            .fetchAll(db)

        /// Delete/update the failed jobs and any dependencies
        let updatedFailureCount: UInt = (job.failureCount + 1)
    
        guard
            !permanentFailure && (
                maxFailureCount < 0 ||
                updatedFailureCount <= maxFailureCount
            )
        else {
            failureText = (maxFailureCount >= 0 && updatedFailureCount > maxFailureCount ?
                "failed permanently due to error: \(error ?? JobRunnerError.unknown); too many retries" :
                "failed permanently due to error: \(error ?? JobRunnerError.unknown)"
            )
            
            // If the job permanently failed or we have performed all of our retry attempts
            // then delete the job and all of its dependant jobs (it'll probably never succeed)
            _ = try job.dependantJobs
                .deleteAll(db)

            _ = try job.delete(db)
            return
        }
        
        failureText = "failed due to error: \(error ?? JobRunnerError.unknown); scheduling retry (failure count is \(updatedFailureCount))"
        
        try job
            .with(
                failureCount: updatedFailureCount,
                nextRunTimestamp: nextRunTimestamp
            )
            .upserted(db)
        
        // Update the failureCount and nextRunTimestamp on dependant jobs as well (update the
        // 'nextRunTimestamp' value to be 1ms later so when the queue gets regenerated they'll
        // come after the dependency)
        try job.dependantJobs
            .updateAll(
                db,
                Job.Columns.failureCount.set(to: updatedFailureCount),
                Job.Columns.nextRunTimestamp.set(to: (nextRunTimestamp + (1 / 1000)))
            )
    }
    
    /// Remove any dependant jobs from the queue (shouldn't be in there but filter the queue just in case so we don't try
    /// to run a deleted job or get stuck in a loop of trying to run dependencies indefinitely)
    if !dependantJobIds.isEmpty {
        // Use a set so each membership check is constant-time rather than a scan of the id list
        let dependantJobIdSet: Set<Int64> = Set(dependantJobIds)
        
        pendingJobsQueue.mutate { queue in
            queue = queue.filter { !dependantJobIdSet.contains($0.id ?? -1) }
        }
    }
    
    SNLog("[JobRunner] \(queueContext) \(job.variant) job \(failureText)")
    performCleanUp(for: job, result: .failed(error, permanentFailure), using: dependencies)
    internalQueue.async(using: dependencies) { [weak self] in
        self?.runNextJob(using: dependencies)
    }
}
 | |
|     
 | |
/// This function is called when a job neither succeeds nor fails (this should only occur if the job has specific logic
/// that makes it dependant on other jobs, and it should automatically manage those dependencies)
///
/// Every deferral is recorded in the in-memory `deferLoopTracker`; a job which gets deferred too often in too short a
/// time is handed to `handleJobFailed` with a `possibleDeferralLoop` error instead of being deferred again
///
/// - Parameters:
///   - job: The job which was deferred
///   - dependencies: The dependencies to use for the current time and for dispatching
public func handleJobDeferred(
    _ job: Job,
    using dependencies: Dependencies
) {
    var stuckInDeferLoop: Bool = false
    
    deferLoopTracker.mutate { tracker in
        let timeNow: TimeInterval = dependencies.dateNow.timeIntervalSince1970
        
        if let lastRecord: (count: Int, times: [TimeInterval]) = tracker[job.id] {
            // The job counts as stuck once it has been deferred at least 'deferralLoopThreshold'
            // times and the oldest stored deferral is more recent than the allowed window
            stuckInDeferLoop = (
                lastRecord.count >= JobQueue.deferralLoopThreshold &&
                (timeNow - lastRecord.times[0]) < CGFloat(lastRecord.count * maxDeferralsPerSecond)
            )
            
            // Only the most recent 'deferralLoopThreshold' times are kept (the count itself
            // keeps growing)
            let recentTimes: [TimeInterval] = Array(lastRecord.times.suffix(JobQueue.deferralLoopThreshold - 1)) + [timeNow]
            
            tracker = tracker.setting(job.id, (lastRecord.count + 1, recentTimes))
        }
        else {
            // First deferral seen for this job so start tracking it
            tracker = tracker.setting(job.id, (1, [timeNow]))
        }
    }
    
    // It's possible (by introducing bugs) to create a loop where a Job tries to run and immediately
    // defers itself but then attempts to run again (resulting in an infinite loop); this won't block
    // the app since it's on a background thread but can result in 100% of a CPU being used (and a
    // battery drain)
    //
    // The tracker above is what detects this case, once it does the record for the job is
    // cleared and the job is failed (non-permanently) instead of being deferred
    if stuckInDeferLoop {
        deferLoopTracker.mutate { tracker in tracker = tracker.removingValue(forKey: job.id) }
        handleJobFailed(
            job,
            error: JobRunnerError.possibleDeferralLoop,
            permanentFailure: false,
            using: dependencies
        )
        return
    }
    
    // A regular deferral just cleans up the job and moves on to the next one
    performCleanUp(for: job, result: .deferred, using: dependencies)
    internalQueue.async(using: dependencies) { [weak self] in
        self?.runNextJob(using: dependencies)
    }
}
 | |
|     
 | |
/// Clears the running state for a job and (optionally) notifies anything waiting on its result
///
/// - Parameters:
///   - job: The job which has stopped running
///   - result: The result which gets passed to the callbacks registered for the job
///   - shouldTriggerCallbacks: When `false` the registered callbacks are left in place and not called
///   - dependencies: The dependencies to use when dispatching the callbacks
private func performCleanUp(
    for job: Job,
    result: JobRunner.JobResult,
    shouldTriggerCallbacks: Bool = true,
    using dependencies: Dependencies
) {
    // The job was already taken out of the pending queue before it ran so only the
    // 'currentlyRunning' state needs to be cleared
    currentlyRunningJobIds.mutate { runningIds in runningIds = runningIds.removing(job.id) }
    currentlyRunningJobInfo.mutate { runningInfo in runningInfo = runningInfo.removingValue(forKey: job.id) }
    
    if !shouldTriggerCallbacks { return }
    
    // Take ownership of the callbacks registered for the job (removing them from the store)
    var pendingCallbacks: [(JobRunner.JobResult) -> ()] = []
    jobCallbacks.mutate { registeredCallbacks in
        pendingCallbacks = (registeredCallbacks[job.id] ?? [])
        registeredCallbacks = registeredCallbacks.removingValue(forKey: job.id)
    }
    
    // The callbacks get run on a global queue rather than the caller's
    DispatchQueue.global(qos: .default).async(using: dependencies) {
        for callback in pendingCallbacks {
            callback(result)
        }
    }
}
 | |
| }
 | |
| 
 | |
| // MARK: - Formatting
 | |
| 
 | |
extension String.StringInterpolation {
    /// Interpolates an optional `Job.Variant`, using "unknown" when there is no value
    mutating func appendInterpolation(_ variant: Job.Variant?) {
        guard let variant: Job.Variant = variant else {
            appendLiteral("unknown") // stringlint:disable
            return
        }
        
        appendLiteral("\(variant)") // stringlint:disable
    }
    
    /// Interpolates an optional `Job.Behaviour`, using "unknown" when there is no value
    mutating func appendInterpolation(_ behaviour: Job.Behaviour?) {
        guard let behaviour: Job.Behaviour = behaviour else {
            appendLiteral("unknown") // stringlint:disable
            return
        }
        
        appendLiteral("\(behaviour)") // stringlint:disable
    }
}
 | |
| 
 | |
| // MARK: - JobRunner Singleton
 | |
| // FIXME: Remove this once the jobRunner is dependency injected everywhere correctly
 | |
public extension JobRunner {
    /// The shared runner which every static function below forwards to
    internal static let instance: JobRunner = JobRunner()
    
    // MARK: - Static Access
    
    /// Registers the executor for a job variant on the shared instance
    static func setExecutor(_ executor: JobExecutor.Type, for variant: Job.Variant) {
        JobRunner.instance.setExecutor(executor, for: variant)
    }
    
    /// Forwards the "app finished launching" event to the shared instance
    static func appDidFinishLaunching(using dependencies: Dependencies = Dependencies()) {
        JobRunner.instance.appDidFinishLaunching(using: dependencies)
    }
    
    /// Forwards the "app became active" event to the shared instance
    static func appDidBecomeActive(using dependencies: Dependencies = Dependencies()) {
        JobRunner.instance.appDidBecomeActive(using: dependencies)
    }
    
    /// Passes the callback to `afterBlockingQueue` on the shared instance
    static func afterBlockingQueue(callback: @escaping () -> ()) {
        JobRunner.instance.afterBlockingQueue(callback: callback)
    }
    
    /// Add a job onto the queue, if the queue isn't currently running and 'canStartJob' is true then this will start
    /// the JobRunner
    ///
    /// **Note:** If the job has a `behaviour` of `runOnceNextLaunch` or the `nextRunTimestamp`
    /// is in the future then the job won't be started
    static func add(
        _ db: Database,
        job: Job?,
        canStartJob: Bool = true,
        using dependencies: Dependencies = Dependencies()
    ) {
        JobRunner.instance.add(db, job: job, canStartJob: canStartJob, using: dependencies)
    }
    
    /// Upsert a job onto the queue, if the queue isn't currently running and 'canStartJob' is true then this will start
    /// the JobRunner
    ///
    /// **Note:** If the job has a `behaviour` of `runOnceNextLaunch` or the `nextRunTimestamp`
    /// is in the future then the job won't be started
    @discardableResult static func upsert(
        _ db: Database,
        job: Job?,
        canStartJob: Bool = true,
        using dependencies: Dependencies = Dependencies()
    ) -> Job? {
        return JobRunner.instance.upsert(db, job: job, canStartJob: canStartJob, using: dependencies)
    }
    
    /// Passes the job to `insert(_:job:before:)` on the shared instance and returns its result
    @discardableResult static func insert(
        _ db: Database,
        job: Job?,
        before otherJob: Job
    ) -> (Int64, Job)? {
        return JobRunner.instance.insert(db, job: job, before: otherJob)
    }
    
    /// Calling this will clear the JobRunner queues and stop it from running new jobs, any currently executing jobs will
    /// continue to run though (this means if we suspend the database it's likely that any currently running jobs will fail to
    /// complete and fail to record their failure - they _should_ be picked up again the next time the app is launched)
    static func stopAndClearPendingJobs(
        exceptForVariant: Job.Variant? = nil,
        using dependencies: Dependencies,
        onComplete: (() -> ())? = nil
    ) {
        JobRunner.instance.stopAndClearPendingJobs(
            exceptForVariant: exceptForVariant,
            using: dependencies,
            onComplete: onComplete
        )
    }
    
    /// Returns the shared instance's answer for whether the given job is currently running
    static func isCurrentlyRunning(_ job: Job?) -> Bool {
        JobRunner.instance.isCurrentlyRunning(job)
    }
    
    /// Passes the job, state filter and callback to `afterJob` on the shared instance
    static func afterJob(_ job: Job?, state: JobState = .any, callback: @escaping (JobResult) -> ()) {
        JobRunner.instance.afterJob(job, state: state, callback: callback)
    }
    
    /// Passes the job to `removePendingJob` on the shared instance
    static func removePendingJob(_ job: Job?) {
        JobRunner.instance.removePendingJob(job)
    }
}
 |