Updated the JobRunner 'afterJob' function to work via Combine instead of closures

pull/894/head
Morgan Pretty 5 months ago
parent 38dad27969
commit 5d5a32af8e

@ -327,15 +327,20 @@ public extension ConfigurationSyncJob {
success: { _, _ in resolver(Result.success(())) },
failure: { _, error, _ in resolver(Result.failure(error)) },
deferred: { job in
dependencies[singleton: .jobRunner].afterJob(job) { result in
switch result {
/// If it gets deferred a second time then we should probably just fail - no use waiting on something
/// that may never run (also means we can avoid another potential defer loop)
case .notFound, .deferred: resolver(Result.failure(NetworkError.unknown))
case .failed(let error, _): resolver(Result.failure(error))
case .succeeded: resolver(Result.success(()))
}
}
dependencies[singleton: .jobRunner]
.afterJob(job)
.first()
.sinkUntilComplete(
receiveValue: { result in
switch result {
/// If it gets deferred a second time then we should probably just fail - no use waiting on something
/// that may never run (also means we can avoid another potential defer loop)
case .notFound, .deferred: resolver(Result.failure(NetworkError.unknown))
case .failed(let error, _): resolver(Result.failure(error))
case .succeeded: resolver(Result.success(()))
}
}
)
},
using: dependencies
)

@ -3,6 +3,7 @@
// stringlint:disable
import Foundation
import Combine
import GRDB
// MARK: - Singleton
@ -50,7 +51,7 @@ public protocol JobRunnerType {
@discardableResult func insert(_ db: Database, job: Job?, before otherJob: Job) -> (Int64, Job)?
func enqueueDependenciesIfNeeded(_ jobs: [Job])
func manuallyTriggerResult(_ job: Job?, result: JobRunner.JobResult)
func afterJob(_ job: Job?, state: JobRunner.JobState, callback: @escaping (JobRunner.JobResult) -> ())
func afterJob(_ job: Job?, state: JobRunner.JobState) -> AnyPublisher<JobRunner.JobResult, Never>
func removePendingJob(_ job: Job?)
}
@ -111,8 +112,8 @@ public extension JobRunnerType {
return add(db, job: job, dependantJob: nil, canStartJob: canStartJob)
}
func afterJob(_ job: Job?, callback: @escaping (JobRunner.JobResult) -> ()) {
afterJob(job, state: .any, callback: callback)
func afterJob(_ job: Job?) -> AnyPublisher<JobRunner.JobResult, Never> {
return afterJob(job, state: .any)
}
}
@ -859,13 +860,12 @@ public final class JobRunner: JobRunnerType {
}
}
public func afterJob(_ job: Job?, state: JobRunner.JobState, callback: @escaping (JobResult) -> ()) {
public func afterJob(_ job: Job?, state: JobRunner.JobState) -> AnyPublisher<JobRunner.JobResult, Never> {
guard let job: Job = job, let jobId: Int64 = job.id, let queue: JobQueue = queues.wrappedValue[job.variant] else {
callback(.notFound)
return
return Just(.notFound).eraseToAnyPublisher()
}
queue.afterJob(jobId, state: state, callback: callback)
return queue.afterJob(jobId, state: state)
}
public func removePendingJob(_ job: Job?) {
@ -1054,11 +1054,11 @@ public final class JobQueue: Hashable {
fileprivate var isRunningInBackgroundTask: Atomic<Bool> = Atomic(false)
private var nextTrigger: Atomic<Trigger?> = Atomic(nil)
fileprivate var jobCallbacks: Atomic<[Int64: [(JobRunner.JobResult) -> ()]]> = Atomic([:])
fileprivate var currentlyRunningJobIds: Atomic<Set<Int64>> = Atomic([])
private var currentlyRunningJobInfo: Atomic<[Int64: JobRunner.JobInfo]> = Atomic([:])
fileprivate var deferLoopTracker: Atomic<[Int64: (count: Int, times: [TimeInterval])]> = Atomic([:])
private let maxDeferralsPerSecond: Int
private let jobCompletedSubject: PassthroughSubject<(Int64?, JobRunner.JobResult), Never> = PassthroughSubject()
fileprivate var hasPendingJobs: Bool { !pendingJobsQueue.wrappedValue.isEmpty }
@ -1235,17 +1235,18 @@ public final class JobQueue: Hashable {
return currentlyRunningJobInfo.wrappedValue
}
fileprivate func afterJob(_ jobId: Int64, state: JobRunner.JobState, callback: @escaping (JobRunner.JobResult) -> ()) {
fileprivate func afterJob(_ jobId: Int64, state: JobRunner.JobState) -> AnyPublisher<JobRunner.JobResult, Never> {
/// Check if the current job state matches the requested state (if not then the job in the requested state can't be found so stop here)
switch (state, currentlyRunningJobIds.wrappedValue.contains(jobId)) {
case (.running, false): return callback(.notFound)
case (.pending, true): return callback(.notFound)
case (.running, false): return Just(.notFound).eraseToAnyPublisher()
case (.pending, true): return Just(.notFound).eraseToAnyPublisher()
default: break
}
jobCallbacks.mutate { jobCallbacks in
jobCallbacks[jobId] = (jobCallbacks[jobId] ?? []).appending(callback)
}
return jobCompletedSubject
.filter { $0.0 == jobId }
.map { $0.1 }
.eraseToAnyPublisher()
}
fileprivate func hasPendingOrRunningJobWith(
@ -1857,16 +1858,8 @@ public final class JobQueue: Hashable {
guard shouldTriggerCallbacks else { return }
// Run any job callbacks now that it's done
var jobCallbacksToRun: [(JobRunner.JobResult) -> ()] = []
jobCallbacks.mutate { jobCallbacks in
jobCallbacksToRun = (jobCallbacks[job.id] ?? [])
jobCallbacks = jobCallbacks.removingValue(forKey: job.id)
}
DispatchQueue.global(qos: .default).async(using: dependencies) {
jobCallbacksToRun.forEach { $0(result) }
}
// Notify any listeners of the job result
jobCompletedSubject.send((job.id, result))
}
}

Loading…
Cancel
Save