// Copyright © 2022 Rangeproof Pty Ltd. All rights reserved.
import Foundation
import GRDB
import SignalCoreKit
/// Implemented by types which know how to perform the work for a specific kind of `Job`.
public protocol JobExecutor {
    /// How many times the job is allowed to fail before the failure is treated as permanent
    ///
    /// **Note:** Use `-1` to have the job retried indefinitely
    static var maxFailureCount: Int { get }

    /// When `true` a job without a `threadId` is failed permanently instead of being run
    static var requiresThreadId: Bool { get }

    /// When `true` a job without an `interactionId` is failed permanently instead of being run
    static var requiresInteractionId: Bool { get }

    /// Performs the work required to complete the given job
    ///
    /// **Note:** Implementations should run synchronously and must not call any of the "result"
    /// closures from within a database closure
    ///
    /// - Parameters:
    ///   - job: The job to run
    ///   - success: Call this when the job succeeds, passing the updated `job` and a flag
    ///   indicating whether the job should forcibly stop running
    ///   - failure: Call this when the job fails, passing the updated `job`, an `Error` (if
    ///   there is one) and a flag indicating whether the failure is permanent
    ///   - deferred: Call this when the job has been deferred, passing the updated `job`
    static func run(
        _ job: Job,
        success: @escaping (Job, Bool) -> Void,
        failure: @escaping (Job, Error?, Bool) -> Void,
        deferred: @escaping (Job) -> Void
    )
}
public final class JobRunner {
/// Owns a one-shot `Timer` which starts the `JobRunner` again once a scheduled job becomes due.
///
/// Releasing the `Trigger` cancels the pending timer.
private class Trigger {
    private var timer: Timer?

    /// Creates a trigger which calls `JobRunner.start()` once the given time has been reached
    ///
    /// - Parameter timestamp: The time (in seconds since 1970) the `JobRunner` should be started
    /// at; the trigger always waits at least 1 second before firing
    /// - Returns: The trigger, which needs to be retained for as long as the timer should stay scheduled
    static func create(timestamp: TimeInterval) -> Trigger? {
        // Setup the trigger (wait at least 1 second before triggering)
        let trigger: Trigger = Trigger()
        let delay: TimeInterval = max(1, (timestamp - Date().timeIntervalSince1970))

        // The timer has to call the `trigger` instance: inside a static function `self` is the
        // `Trigger` type itself, which does not implement the instance method `start`. The instance
        // is captured weakly so the timer doesn't keep the trigger alive (otherwise `deinit`
        // could never cancel a pending timer)
        let timer: Timer = Timer(timeInterval: delay, repeats: false) { [weak trigger] _ in
            trigger?.start()
        }
        trigger.timer = timer

        // A timer only fires when the run loop it is attached to is being run. Triggers are
        // created from the JobRunner's dispatch queue, whose threads don't run a run loop, so the
        // timer is attached to the main run loop instead
        DispatchQueue.main.async {
            RunLoop.main.add(timer, forMode: .common)
        }

        return trigger
    }

    deinit {
        guard let pendingTimer: Timer = timer else { return }

        // Timers should be invalidated from the thread they were installed on; this is enqueued
        // after the block which installs the timer so the ordering is preserved
        DispatchQueue.main.async { pendingTimer.invalidate() }
    }

    /// Starts the `JobRunner` (invoked when the timer fires)
    @objc func start() {
        JobRunner.start()
    }
}
// TODO: Could this be a bottleneck? (single serial queue to process all these jobs? Group by thread?).
// TODO: Multi-thread support.

/// Key used to tag `internalQueue` so code can check whether it is executing on that queue
private static let queueKey = DispatchSpecificKey<String>()

/// Label of `internalQueue`, also stored on the queue as the value for `queueKey`
private static let queueContext = "JobRunner"

/// The queue the JobRunner does its work on
private static let internalQueue: DispatchQueue = {
    let queue = DispatchQueue(label: queueContext)
    queue.setSpecific(key: queueKey, value: queueContext)

    return queue
}()

/// The executor registered for each job variant
internal static var executorMap: Atomic<[Job.Variant: JobExecutor.Type]> = Atomic([:])

/// The timer (if any) which will start the JobRunner again for the next scheduled job
private static var nextTrigger: Atomic<Trigger?> = Atomic(nil)

/// Set to `true` when a job is handed to its executor and reset once the queue runs out of jobs
private static var isRunning: Atomic<Bool> = Atomic(false)

/// Jobs waiting to be run (a job is removed from here before it is run)
private static var jobQueue: Atomic<[Job]> = Atomic([])

/// Ids of the jobs which have been handed to an executor and haven't reported a result yet
private static var jobsCurrentlyRunning: Atomic<Set<Int64>> = Atomic([])

/// Ids of the "once per session" blocking jobs which have already been handled this session
private static var perSessionJobsCompleted: Atomic<Set<Int64>> = Atomic([])
// MARK: - Configuration
/// Registers the executor type used to run jobs of the given variant, replacing any executor
/// previously registered for that variant
public static func add(executor: JobExecutor.Type, for variant: Job.Variant) {
    executorMap.mutate { executors in
        executors[variant] = executor
    }
}
// MARK: - Execution
/// Stores a job in the database and, when it is allowed to run straight away, adds it to the
/// queue and starts the JobRunner (if it isn't already running) once the current transaction commits
///
/// **Note:** The job is only stored (not queued) when `canStartJob` is `false`, when its
/// `behaviour` is `runOnceNextLaunch` or when its `nextRunTimestamp` is in the future
public static func add(_ db: Database, job: Job?, canStartJob: Bool = true) {
    // Inserting the job gives it an id
    guard let persistedJob: Job = try? job?.inserted(db) else {
        SNLog("[JobRunner] Unable to add \(job.map { "\($0.variant)" } ?? "unknown") job")
        return
    }

    // Work out whether the job is allowed to go onto the queue right now
    let canRunNow: Bool = (
        canStartJob &&
        persistedJob.behaviour != .runOnceNextLaunch &&
        persistedJob.nextRunTimestamp <= Date().timeIntervalSince1970
    )

    guard canRunNow else { return }

    jobQueue.mutate { queue in queue.append(persistedJob) }

    // Wait for the transaction to be committed before starting the job runner
    db.afterNextTransactionCommit { _ in
        guard !isRunning.wrappedValue else { return }

        start()
    }
}
/// Replaces the queued copy of a job with the provided one, or adds the job when it isn't in the
/// queue; adding the job will start the JobRunner if it isn't running and `canStartJob` is `true`
///
/// **Note:** If the job has a `behaviour` of `runOnceNextLaunch` or the `nextRunTimestamp`
/// is in the future then the job won't be started
public static func upsert(_ db: Database, job: Job?, canStartJob: Bool = true) {
    // There is nothing to do for a null job
    guard let job: Job = job else { return }

    // A job without an id can't be in the queue yet so it just gets added
    guard let existingJobId: Int64 = job.id else {
        add(db, job: job, canStartJob: canStartJob)
        return
    }

    // The lookup and the replacement happen within a single 'mutate' call so the queue can't
    // change between the two
    //
    // Note: currently running jobs are removed from the queue so there is no need to check
    // the 'jobsCurrentlyRunning' set
    let didReplaceQueuedJob: Bool = jobQueue.mutate { queue -> Bool in
        guard let queuedIndex: Int = queue.firstIndex(where: { $0.id == existingJobId }) else {
            return false
        }

        queue[queuedIndex] = job
        return true
    }

    if didReplaceQueuedJob { return }

    // The job wasn't in the queue so add it
    //
    // NOTE(review): 'add' inserts the job into the database again even though it already has
    // an id here - confirm that is the intended behaviour for jobs which were already stored
    add(db, job: job, canStartJob: canStartJob)
}
/// Stores a job in the database and places it in the queue directly before `otherJob`
///
/// Jobs with a `recurringOnActive`, `recurringOnLaunch` or `runOnceNextLaunch` behaviour are rejected.
///
/// - Returns: The stored job, or `nil` if the job was rejected or couldn't be stored
@discardableResult public static func insert(_ db: Database, job: Job?, before otherJob: Job) -> Job? {
    let unsupportedBehaviours: [Job.Behaviour] = [.recurringOnActive, .recurringOnLaunch, .runOnceNextLaunch]

    if let behaviour: Job.Behaviour = job?.behaviour, unsupportedBehaviours.contains(behaviour) {
        SNLog("[JobRunner] Attempted to insert \(job.map { "\($0.variant)" } ?? "unknown") job before the current one even though it's behaviour is \(job.map { "\($0.behaviour)" } ?? "unknown")")
        return nil
    }

    // Inserting the job into the database gives it an id
    guard let persistedJob: Job = try? job?.inserted(db) else {
        SNLog("[JobRunner] Unable to add \(job.map { "\($0.variant)" } ?? "unknown") job")
        return nil
    }

    // Put the new job in front of 'otherJob'; if 'otherJob' isn't in the queue then both jobs
    // go to the start of the queue (new job first) which means 'otherJob' will run (or run
    // again) once the new job is done
    jobQueue.mutate { queue in
        if let otherJobIndex: Int = queue.firstIndex(of: otherJob) {
            queue.insert(persistedJob, at: otherJobIndex)
        }
        else {
            queue.insert(contentsOf: [persistedJob, otherJob], at: 0)
        }
    }

    return persistedJob
}
/// Loads the jobs which should run on launch and adds them to the queue (blocking jobs go
/// directly after any blocking jobs already in the queue, the rest go to the end)
///
/// **Note:** This doesn't start the JobRunner; 'appDidBecomeActive' runs on first launch
/// anyway so starting is left until then
public static func appDidFinishLaunching() {
    let launchBehaviours: [Job.Behaviour] = [
        Job.Behaviour.recurringOnLaunch,
        Job.Behaviour.recurringOnLaunchBlocking,
        Job.Behaviour.recurringOnLaunchBlockingOncePerSession,
        Job.Behaviour.runOnceNextLaunch
    ]
    let fetchedJobs: [Job]? = GRDBStorage.shared.read { db in
        try Job
            .filter(launchBehaviours.contains(Job.Columns.behaviour))
            .order(Job.Columns.id)
            .fetchAll(db)
    }

    guard let launchJobs: [Job] = fetchedJobs else { return }

    let blockingJobs: [Job] = launchJobs.filter { $0.isBlocking }
    let nonBlockingJobs: [Job] = launchJobs.filter { !$0.isBlocking }

    jobQueue.mutate { queue in
        // Blocking jobs are inserted after the last blocking job already in the queue (or at
        // the start when there isn't one)
        let insertionIndex: Int = queue
            .lastIndex(where: { $0.isBlocking })
            .map { $0 + 1 } ?? queue.startIndex

        queue.insert(contentsOf: blockingJobs, at: insertionIndex)
        queue.append(contentsOf: nonBlockingJobs)
    }
}
/// Loads the jobs which should run whenever the app becomes active, adds them to the queue
/// (blocking jobs go directly after any blocking jobs already in the queue, the rest go to
/// the end) and starts the JobRunner if it isn't already running
public static func appDidBecomeActive() {
    let activeBehaviours: [Job.Behaviour] = [
        Job.Behaviour.recurringOnActive,
        Job.Behaviour.recurringOnActiveBlocking
    ]
    let fetchedJobs: [Job]? = GRDBStorage.shared.read { db in
        try Job
            .filter(activeBehaviours.contains(Job.Columns.behaviour))
            .order(Job.Columns.id)
            .fetchAll(db)
    }

    guard let activeJobs: [Job] = fetchedJobs else { return }

    let blockingJobs: [Job] = activeJobs.filter { $0.isBlocking }
    let nonBlockingJobs: [Job] = activeJobs.filter { !$0.isBlocking }

    jobQueue.mutate { queue in
        // Blocking jobs are inserted after the last blocking job already in the queue (or at
        // the start when there isn't one)
        let insertionIndex: Int = queue
            .lastIndex(where: { $0.isBlocking })
            .map { $0 + 1 } ?? queue.startIndex

        queue.insert(contentsOf: blockingJobs, at: insertionIndex)
        queue.append(contentsOf: nonBlockingJobs)
    }

    // Nothing more to do if the job runner is already going
    guard !isRunning.wrappedValue else { return }

    start()
}
/// Indicates whether the given job has been handed to its executor and hasn't reported a
/// result yet (always `false` for a null job or a job without an id)
public static func isCurrentlyRunning(_ job: Job?) -> Bool {
    guard let jobId: Int64 = job?.id else { return false }

    return jobsCurrentlyRunning.wrappedValue.contains(jobId)
}
// MARK: - Job Running
/// Starts the JobRunner (does nothing outside of the main app or when it is already running)
///
/// Pending jobs are loaded from the database and added to the queue, then the first job in the
/// queue is run. When there is nothing to run the JobRunner instead schedules itself to start
/// again when the next job is due.
public static func start() {
    // We only want the JobRunner to run in the main app
    guard CurrentAppContext().isMainApp else { return }
    guard !isRunning.wrappedValue else { return }

    // The JobRunner runs synchronously so we need to ensure this doesn't start on the calling
    // thread (if we aren't on the JobRunner's queue then swap over to it)
    guard DispatchQueue.getSpecific(key: queueKey) == queueContext else {
        internalQueue.async {
            start()
        }   // TODO: Want to have multiple threads for this (attachment download should be separate - do we even use attachment upload anymore???)
        return
    }

    // Get any pending jobs
    let maybeJobsToRun: [Job]? = GRDBStorage.shared.read { db in
        try Job   // TODO: Test this
            .filterPendingJobs()
            .fetchAll(db)
    }

    // Read this before taking the queue lock so the two locks are never held at the same time
    let jobIdsAlreadyRunning: Set<Int64> = jobsCurrentlyRunning.wrappedValue

    // Determine the number of jobs to run
    var jobCount: Int = 0

    jobQueue.mutate { queue in
        // Add the pending jobs to the queue, skipping any job which is already queued or is
        // currently running (a job added via 'add' is both in the queue and in the database so
        // without this check it would be queued, and therefore run, twice)
        var knownJobIds: Set<Int64> = jobIdsAlreadyRunning.union(queue.compactMap { $0.id })

        for pendingJob in (maybeJobsToRun ?? []) {
            if let pendingJobId: Int64 = pendingJob.id {
                guard knownJobIds.insert(pendingJobId).inserted else { continue }
            }

            queue.append(pendingJob)
        }

        jobCount = queue.count
    }

    // If there are no pending jobs then schedule the JobRunner to start again
    // when the next scheduled job should start
    guard jobCount > 0 else {
        isRunning.mutate { $0 = false }
        scheduleNextSoonestJob()
        return
    }

    // Run the first job in the queue
    SNLog("[JobRunner] Starting with (\(jobCount) job\(jobCount != 1 ? "s" : ""))")
    runNextJob()
}
/// Takes the first job off the queue and hands it to its executor
///
/// The job is failed permanently if it has no registered executor or is missing a `threadId` /
/// `interactionId` the executor requires. It is deferred if its `nextRunTimestamp` is in the
/// future or if it still has dependencies (in which case the dependencies are queued ahead of
/// it). When the queue is empty the JobRunner stops and schedules the next start.
private static func runNextJob() {
    // Ensure this is running on the correct queue
    guard DispatchQueue.getSpecific(key: queueKey) == queueContext else {
        internalQueue.async {
            runNextJob()
        }
        return
    }
    guard let (nextJob, numJobsRemaining): (Job, Int) = jobQueue.mutate({ queue in queue.popFirst().map { ($0, queue.count) } }) else {
        isRunning.mutate { $0 = false }
        scheduleNextSoonestJob()
        return
    }
    guard let jobExecutor: JobExecutor.Type = executorMap.wrappedValue[nextJob.variant] else {
        SNLog("[JobRunner] Unable to run \(nextJob.variant) job due to missing executor")
        handleJobFailed(nextJob, error: JobRunnerError.executorMissing, permanentFailure: true)
        return
    }
    guard !jobExecutor.requiresThreadId || nextJob.threadId != nil else {
        SNLog("[JobRunner] Unable to run \(nextJob.variant) job due to missing required threadId")
        handleJobFailed(nextJob, error: JobRunnerError.requiredThreadIdMissing, permanentFailure: true)
        return
    }
    guard !jobExecutor.requiresInteractionId || nextJob.interactionId != nil else {
        SNLog("[JobRunner] Unable to run \(nextJob.variant) job due to missing required interactionId")
        handleJobFailed(nextJob, error: JobRunnerError.requiredInteractionIdMissing, permanentFailure: true)
        return
    }

    // If the 'nextRunTimestamp' for the job is in the future then don't run it yet
    guard nextJob.nextRunTimestamp <= Date().timeIntervalSince1970 else {
        handleJobDeferred(nextJob)
        return
    }

    // Check if the next job has any dependencies
    let jobDependencies: [Job] = GRDBStorage.shared
        .read { db in try nextJob.dependencies.fetchAll(db) }
        .defaulting(to: [])

    guard jobDependencies.isEmpty else {
        SNLog("[JobRunner] Found job with \(jobDependencies.count) dependencies, running those first")

        let jobDependencyIds: [Int64] = jobDependencies
            .compactMap { $0.id }
        let jobIdsNotInQueue: Set<Int64> = jobDependencyIds
            .asSet()
            .subtracting(jobQueue.wrappedValue.compactMap { $0.id })

        // If there are dependencies which aren't in the queue we should just append them
        // (followed by the current job so that it runs after all of its dependencies)
        guard jobIdsNotInQueue.isEmpty else {
            jobQueue.mutate { queue in
                queue.append(
                    contentsOf: jobDependencies
                        .filter { jobIdsNotInQueue.contains($0.id ?? -1) }
                )
                queue.append(nextJob)
            }
            handleJobDeferred(nextJob)
            return
        }

        // Otherwise (all of the dependencies are already queued) re-add the current job
        // after it's dependencies
        jobQueue.mutate { queue in
            guard let lastDependencyIndex: Int = queue.lastIndex(where: { jobDependencyIds.contains($0.id ?? -1) }) else {
                queue.append(nextJob)
                return
            }

            queue.insert(nextJob, at: lastDependencyIndex + 1)
        }
        handleJobDeferred(nextJob)
        return
    }

    // Update the state to indicate it's running
    //
    // Note: We need to store 'numJobsRemaining' in it's own variable because
    // the 'SNLog' seems to dispatch to it's own queue which ends up getting
    // blocked by the JobRunner's queue because 'jobQueue' is Atomic
    nextTrigger.mutate { $0 = nil }
    isRunning.mutate { $0 = true }
    jobsCurrentlyRunning.mutate { $0 = $0.inserting(nextJob.id) }
    SNLog("[JobRunner] Start job (\(numJobsRemaining) remaining)")

    jobExecutor.run(
        nextJob,
        success: handleJobSucceeded,
        failure: handleJobFailed,
        deferred: handleJobDeferred
    )
}
/// Looks up the `nextRunTimestamp` of a pending job (including jobs scheduled in the future)
/// and either restarts the JobRunner straight away (when that time has already been reached)
/// or sets up a `Trigger` to start it at that time; does nothing when no timestamp is found
private static func scheduleNextSoonestJob() {
    let fetchedTimestamp: TimeInterval? = GRDBStorage.shared
        .read { db in
            try TimeInterval
                .fetchOne(
                    db,
                    Job
                        .filterPendingJobs(excludeFutureJobs: false)
                        .select(.nextRunTimestamp)
                )
        }

    guard let nextRunTimestamp: TimeInterval = fetchedTimestamp else { return }

    let secondsUntilNextJob: TimeInterval = (nextRunTimestamp - Date().timeIntervalSince1970)
    let wholeSeconds: Int = Int(ceil(abs(secondsUntilNextJob)))
    let pluralSuffix: String = (wholeSeconds == 1 ? "" : "s")

    if secondsUntilNextJob > 0 {
        // The job is due in the future so wait for it via a trigger
        SNLog("[JobRunner] Stopping until next job in \(wholeSeconds) second\(pluralSuffix))")
        nextTrigger.mutate { $0 = Trigger.create(timestamp: nextRunTimestamp) }
    }
    else {
        // The job is already due so there is no reason to wait
        SNLog("[JobRunner] Restarting immediately for job scheduled \(wholeSeconds) second\(pluralSuffix)) ago")
        internalQueue.async {
            JobRunner.start()
        }
    }
}
// MARK: - Handling Results
/// Called when a job completes successfully; updates (or removes) the stored job based on its
/// `behaviour` and then moves on to the next job in the queue
///
/// - Parameters:
///   - job: The job which succeeded
///   - shouldStop: When `true` a `recurring` job is deleted instead of being left to run again
private static func handleJobSucceeded(_ job: Job, shouldStop: Bool) {
    /// Deletes the job from the database
    func deleteJobAndDependencyLinks() {
        GRDBStorage.shared.write { db in
            // The JobDependencies which require this job to be completed are removed first (if
            // they aren't then the dependant jobs will automatically be deleted)
            _ = try JobDependencies
                .filter(JobDependencies.Columns.dependantId == job.id)
                .deleteAll(db)

            _ = try job.delete(db)
        }
    }

    switch job.behaviour {
        case .runOnce, .runOnceNextLaunch:
            deleteJobAndDependencyLinks()

        case .recurring where shouldStop == true:
            deleteJobAndDependencyLinks()

        // A `recurring` job which has run will automatically run again but at least 1 second
        // should pass before it does - the job itself should really update it's own
        // 'nextRunTimestamp' (this is just a safety net)
        case .recurring where job.nextRunTimestamp <= Date().timeIntervalSince1970:
            GRDBStorage.shared.write { db in
                let oneSecondFromNow: TimeInterval = (Date().timeIntervalSince1970 + 1)

                _ = try job
                    .with(nextRunTimestamp: oneSecondFromNow)
                    .saved(db)
            }

        case .recurringOnLaunchBlockingOncePerSession:
            perSessionJobsCompleted.mutate { completedIds in
                completedIds = completedIds.inserting(job.id)
            }

        default: break
    }

    // The job was taken off the queue before it ran so the only thing left to do is remove it
    // from the 'currentlyRunning' set and start the next job
    jobsCurrentlyRunning.mutate { runningIds in
        runningIds = runningIds.removing(job.id)
    }
    internalQueue.async {
        runNextJob()
    }
}
/// This function is called when a job fails, if it wasn't a permanent failure then the
/// 'failureCount' for the job will be incremented and it'll be re-run after a retry interval
/// has passed
///
/// - Parameters:
///   - job: The job which failed
///   - error: The error which caused the failure (if there is one)
///   - permanentFailure: When `true` the job is deleted instead of being retried ("blocking"
///   jobs are the exception, they are retried immediately)
private static func handleJobFailed(_ job: Job, error: Error?, permanentFailure: Bool) {
    // If the job no longer exists in the database then it was canceled so just move on
    guard GRDBStorage.shared.read({ db in try Job.exists(db, id: job.id ?? -1) }) == true else {
        SNLog("[JobRunner] \(job.variant) job canceled")
        jobsCurrentlyRunning.mutate { $0 = $0.removing(job.id) }

        internalQueue.async {
            runNextJob()
        }
        return
    }

    switch job.behaviour {
        // If a "blocking" job failed then rerun it immediately
        case .recurringOnLaunchBlocking, .recurringOnActiveBlocking:
            SNLog("[JobRunner] blocking \(job.variant) job failed; retrying immediately")
            jobQueue.mutate({ $0.insert(job, at: 0) })

            internalQueue.async {
                runNextJob()
            }
            return

        // For "blocking once per session" jobs only rerun it immediately if it hasn't already
        // run this session
        case .recurringOnLaunchBlockingOncePerSession:
            guard !perSessionJobsCompleted.wrappedValue.contains(job.id ?? -1) else { break }

            SNLog("[JobRunner] blocking \(job.variant) job failed; retrying immediately")
            perSessionJobsCompleted.mutate { $0 = $0.inserting(job.id) }
            jobQueue.mutate({ $0.insert(job, at: 0) })

            internalQueue.async {
                runNextJob()
            }
            return

        default: break
    }

    GRDBStorage.shared.write { db in
        // Get the max failure count for the job (a value of '-1' means it will retry
        // indefinitely); a job without a registered executor gets no retries
        let maxFailureCount: Int = (executorMap.wrappedValue[job.variant]?.maxFailureCount ?? 0)

        // A negative 'maxFailureCount' has to bypass the failure count comparison, otherwise
        // a job which should retry indefinitely would be deleted after it's first failure
        guard
            !permanentFailure && (
                maxFailureCount < 0 ||
                job.failureCount + 1 < maxFailureCount
            )
        else {
            // If the job permanently failed or we have performed all of our retry attempts
            // then delete the job (it'll probably never succeed)
            _ = try job.delete(db)
            return
        }

        SNLog("[JobRunner] \(job.variant) job failed; scheduling retry (failure count is \(job.failureCount + 1))")
        _ = try job
            .with(
                failureCount: (job.failureCount + 1),
                nextRunTimestamp: (Date().timeIntervalSince1970 + getRetryInterval(for: job))
            )
            .saved(db)
    }

    jobsCurrentlyRunning.mutate { $0 = $0.removing(job.id) }
    internalQueue.async {
        runNextJob()
    }
}
/// Called when a job neither succeeded nor failed (this should only happen when the job has
/// specific logic which makes it dependant on other jobs, and it should automatically manage
/// those dependencies); the job stops counting as running and the next job is started
private static func handleJobDeferred(_ job: Job) {
    jobsCurrentlyRunning.mutate { runningIds in
        runningIds = runningIds.removing(job.id)
    }

    internalQueue.async { runNextJob() }
}
// MARK: - Convenience
/// Calculates how long to wait before retrying a failed job
///
/// The interval is `0.25 * 2^failureCount` seconds (`0.25s` for a `failureCount` of 0, `0.5s`
/// for 1, `1s` for 2, ...). The exponential term is capped at 600 before the `0.25` factor is
/// applied, so the longest possible interval is 150 seconds.
private static func getRetryInterval(for job: Job) -> TimeInterval {
    let backoffCeiling: Double = 10 * 60
    let exponentialBackoff: Double = pow(2, Double(job.failureCount))

    return 0.25 * min(backoffCeiling, exponentialBackoff)
}
}