// Copyright © 2022 Rangeproof Pty Ltd. All rights reserved.
//
// stringlint:disable
import Foundation
import GRDB
// MARK: - Singleton
/// Registers the job runner with the dependency container.
public extension Singleton {
/// Singleton configuration exposing the job runner as a `JobRunnerType`; `createInstance` builds a
/// `JobRunner` using the default arguments of its initialiser.
static let jobRunner : SingletonConfig < JobRunnerType > = Dependencies . create (
identifier : " jobRunner " ,
createInstance : { _ in JobRunner ( ) }
)
}
// MARK: - JobRunnerType
/// Interface of the job runner; `Singleton.jobRunner` exposes its instance as this type.
public protocol JobRunnerType {
// MARK: - Configuration
/// Registers the `JobExecutor` type which should be used for jobs of the given `variant`.
func setExecutor ( _ executor : JobExecutor . Type , for variant : Job . Variant )
/// Returns whether the given queue is currently allowed to start.
func canStart ( queue : JobQueue ? ) -> Bool
/// Registers a callback relating to the blocking queue; `JobRunner` calls it immediately when the
/// blocking queue has already started at least once and is no longer running, otherwise it is
/// stored and called when the blocking queue drains.
func afterBlockingQueue ( callback : @ escaping ( ) -> ( ) )
// MARK: - State Management
/// Returns info about jobs in the requested `state`, keyed by job id. In `JobRunner` a `nil` value for
/// `jobs` or `variant` places no restriction on that dimension.
func jobInfoFor ( jobs : [ Job ] ? , state : JobRunner . JobState , variant : Job . Variant ? ) -> [ Int64 : JobRunner . JobInfo ]
/// Lifecycle hook to be called when the app has finished launching.
func appDidFinishLaunching ( using dependencies : Dependencies )
/// Lifecycle hook to be called when the app becomes active.
func appDidBecomeActive ( using dependencies : Dependencies )
/// Starts every queue other than the blocking queue.
func startNonBlockingQueues ( using dependencies : Dependencies )
/// Stops the queues and clears their pending jobs, optionally leaving the queue which handles
/// `exceptForVariant` untouched; `onComplete` is called once the operation is considered finished.
func stopAndClearPendingJobs ( exceptForVariant : Job . Variant ? , onComplete : ( ( ) -> ( ) ) ? )
// MARK: - Job Scheduling
/// Validates, persists and (when possible) enqueues `job`, optionally recording `dependantJob` as depending
/// on it. Returns the persisted job or `nil` when it could not be added.
@ discardableResult func add ( _ db : Database , job : Job ? , dependantJob : Job ? , canStartJob : Bool , using dependencies : Dependencies ) -> Job ?
/// Adds `job` when it has no id, otherwise updates the queued entry for it.
func upsert ( _ db : Database , job : Job ? , canStartJob : Bool , using dependencies : Dependencies )
/// Persists `job` and inserts it into its queue ahead of `otherJob`; returns the new job id and the
/// persisted job, or `nil` when the job could not be inserted.
@ discardableResult func insert ( _ db : Database , job : Job ? , before otherJob : Job ) -> ( Int64 , Job ) ?
/// Moves the given jobs to the front of their queues so a job chain can be completed.
func enqueueDependenciesIfNeeded ( _ jobs : [ Job ] , using dependencies : Dependencies )
/// Forwards an externally determined `result` for `job` to the queue responsible for its variant.
func manuallyTriggerResult ( _ job : Job ? , result : JobRunner . JobResult , using dependencies : Dependencies )
/// Registers a callback for the result of `job`; `JobRunner` reports `.notFound` when the job is `nil`,
/// has no id or has no queue.
func afterJob ( _ job : Job ? , state : JobRunner . JobState , callback : @ escaping ( JobRunner . JobResult ) -> ( ) )
/// Removes `job` from the pending jobs of its queue (does nothing for a `nil` job or one without an id).
func removePendingJob ( _ job : Job ? )
}
// MARK: - JobRunnerType Convenience
/// Convenience overloads which forward to the protocol requirements with the unused
/// filters defaulted.
public extension JobRunnerType {
    /// Info for every pending or running job, regardless of variant.
    func allJobInfo() -> [Int64: JobRunner.JobInfo] {
        jobInfoFor(jobs: nil, state: .any, variant: nil)
    }

    /// Info for the given jobs in any state.
    func jobInfoFor(jobs: [Job]) -> [Int64: JobRunner.JobInfo] {
        jobInfoFor(jobs: jobs, state: .any, variant: nil)
    }

    /// Info for the given jobs when they are in the given state.
    func jobInfoFor(jobs: [Job], state: JobRunner.JobState) -> [Int64: JobRunner.JobInfo] {
        jobInfoFor(jobs: jobs, state: state, variant: nil)
    }

    /// Info for all jobs in the given state.
    func jobInfoFor(state: JobRunner.JobState) -> [Int64: JobRunner.JobInfo] {
        jobInfoFor(jobs: nil, state: state, variant: nil)
    }

    /// Info for all jobs of the given variant which are in the given state.
    func jobInfoFor(state: JobRunner.JobState, variant: Job.Variant) -> [Int64: JobRunner.JobInfo] {
        jobInfoFor(jobs: nil, state: state, variant: variant)
    }

    /// Info for all jobs of the given variant in any state.
    func jobInfoFor(variant: Job.Variant) -> [Int64: JobRunner.JobInfo] {
        jobInfoFor(jobs: nil, state: .any, variant: variant)
    }

    /// Whether the given job is reported as running (`false` for a `nil` job).
    func isCurrentlyRunning(_ job: Job?) -> Bool {
        return job
            .map { target in !jobInfoFor(jobs: [target], state: .running, variant: nil).isEmpty }
            ?? false
    }

    /// Whether a job exists whose encoded details equal the encoded form of `jobDetails`.
    ///
    /// - Parameters:
    ///   - variant: Restricts the search to a single variant (`nil` searches all variants)
    ///   - state: Restricts the search to jobs in this state
    ///   - jobDetails: The details to look for
    /// - Returns: `false` when `jobDetails` cannot be encoded or no job matches
    func hasJob<T: Encodable>(
        of variant: Job.Variant? = nil,
        inState state: JobRunner.JobState = .any,
        with jobDetails: T
    ) -> Bool {
        // Sorted keys are needed so the encoded bytes can be compared deterministically
        let encoder = JSONEncoder().with(outputFormatting: .sortedKeys)

        guard let encodedDetails: Data = try? encoder.encode(jobDetails) else { return false }

        let candidates: [Int64: JobRunner.JobInfo] = jobInfoFor(jobs: nil, state: state, variant: variant)

        return candidates.values.contains { info in info.detailsData == encodedDetails }
    }

    /// Same as the protocol requirement but with both arguments defaulting to `nil`.
    func stopAndClearPendingJobs(exceptForVariant: Job.Variant? = nil, onComplete: (() -> ())? = nil) {
        stopAndClearPendingJobs(exceptForVariant: exceptForVariant, onComplete: onComplete)
    }

    // MARK: -- Job Scheduling

    /// Adds a job which has no dependant job.
    @discardableResult func add(_ db: Database, job: Job?, canStartJob: Bool, using dependencies: Dependencies) -> Job? {
        add(db, job: job, dependantJob: nil, canStartJob: canStartJob, using: dependencies)
    }

    /// Registers a result callback for the job regardless of its current state.
    func afterJob(_ job: Job?, callback: @escaping (JobRunner.JobResult) -> ()) {
        afterJob(job, state: .any, callback: callback)
    }
}
// MARK: - JobExecutor
/// Describes a type which knows how to run jobs; executors are registered per `Job.Variant` through
/// `JobRunnerType.setExecutor(_:for:)`.
public protocol JobExecutor {
/// The maximum number of times the job can fail before it fails permanently
///
/// **Note:** A value of `-1` means it will retry indefinitely
static var maxFailureCount : Int { get }
/// NOTE(review): presumably indicates that jobs for this executor must have a `threadId` — the consumer
/// of this flag is not visible here, confirm against `JobQueue`
static var requiresThreadId : Bool { get }
/// NOTE(review): presumably indicates that jobs for this executor must have an `interactionId` — the
/// consumer of this flag is not visible here, confirm against `JobQueue`
static var requiresInteractionId : Bool { get }
/// This method contains the logic needed to complete a job
///
/// **Note:** The code in this method should run synchronously and the various
/// "result" blocks should not be called within a database closure
///
/// - Parameters:
///   - job: The job which is being run
///   - queue: A `DispatchQueue` provided by the caller (its intended use is not visible here — confirm
///   against the calling `JobQueue`)
///   - success: The closure which is called when the job succeeds (with an
///   updated `job` and a flag indicating whether the job should forcibly stop running)
///   - failure: The closure which is called when the job fails (with an updated
///   `job`, an `Error` (if applicable) and a flag indicating whether it was a permanent
///   failure)
///   - deferred: The closure which is called when the job is deferred (with an
///   updated `job`)
///   - dependencies: The dependencies to use while running the job
static func run (
_ job : Job ,
queue : DispatchQueue ,
success : @ escaping ( Job , Bool , Dependencies ) -> ( ) ,
failure : @ escaping ( Job , Error ? , Bool , Dependencies ) -> ( ) ,
deferred : @ escaping ( Job , Dependencies ) -> ( ) ,
using dependencies : Dependencies
)
}
// MARK: - JobRunner
public final class JobRunner : JobRunnerType {
/// Bit mask describing which job states a query should match.
public struct JobState: OptionSet, Hashable {
    public let rawValue: UInt8

    public init(rawValue: UInt8) { self.rawValue = rawValue }

    /// The job is waiting in a queue (bit 0)
    public static let pending: JobState = .init(rawValue: 0b01)

    /// The job is currently being executed (bit 1)
    public static let running: JobState = .init(rawValue: 0b10)

    /// Matches both pending and running jobs
    public static let any: JobState = JobState.pending.union(.running)
}
/// The outcome reported for a job.
public enum JobResult: Equatable {
    /// The job completed successfully
    case succeeded

    /// The job failed; carries the error (if any) and whether the failure is permanent
    case failed(Error?, Bool)

    /// The job was deferred
    case deferred

    /// No matching job could be found
    case notFound

    /// Compares two results; failures are considered equal when their "permanent" flags match and
    /// their errors have the same `localizedDescription` (or are both `nil`).
    public static func == (lhs: JobResult, rhs: JobResult) -> Bool {
        switch (lhs, rhs) {
            case (.succeeded, .succeeded): return true
            case (.failed(let lhsError, let lhsPermanent), .failed(let rhsError, let rhsPermanent)):
                return (
                    // Not a perfect solution but should be good enough
                    lhsError?.localizedDescription == rhsError?.localizedDescription &&
                    lhsPermanent == rhsPermanent
                )

            case (.deferred, .deferred): return true

            // `Equatable` requires a value to equal itself, previously this pair fell through
            // to `default` and compared as unequal
            case (.notFound, .notFound): return true
            default: return false
        }
    }
}
/// Snapshot of the identifying information of a pending or running job.
public struct JobInfo: Equatable, CustomDebugStringConvertible {
    public let variant: Job.Variant
    public let threadId: String?
    public let interactionId: Int64?
    public let detailsData: Data?
    public let uniqueHashValue: Int?

    /// Single-line description listing every field; `nil` values are rendered as `nil` and
    /// `detailsData` is rendered as its hex string along with its byte count.
    public var debugDescription: String {
        // The closing parenthesis after "bytes" was previously missing which left the
        // description with unbalanced parentheses
        let dataDescription: String = detailsData
            .map { data in "Data(hex: \(data.toHexString()), \(data.bytes.count) bytes)" }
            .defaulting(to: "nil")

        return [
            "JobRunner.JobInfo(",
            "variant: \(variant), ",
            "threadId: \(threadId ?? "nil"), ",
            "interactionId: \(interactionId.map { "\($0)" } ?? "nil"), ",
            "detailsData: \(dataDescription), ",
            "uniqueHashValue: \(uniqueHashValue.map { "\($0)" } ?? "nil")",
            ")"
        ].joined()
    }
}
/// The level of validation `validatedJob` applies before a job is accepted.
private enum Validation {
/// Only checks the unique hash against jobs already pending or running; the job is returned as-is
case enqueueOnly
/// Also checks the unique hash against the database and then inserts the job into the database
case persist
}
// MARK: - Variables
/// Whether this process may execute jobs at all (set once in `init`)
private let allowToExecuteJobs : Bool
/// The serial queue of type `.blocking`; `canStartNonBlockingQueue` requires it to have started at
/// least once and to no longer be running
private let blockingQueue : Atomic < JobQueue ? >
/// Lookup from job variant to the queue responsible for it (several variants can share one queue)
private let queues : Atomic < [ Job . Variant : JobQueue ] >
/// Callbacks registered via `afterBlockingQueue` which are called and cleared when the blocking queue drains
private var blockingQueueDrainCallback : Atomic < [ ( ) -> ( ) ] > = Atomic ( [ ] )
/// Set in `appDidFinishLaunching`/`appDidBecomeActive` and cleared by `stopAndClearPendingJobs`
internal var appReadyToStartQueues : Atomic < Bool > = Atomic ( false )
/// Set in `appDidBecomeActive` and cleared by `stopAndClearPendingJobs`
internal var appHasBecomeActive : Atomic < Bool > = Atomic ( false )
/// NOTE(review): presumably ids of jobs completed during this session — not read or written in the
/// visible part of the file, confirm against `JobQueue`
internal var perSessionJobsCompleted : Atomic < Set < Int64 > > = Atomic ( [ ] )
/// Whether `appDidBecomeActive` has previously finished scheduling "on active" jobs
internal var hasCompletedInitialBecomeActive : Atomic < Bool > = Atomic ( false )
/// Background task created by `stopAndClearPendingJobs` to give the excepted queue time to drain
internal var shutdownBackgroundTask : Atomic < OWSBackgroundTask ? > = Atomic ( nil )
/// Non-blocking queues may only start once the blocking queue has started at least once, is no
/// longer running and the app has become active.
private var canStartNonBlockingQueue: Bool {
    guard blockingQueue.wrappedValue?.hasStartedAtLeastOnce.wrappedValue == true else { return false }
    guard blockingQueue.wrappedValue?.isRunning.wrappedValue != true else { return false }

    return appHasBecomeActive.wrappedValue
}
// MARK: - Initialization
/// Creates the runner along with its blocking queue and the per-variant job queues.
///
/// - Parameters:
///   - isTestingJobRunner: When `true` jobs are allowed to execute regardless of the app context checks;
///   the flag is also forwarded to every `JobQueue`
///   - variantsToExclude: Job variants which should not be assigned to any queue
///   - dependencies: Captured by the blocking queue's `onQueueDrained` closure in order to start the
///   non-blocking queues
init (
isTestingJobRunner : Bool = false ,
variantsToExclude : [ Job . Variant ] = [ ] ,
using dependencies : Dependencies = Dependencies ( )
) {
// Start with every non-excluded variant; each `remove` call below takes a variant out of this set so
// that whatever is left over ends up on the general queue
var jobVariants : Set < Job . Variant > = Job . Variant . allCases
. filter { ! variantsToExclude . contains ( $0 ) }
. asSet ( )
// Outside of tests jobs may only execute in the main app (and not while running unit tests)
self . allowToExecuteJobs = (
isTestingJobRunner || (
HasAppContext ( ) &&
CurrentAppContext ( ) . isMainApp &&
! SNUtilitiesKit . isRunningTests
)
)
// The blocking queue isn't given any variants of its own
self . blockingQueue = Atomic (
JobQueue (
type : . blocking ,
executionType : . serial ,
qos : . default ,
isTestingJobRunner : isTestingJobRunner ,
jobVariants : [ ]
)
)
self . queues = Atomic ( [
// MARK: -- Message Send Queue
JobQueue (
type : . messageSend ,
executionType : . concurrent , // Allow as many jobs to run at once as supported by the device
qos : . default ,
isTestingJobRunner : isTestingJobRunner ,
// `remove` returns `nil` for excluded variants which the `compactMap` then drops
jobVariants : [
jobVariants . remove ( . attachmentUpload ) ,
jobVariants . remove ( . messageSend ) ,
jobVariants . remove ( . notifyPushServer ) ,
jobVariants . remove ( . sendReadReceipts ) ,
jobVariants . remove ( . groupLeaving ) ,
jobVariants . remove ( . configurationSync ) ,
jobVariants . remove ( . groupInviteMember ) ,
jobVariants . remove ( . groupPromoteMember ) ,
jobVariants . remove ( . processPendingGroupMemberRemovals )
] . compactMap { $0 }
) ,
// MARK: -- Message Receive Queue
JobQueue (
type : . messageReceive ,
// Explicitly serial as executing concurrently means message receives getting processed at
// different speeds which can result in:
// • Small batches of messages appearing in the UI before larger batches
// • Closed group messages encrypted with updated keys could start parsing before its key
//   update message has been processed (ie. guaranteed to fail)
executionType : . serial ,
qos : . default ,
isTestingJobRunner : isTestingJobRunner ,
jobVariants : [
jobVariants . remove ( . messageReceive ) ,
jobVariants . remove ( . configMessageReceive )
] . compactMap { $0 }
) ,
// MARK: -- Attachment Download Queue
JobQueue (
type : . attachmentDownload ,
executionType : . serial ,
qos : . utility ,
isTestingJobRunner : isTestingJobRunner ,
jobVariants : [
jobVariants . remove ( . attachmentDownload )
] . compactMap { $0 }
) ,
// MARK: -- Expiration Update Queue
JobQueue (
type : . expirationUpdate ,
executionType : . concurrent , // Allow as many jobs to run at once as supported by the device
qos : . default ,
isTestingJobRunner : isTestingJobRunner ,
jobVariants : [
jobVariants . remove ( . expirationUpdate ) ,
jobVariants . remove ( . getExpiration )
] . compactMap { $0 }
) ,
// MARK: -- Display Picture Download Queue
JobQueue (
type : . displayPictureDownload ,
executionType : . serial ,
qos : . utility ,
isTestingJobRunner : isTestingJobRunner ,
jobVariants : [
jobVariants . remove ( . displayPictureDownload )
] . compactMap { $0 }
) ,
// MARK: -- General Queue
// Receives every variant which wasn't claimed by one of the queues above
JobQueue (
type : . general ( number : 0 ) ,
executionType : . serial ,
qos : . utility ,
isTestingJobRunner : isTestingJobRunner ,
jobVariants : Array ( jobVariants )
)
// Flatten the queues into a variant -> queue lookup (a queue appears once per variant it handles)
] . reduce ( into : [ : ] ) { prev , next in
next . jobVariants . forEach { variant in
prev [ variant ] = next
}
} )
// Now that we've finished setting up the JobRunner, update the queue closures
self . blockingQueue . mutate {
$0 ? . canStart = { [ weak self ] queue -> Bool in ( self ? . canStart ( queue : queue ) = = true ) }
$0 ? . onQueueDrained = { [ weak self ] in
// Once all blocking jobs have been completed we want to start running
// the remaining job queues
self ? . startNonBlockingQueues ( using : dependencies )
// Call and then clear any callbacks registered via `afterBlockingQueue`
self ? . blockingQueueDrainCallback . mutate {
$0 . forEach { $0 ( ) }
$0 = [ ]
}
}
}
// Every queue defers the "can I start" decision back to the runner
self . queues . mutate {
$0 . values . forEach { queue in
queue . canStart = { [ weak self ] targetQueue -> Bool in ( self ? . canStart ( queue : targetQueue ) = = true ) }
}
}
}
// MARK: - Configuration
/// Registers `executor` for `variant` on both the blocking queue and the queue assigned to the variant.
public func setExecutor(_ executor: JobExecutor.Type, for variant: Job.Variant) {
    // The blocking queue can run any job so it always gets the executor
    if let blocking: JobQueue = blockingQueue.wrappedValue {
        blocking.setExecutor(executor, for: variant)
    }

    if let variantQueue: JobQueue = queues.wrappedValue[variant] {
        variantQueue.setExecutor(executor, for: variant)
    }
}
/// Whether `queue` may start: job execution must be allowed and the app must be ready; on top of that
/// the blocking queue may always start whereas other queues need `canStartNonBlockingQueue`.
public func canStart(queue: JobQueue?) -> Bool {
    guard allowToExecuteJobs, appReadyToStartQueues.wrappedValue else { return false }

    if queue?.type == .blocking { return true }

    return canStartNonBlockingQueue
}
/// Calls `callback` immediately when the blocking queue has already started at least once and is no
/// longer running, otherwise stores it to be called when the blocking queue drains.
public func afterBlockingQueue(callback: @escaping () -> ()) {
    let blockingQueueStillPending: Bool = (
        (blockingQueue.wrappedValue?.hasStartedAtLeastOnce.wrappedValue != true) ||
        (blockingQueue.wrappedValue?.isRunning.wrappedValue == true)
    )

    if blockingQueueStillPending {
        blockingQueueDrainCallback.mutate { callbacks in callbacks.append(callback) }
    }
    else {
        callback()
    }
}
// MARK: - State Management
/// Collects info about pending and/or running jobs from the blocking queue and the variant queues.
///
/// - Parameters:
///   - jobs: When provided only jobs whose `JobKey` matches one of these are returned
///   - state: Which job states (pending, running or both) should be included
///   - variant: When provided only this variant is considered; otherwise the variants of `jobs` are
///   used (no variant restriction when both are `nil`)
/// - Returns: The matching job info keyed by job id
///
/// NOTE(review): an empty `targetKeys` is treated as "no restriction", so if `jobs` is provided but none
/// of them produce a `JobKey` every job of the target variants is returned — confirm this is intended
public func jobInfoFor (
jobs : [ Job ] ? ,
state : JobRunner . JobState ,
variant : Job . Variant ?
) -> [ Int64 : JobRunner . JobInfo ] {
var result : [ ( Int64 , JobRunner . JobInfo ) ] = [ ]
let targetKeys : [ JobQueue . JobKey ] = ( jobs ? . compactMap { JobQueue . JobKey ( $0 ) } ? ? [ ] )
// An explicit `variant` takes precedence over the variants of the provided jobs
let targetVariants : [ Job . Variant ] = ( variant . map { [ $0 ] } ? ? jobs ? . map { $0 . variant } )
. defaulting ( to : [ ] )
// Insert the state of any pending jobs
if state . contains ( . pending ) {
// Builds the info for the pending jobs of a single queue which match the variant and key filters
func infoFor ( queue : JobQueue ? , variants : [ Job . Variant ] ) -> [ ( Int64 , JobRunner . JobInfo ) ] {
return ( queue ? . pendingJobsQueue . wrappedValue
. filter { variants . isEmpty || variants . contains ( $0 . variant ) }
. compactMap { job -> ( Int64 , JobRunner . JobInfo ) ? in
guard let jobKey : JobQueue . JobKey = JobQueue . JobKey ( job ) else { return nil }
guard
targetKeys . isEmpty ||
targetKeys . contains ( jobKey )
else { return nil }
return (
jobKey . id ,
JobRunner . JobInfo (
variant : job . variant ,
threadId : job . threadId ,
interactionId : job . interactionId ,
detailsData : job . details ,
uniqueHashValue : job . uniqueHashValue
)
)
} )
. defaulting ( to : [ ] )
}
// The blocking queue is always checked, followed by each distinct queue handling a target variant
result . append ( contentsOf : infoFor ( queue : blockingQueue . wrappedValue , variants : targetVariants ) )
queues . wrappedValue
. filter { key , _ -> Bool in targetVariants . isEmpty || targetVariants . contains ( key ) }
. map { _ , queue in queue }
. asSet ( )
. forEach { queue in result . append ( contentsOf : infoFor ( queue : queue , variants : targetVariants ) ) }
}
// Insert the state of any running jobs
if state . contains ( . running ) {
// Filters the info of the currently running jobs of a single queue by the variant and key filters
func infoFor ( queue : JobQueue ? , variants : [ Job . Variant ] ) -> [ ( Int64 , JobRunner . JobInfo ) ] {
return ( queue ? . infoForAllCurrentlyRunningJobs ( )
. filter { variants . isEmpty || variants . contains ( $0 . value . variant ) }
. compactMap { jobId , info -> ( Int64 , JobRunner . JobInfo ) ? in
guard
targetKeys . isEmpty ||
targetKeys . contains ( JobQueue . JobKey ( id : jobId , variant : info . variant ) )
else { return nil }
return ( jobId , info )
} )
. defaulting ( to : [ ] )
}
result . append ( contentsOf : infoFor ( queue : blockingQueue . wrappedValue , variants : targetVariants ) )
queues . wrappedValue
. filter { key , _ -> Bool in targetVariants . isEmpty || targetVariants . contains ( key ) }
. map { _ , queue in queue }
. asSet ( )
. forEach { queue in result . append ( contentsOf : infoFor ( queue : queue , variants : targetVariants ) ) }
}
// Collapse into a dictionary; when an id was collected more than once the later entry wins
return result
. reduce ( into : [ : ] ) { result , next in
result [ next . 0 ] = next . 1
}
}
/// Flags the runner as ready, loads the "on launch" jobs from the database, hands the blocking ones to
/// the blocking queue (allowing it to start) and adds the non-blocking ones to their queues without
/// starting them.
///
/// - Parameter dependencies: Provides the storage used for the database access and is forwarded to the queues
public func appDidFinishLaunching ( using dependencies : Dependencies ) {
// Clear any 'manualResultJob' entries in the database (don't want them to build up due to crashes
// or unhandled code paths)
dependencies [ singleton : . storage ] . writeAsync { db in
try Job . filter ( Job . Columns . variant = = Job . Variant . manualResultJob ) . deleteAll ( db )
}
// Flag that the JobRunner can start its queues
appReadyToStartQueues . mutate { $0 = true }
// Note: 'appDidBecomeActive' will run on first launch anyway so we can
// leave those jobs out and can wait until then to start the JobRunner
let jobsToRun : ( blocking : [ Job ] , nonBlocking : [ Job ] ) = dependencies [ singleton : . storage ]
. read { db in
// Both queries select the "on launch" behaviours ordered by descending priority and then id, they
// only differ in the value of `shouldBlock`
let blockingJobs : [ Job ] = try Job
. filter (
[
Job . Behaviour . recurringOnLaunch ,
Job . Behaviour . runOnceNextLaunch
] . contains ( Job . Columns . behaviour )
)
. filter ( Job . Columns . shouldBlock = = true )
. order (
Job . Columns . priority . desc ,
Job . Columns . id
)
. fetchAll ( db )
let nonblockingJobs : [ Job ] = try Job
. filter (
[
Job . Behaviour . recurringOnLaunch ,
Job . Behaviour . runOnceNextLaunch
] . contains ( Job . Columns . behaviour )
)
. filter ( Job . Columns . shouldBlock = = false )
. order (
Job . Columns . priority . desc ,
Job . Columns . id
)
. fetchAll ( db )
return ( blockingJobs , nonblockingJobs )
}
// Fall back to no jobs when the read doesn't produce a value
. defaulting ( to : ( [ ] , [ ] ) )
// Add and start any blocking jobs
blockingQueue . wrappedValue ? . appDidFinishLaunching (
with : jobsToRun . blocking ,
canStart : true ,
using : dependencies
)
// Add any non-blocking jobs (we don't start these in case there are blocking "on active"
// jobs as well)
let jobsByVariant : [ Job . Variant : [ Job ] ] = jobsToRun . nonBlocking . grouped ( by : \ . variant )
let jobQueues : [ Job . Variant : JobQueue ] = queues . wrappedValue
jobsByVariant . forEach { variant , jobs in
jobQueues [ variant ] ? . appDidFinishLaunching (
with : jobs ,
canStart : false ,
using : dependencies
)
}
}
/// Flags the runner as active, cancels any pending shutdown background task and schedules the
/// `recurringOnActive` jobs on their queues (starting the queues unless the blocking queue is
/// still running).
///
/// - Parameter dependencies: Provides the storage used to load the jobs and is forwarded to the queues
public func appDidBecomeActive(using dependencies: Dependencies) {
    // Flag that the JobRunner can start its queues and start queueing non-launch jobs
    appReadyToStartQueues.mutate { $0 = true }
    appHasBecomeActive.mutate { $0 = true }

    // If we have a running "shutdownBackgroundTask" then we want to cancel it as otherwise it
    // can result in the database being suspended and us being unable to interact with it at all
    shutdownBackgroundTask.mutate {
        $0?.cancel()
        $0 = nil
    }

    // Capture the flag before flipping it so this call still applies the "initial" filtering below
    let hasCompletedInitialBecomeActive: Bool = self.hasCompletedInitialBecomeActive.wrappedValue

    // The flag needs to be set on every exit path, previously the early return below skipped it which
    // meant that when the initial activation had no jobs to run (eg. because all of them have
    // `shouldSkipLaunchBecomeActive` set) those jobs would be filtered out on every later activation too
    defer { self.hasCompletedInitialBecomeActive.mutate { $0 = true } }

    // Retrieve any jobs which should run when becoming active
    let jobsToRun: [Job] = dependencies[singleton: .storage]
        .read { db in
            return try Job
                .filter(Job.Columns.behaviour == Job.Behaviour.recurringOnActive)
                .order(
                    Job.Columns.priority.desc,
                    Job.Columns.id
                )
                .fetchAll(db)
        }
        .defaulting(to: [])
        .filter { hasCompletedInitialBecomeActive || !$0.shouldSkipLaunchBecomeActive }

    // Store the current queue state locally to avoid multiple atomic retrievals
    let jobQueues: [Job.Variant: JobQueue] = queues.wrappedValue
    let blockingQueueIsRunning: Bool = (blockingQueue.wrappedValue?.isRunning.wrappedValue == true)

    // With no "on active" jobs we just need to make sure the queues are running
    guard !jobsToRun.isEmpty else {
        if !blockingQueueIsRunning {
            jobQueues.map { _, queue in queue }.asSet().forEach { $0.start(using: dependencies) }
        }
        return
    }

    // Add and start any non-blocking jobs (if there are no blocking jobs)
    //
    // We only want to trigger the queue to start once so we need to consolidate the
    // queues to list of jobs (as queues can handle multiple job variants), this means
    // that 'onActive' jobs will be queued before any standard jobs
    let jobsByVariant: [Job.Variant: [Job]] = jobsToRun.grouped(by: \.variant)
    jobQueues
        .reduce(into: [:]) { result, variantAndQueue in
            result[variantAndQueue.value] = (result[variantAndQueue.value] ?? [])
                .appending(contentsOf: (jobsByVariant[variantAndQueue.key] ?? []))
        }
        .forEach { queue, jobs in
            queue.appDidBecomeActive(
                with: jobs,
                canStart: !blockingQueueIsRunning,
                using: dependencies
            )
        }
}
/// Starts each distinct queue in `queues` (a queue handling several variants is only started once).
public func startNonBlockingQueues(using dependencies: Dependencies) {
    let distinctQueues: Set<JobQueue> = queues.wrappedValue
        .map { _, queue in queue }
        .asSet()

    for queue in distinctQueues {
        queue.start(using: dependencies)
    }
}
/// Prevents the queues from starting, stops every queue (except the one handling `exceptForVariant`) and
/// clears their pending jobs.
///
/// - Parameters:
///   - exceptForVariant: When provided the queue handling this variant is left alone so it can drain
///   - onComplete: Called immediately when there is no running excepted queue, otherwise called once that
///   queue drains or when the background task reports a state other than cancelled/success
public func stopAndClearPendingJobs (
exceptForVariant : Job . Variant ? ,
onComplete : ( ( ) -> ( ) ) ?
) {
// Inform the JobRunner that it can't start any queues (this is to prevent queues from
// rescheduling themselves while in the background, when the app restarts or becomes active
// the JobRunner will update this flag)
appReadyToStartQueues . mutate { $0 = false }
appHasBecomeActive . mutate { $0 = false }
// Stop all queues except for the one containing the `exceptForVariant`
queues . wrappedValue
. map { _ , queue in queue }
. asSet ( )
. filter { queue -> Bool in
guard let exceptForVariant : Job . Variant = exceptForVariant else { return true }
return ! queue . jobVariants . contains ( exceptForVariant )
}
. forEach { $0 . stopAndClearPendingJobs ( ) }
// Ensure the queue is actually running (if not then trigger the callback immediately)
guard
let exceptForVariant : Job . Variant = exceptForVariant ,
let queue : JobQueue = queues . wrappedValue [ exceptForVariant ] ,
queue . isRunning . wrappedValue = = true
else {
onComplete ? ( )
return
}
// Keep the existing drain handler so it can be restored once we are done with the queue
let oldQueueDrained : ( ( ) -> ( ) ) ? = queue . onQueueDrained
// Create a backgroundTask to give the queue the chance to properly be drained
shutdownBackgroundTask . mutate {
$0 = OWSBackgroundTask ( labelStr : #function ) { [ weak queue ] state in
// If the background task was cancelled then just restore the original drain handler
guard state != . cancelled else {
queue ? . onQueueDrained = oldQueueDrained
return
}
// If the background task didn't succeed then trigger the onComplete (and hope we have
// enough time to complete its logic)
guard state != . success else { return }
onComplete ? ( )
queue ? . onQueueDrained = oldQueueDrained
queue ? . stopAndClearPendingJobs ( )
}
}
// Add a callback to be triggered once the queue is drained
queue . onQueueDrained = { [ weak self , weak queue ] in
// Run the original handler first, then restore it before reporting completion
oldQueueDrained ? ( )
queue ? . onQueueDrained = oldQueueDrained
onComplete ? ( )
self ? . shutdownBackgroundTask . mutate { $0 = nil }
}
}
// MARK: - Execution
/// Validates and persists `job`, records the dependency of `dependantJob` on it (if provided) and adds it
/// to the queue for its variant when the runner is able to accept it.
///
/// - Parameters:
///   - db: The database the job (and dependency) is written to
///   - job: The job to add (a `nil` job results in `nil` being returned)
///   - dependantJob: A job which depends on `job`; only used when both jobs have an id
///   - canStartJob: Forwarded to the queue; when `true` the queue is also started after the transaction
///   - dependencies: Forwarded to the queue
/// - Returns: The persisted job (even when it wasn't added to a queue) or `nil` when validation failed
@ discardableResult public func add (
_ db : Database ,
job : Job ? ,
dependantJob : Job ? ,
canStartJob : Bool ,
using dependencies : Dependencies
) -> Job ? {
guard let updatedJob : Job = validatedJob ( db , job : job , validation : . persist ) else { return nil }
// If we are adding a job that's dependant on another job then create the dependency between them
// (a failure to insert the dependency is ignored)
if let jobId : Int64 = updatedJob . id , let dependantJobId : Int64 = dependantJob ? . id {
try ? JobDependencies (
jobId : jobId ,
dependantId : dependantJobId
)
. insert ( db )
}
// Don't add to the queue if the JobRunner isn't ready (it's been saved to the db so it'll be loaded
// once the queue actually gets started later)
guard canAddToQueue ( updatedJob ) else { return updatedJob }
let jobQueue : JobQueue ? = queues . wrappedValue [ updatedJob . variant ]
jobQueue ? . add ( db , job : updatedJob , canStartJob : canStartJob , using : dependencies )
// Don't start the queue if the job can't be started
guard canStartJob else { return updatedJob }
// Start the job runner if needed (the `dedupeId` is derived from the queue's `queueContext`)
db . afterNextTransactionNestedOnce ( dedupeId : " JobRunner-Start: \( jobQueue ? . queueContext ? ? " N/A " ) " ) { _ in
jobQueue ? . start ( using : dependencies )
}
return updatedJob
}
/// Adds `job` when it has no id yet, otherwise validates it (without persisting it here) and forwards it
/// to the `upsert` of the queue for its variant.
///
/// - Parameters:
///   - db: The database passed through to `add`, the validation and the queue
///   - job: The job to add or update (`nil` is ignored)
///   - canStartJob: Forwarded to the queue; when `true` the queue is also started after the transaction
///   - dependencies: Forwarded to the queue
public func upsert (
_ db : Database ,
job : Job ? ,
canStartJob : Bool ,
using dependencies : Dependencies
) {
guard let job : Job = job else { return } // Ignore null jobs
// A job without an id hasn't been persisted yet so goes through the regular `add` flow
guard job . id != nil else {
add ( db , job : job , canStartJob : canStartJob , using : dependencies )
return
}
guard let updatedJob : Job = validatedJob ( db , job : job , validation : . enqueueOnly ) else { return }
// Don't add to the queue if the JobRunner isn't ready (it's been saved to the db so it'll be loaded
// once the queue actually gets started later)
guard canAddToQueue ( updatedJob ) else { return }
let jobQueue : JobQueue ? = queues . wrappedValue [ updatedJob . variant ]
jobQueue ? . upsert ( db , job : updatedJob , canStartJob : canStartJob , using : dependencies )
// Don't start the queue if the job can't be started
guard canStartJob else { return }
// Start the job runner if needed (the `dedupeId` is derived from the queue's `queueContext`)
db . afterNextTransactionNestedOnce ( dedupeId : " JobRunner-Start: \( jobQueue ? . queueContext ? ? " N/A " ) " ) { _ in
jobQueue ? . start ( using : dependencies )
}
}
/// Persists `job` and inserts it into the queue for its variant ahead of `otherJob`.
///
/// - Parameters:
///   - db: The database the job is written to
///   - job: The job to insert; jobs with a launch/active recurring behaviour are rejected
///   - otherJob: The job which the new job should be placed before
/// - Returns: The id of the persisted job along with the job itself, or `nil` when the job was `nil`,
/// was rejected due to its behaviour or failed validation
@discardableResult public func insert(
    _ db: Database,
    job: Job?,
    before otherJob: Job
) -> (Int64, Job)? {
    // Unwrap up-front so the log below contains the actual values rather than `Optional(...)`
    // (a `nil` job was always going to result in `nil` via `validatedJob`)
    guard let job: Job = job else { return nil }

    switch job.behaviour {
        case .recurringOnActive, .recurringOnLaunch, .runOnceNextLaunch:
            SNLog("[JobRunner] Attempted to insert \(job.variant) job before the current one even though it's behaviour is \(job.behaviour)")
            return nil

        default: break
    }

    guard
        let updatedJob: Job = validatedJob(db, job: job, validation: .persist),
        let jobId: Int64 = updatedJob.id
    else { return nil }

    queues.wrappedValue[updatedJob.variant]?.insert(updatedJob, before: otherJob)

    return (jobId, updatedJob)
}
/// Job dependencies can be quite messy as they might already be running or scheduled on different queues from the related job, this could result
/// in some odd inter-dependencies between the JobQueues. Instead of this we want all jobs to run on their original assigned queues (so the
/// concurrency rules remain consistent and easy to reason with), the only downside to this approach is serial queues could potentially be blocked
/// waiting on unrelated dependencies to be run as this method will insert jobs at the start of the `pendingJobsQueue`
///
/// - Parameters:
///   - jobs: The jobs to move to the front of their queues; jobs without an id, without a queue or which
///   are currently running are skipped
///   - dependencies: Forwarded to the queues when starting them
public func enqueueDependenciesIfNeeded ( _ jobs : [ Job ] , using dependencies : Dependencies ) {
/// Do nothing if we weren't given any jobs
guard ! jobs . isEmpty else { return }
/// Ignore any dependencies which are already running (jobs which are already pending aren't skipped,
/// they get moved to the front of the queue below)
let dependencyJobQueues : Set < JobQueue > = jobs
. compactMap { queues . wrappedValue [ $0 . variant ] }
. asSet ( )
let allCurrentlyRunningJobIds : [ Int64 ] = dependencyJobQueues
. flatMap { $0 . currentlyRunningJobIds . wrappedValue }
/// Group the remaining jobs by the queue responsible for their variant
let jobsToEnqueue : [ JobQueue : [ Job ] ] = jobs
. compactMap { job in job . id . map { ( $0 , job ) } }
. filter { jobId , _ in ! allCurrentlyRunningJobIds . contains ( jobId ) }
. compactMap { _ , job in queues . wrappedValue [ job . variant ] . map { ( job , $0 ) } }
. grouped ( by : { _ , queue in queue } )
. mapValues { data in data . map { job , _ in job } }
/// Regardless of whether the jobs are dependant jobs or dependencies we want them to be moved to the start of the
/// `pendingJobsQueue` because at least one job in the job chain has been triggered so we want to try to complete
/// the entire job chain rather than worry about deadlocks between different job chains
///
/// **Note:** If any of these `dependantJobs` have other dependencies then when they attempt to start they will be
/// removed from the queue, replaced by their dependencies
jobsToEnqueue . forEach { queue , jobs in
queue . pendingJobsQueue . mutate { pendingJobs in
// Remove any existing entries for the jobs before re-inserting them at the front
pendingJobs = pendingJobs
. filter { ! jobs . contains ( $0 ) }
. inserting ( contentsOf : jobs , at : 0 )
}
// Start the job queue if needed (might be a different queue from the currently executing one)
queue . start ( using : dependencies )
}
}
/// Passes an externally determined `result` for `job` to the matching handler on the queue for its
/// variant; does nothing for a `nil` job, a variant without a queue or a `.notFound` result.
public func manuallyTriggerResult(_ job: Job?, result: JobRunner.JobResult, using dependencies: Dependencies) {
    guard let job: Job = job else { return }
    guard let targetQueue: JobQueue = queues.wrappedValue[job.variant] else { return }

    switch result {
        case .succeeded:
            targetQueue.handleJobSucceeded(job, shouldStop: false, using: dependencies)

        case .failed(let error, let permanent):
            targetQueue.handleJobFailed(job, error: error, permanentFailure: permanent, using: dependencies)

        case .deferred:
            targetQueue.handleJobDeferred(job, using: dependencies)

        case .notFound:
            return
    }
}
/// Registers `callback` with the queue for the job's variant; reports `.notFound` straight away when
/// the job is `nil`, has no id or its variant has no queue.
public func afterJob(_ job: Job?, state: JobRunner.JobState, callback: @escaping (JobResult) -> ()) {
    guard
        let job: Job = job,
        let jobId: Int64 = job.id,
        let targetQueue: JobQueue = queues.wrappedValue[job.variant]
    else { return callback(.notFound) }

    targetQueue.afterJob(jobId, state: state, callback: callback)
}
/// Asks the queue for the job's variant to remove the pending job with the same id; a `nil` job or a
/// job without an id is ignored.
public func removePendingJob(_ job: Job?) {
    guard
        let variant: Job.Variant = job?.variant,
        let jobId: Int64 = job?.id
    else { return }

    queues.wrappedValue[variant]?.removePendingJob(jobId)
}
// MARK: - Convenience
/// Calculates how long to wait before retrying a failed job using exponential backoff.
///
/// - Parameter job: The job to be retried; only its `failureCount` is used
/// - Returns: `0.25 * 2^failureCount` seconds, capped at 10 minutes
fileprivate static func getRetryInterval(for job: Job) -> TimeInterval {
    // Arbitrary backoff factor...
    // try  1 delay: 0.5s
    // try  2 delay: 1s
    // ...
    // try  5 delay: 8s
    // ...
    // try 11 delay: 512s
    // try 12+ delay: 600s (capped)
    let maxBackoff: Double = 10 * 60 // 10 minutes

    // The cap needs to be applied to the final delay, previously it was applied before the `0.25`
    // factor which limited the delay to 150s (so neither the documented 512s nor the 10 minute
    // maximum could ever be reached)
    return min(maxBackoff, 0.25 * pow(2, Double(job.failureCount)))
}
/// Indicates whether `job` is currently allowed to be added to a queue.
///
/// "On launch" jobs (`runOnceNextLaunch` and `recurringOnLaunch`) are always allowed, every other
/// behaviour is only allowed once the app has become active.
fileprivate func canAddToQueue(_ job: Job) -> Bool {
    switch job.behaviour {
        case .runOnceNextLaunch, .recurringOnLaunch: return true
        default: return appHasBecomeActive.wrappedValue
    }
}
/// Validates `job` against the unique constraint and, for `.persist` validation, inserts it into the
/// database.
///
/// - Parameters:
///   - db: The database used for the uniqueness lookup and the insertion (only for `.persist`).
///   - job: The job to validate, a `nil` job results in `nil`.
///   - validation: `.enqueueOnly` only checks the in-memory job info, `.persist` also checks the
///     database and then inserts the job.
/// - Returns: The job to use (the inserted copy for `.persist`), or `nil` when the job is a duplicate
///   or could not be inserted with an id.
private func validatedJob(_ db: Database, job: Job?, validation: Validation) -> Job? {
    guard let job: Job = job else { return nil }

    // Jobs with a `uniqueHashValue` must not duplicate an existing job
    if let uniqueHashValue = job.uniqueHashValue {
        let isDuplicate: Bool = {
            // Something currently running or sitting in a JobQueue
            let existsInMemory: Bool = allJobInfo()
                .contains(where: { _, info -> Bool in info.uniqueHashValue == uniqueHashValue })

            switch validation {
                case .enqueueOnly: return existsInMemory
                case .persist:
                    // The database is only consulted when nothing was found in memory
                    return (
                        existsInMemory ||
                        Job.filter(Job.Columns.uniqueHashValue == uniqueHashValue).isNotEmpty(db)
                    )
            }
        }()

        if isDuplicate {
            SNLog(" [JobRunner] Unable to add \( job . variant ) job due to unique constraint ")
            return nil
        }
    }

    switch validation {
        case .enqueueOnly: return job
        case .persist:
            // Validation passed so try to persist the job
            guard let updatedJob: Job = try? job.inserted(db), updatedJob.id != nil else {
                SNLog(" [JobRunner] Unable to add \( job . variant ) job \( job . id == nil ? " due to missing id " : " " ) ")
                return nil
            }

            return updatedJob
    }
}
}
// MARK: - JobQueue
public final class JobQueue : Hashable {
/// The kinds of queue a `JobQueue` can represent, the `name` is used when building the queue's
/// context string.
fileprivate enum QueueType: Hashable {
    case blocking
    case general(number: Int)
    case messageSend
    case messageReceive
    case attachmentDownload
    case displayPictureDownload
    case expirationUpdate

    /// A human readable name for the queue type (general queues include their number)
    var name: String {
        switch self {
            case .blocking:
                return " Blocking "

            case let .general(number):
                return " General- \( number ) "

            case .messageSend:
                return " MessageSend "

            case .messageReceive:
                return " MessageReceive "

            case .attachmentDownload:
                return " AttachmentDownload "

            case .displayPictureDownload:
                return " DisplayPictureDownload "

            case .expirationUpdate:
                return " ExpirationUpdate "
        }
    }
}
/// Describes how a `JobQueue` works through its pending jobs.
fileprivate enum ExecutionType {
    /// Jobs are executed one at a time until the queue is empty, at which point any new/deferred
    /// jobs are loaded and also run one at a time
    case serial

    /// As many jobs as the device supports are executed at once until the queue is empty, at which
    /// point any new/deferred jobs are loaded and the queue tries to start them all
    case concurrent
}
/// Wraps a one-shot timer which starts a `JobQueue` once a scheduled job becomes due.
private class Trigger {
    /// The delay (in seconds) the timer was scheduled with
    fileprivate var fireTimestamp: TimeInterval = 0
    private var timer: Timer?

    /// Creates a trigger which will call `start` on `queue` when the timer fires.
    ///
    /// - Parameters:
    ///   - queue: The queue to start (only weakly captured by the timer block).
    ///   - timestamp: The unix timestamp (in seconds) the trigger is targeting.
    ///   - dependencies: The dependencies used for the current time, the timer and the `start` call.
    /// - Returns: The newly created trigger.
    static func create(
        queue: JobQueue,
        timestamp: TimeInterval,
        using dependencies: Dependencies
    ) -> Trigger? {
        /// Wait at least 1 second before triggering
        let delay: TimeInterval = max(1, (timestamp - dependencies.dateNow.timeIntervalSince1970))
        let result: Trigger = Trigger()
        result.fireTimestamp = delay

        /// **Note:** We use the `Timer.scheduledTimerOnMainThread` method because running a timer on
        /// our random queue threads results in the timer never firing, the `start` method will
        /// redirect itself to the correct thread
        result.timer = Timer.scheduledTimerOnMainThread(
            withTimeInterval: delay,
            repeats: false,
            using: dependencies,
            block: { [weak queue] _ in
                queue?.start(using: dependencies)
            }
        )

        return result
    }

    /// Stops the timer and releases it
    func invalidate() {
        // Need to do this to prevent a strong reference cycle
        timer?.invalidate()
        timer = nil
    }
}
/// A lightweight key identifying a job by its `id` and `variant`.
fileprivate struct JobKey: Equatable, Hashable {
    fileprivate let id: Int64
    fileprivate let variant: Job.Variant

    fileprivate init(id: Int64, variant: Job.Variant) {
        self.id = id
        self.variant = variant
    }

    /// Fails (returns `nil`) when `job` is `nil` or doesn't have an `id`
    fileprivate init?(_ job: Job?) {
        guard let job: Job = job, let jobId: Int64 = job.id else { return nil }

        self.init(id: jobId, variant: job.variant)
    }
}
/// Threshold used by the deferral loop detection (the code which reads it is outside of this section)
private static let deferralLoopThreshold : Int = 3
/// Unique identity of this queue instance, this is the only value used by `hash(into:)` and `==`
private let id : UUID = UUID ( )
/// The kind of queue this is, its `name` is used to build `queueContext`
fileprivate let type : QueueType
/// Whether the queue runs its jobs one at a time or concurrently
private let executionType : ExecutionType
/// The quality of service used when creating the dispatch queues for this `JobQueue`
private let qosClass : DispatchQoS
/// Key used to tag `internalQueue` so the code can check whether it is executing on that queue
private let queueKey : DispatchSpecificKey = DispatchSpecificKey < String > ( )
/// Label of `internalQueue`, also the value stored against `queueKey` and the prefix used in logs
fileprivate let queueContext : String
/// The job variants this queue is responsible for (used when fetching pending jobs from the database)
fileprivate let jobVariants : [ Job . Variant ]
/// The dispatch queue the job running logic is executed on, created lazily as a concurrent queue when
/// `executionType` is `concurrent` (serial otherwise) and tagged with `queueKey`/`queueContext`
private lazy var internalQueue : DispatchQueue = {
let result : DispatchQueue = DispatchQueue (
label : self . queueContext ,
qos : self . qosClass ,
attributes : ( self . executionType = = . concurrent ? [ . concurrent ] : [ ] ) ,
autoreleaseFrequency : . inherit ,
target : nil
)
result . setSpecific ( key : queueKey , value : queueContext )
return result
} ( )
/// The executor registered for each job variant (see `setExecutor`)
private var executorMap : Atomic < [ Job . Variant : JobExecutor . Type ] > = Atomic ( [ : ] )
/// Closure consulted before starting or scheduling the queue, the queue only proceeds when it returns `true`
fileprivate var canStart : ( ( JobQueue ? ) -> Bool ) ?
/// Closure called by `scheduleNextSoonestJob` when there is nothing further to schedule
fileprivate var onQueueDrained : ( ( ) -> ( ) ) ?
/// Set to `true` the first time `start` gets past its initial checks
fileprivate var hasStartedAtLeastOnce : Atomic < Bool > = Atomic ( false )
/// Flag indicating the queue is currently processing jobs
fileprivate var isRunning : Atomic < Bool > = Atomic ( false )
/// Jobs waiting to be run, the next job to run is taken from the front
fileprivate var pendingJobsQueue : Atomic < [ Job ] > = Atomic ( [ ] )
/// The timer-backed trigger which will restart the queue for the next scheduled job (if any)
private var nextTrigger : Atomic < Trigger ? > = Atomic ( nil )
/// Callbacks registered via `afterJob`, keyed by job id
fileprivate var jobCallbacks : Atomic < [ Int64 : [ ( JobRunner . JobResult ) -> ( ) ] ] > = Atomic ( [ : ] )
/// Ids of the jobs which are currently being executed
fileprivate var currentlyRunningJobIds : Atomic < Set < Int64 > > = Atomic ( [ ] )
/// Summary info for the jobs which are currently being executed, keyed by job id
private var currentlyRunningJobInfo : Atomic < [ Int64 : JobRunner . JobInfo ] > = Atomic ( [ : ] )
/// Per-job deferral tracking keyed by job id, cleared by `stopAndClearPendingJobs` (the code which
/// populates it is outside of this section)
private var deferLoopTracker : Atomic < [ Int64 : ( count : Int , times : [ TimeInterval ] ) ] > = Atomic ( [ : ] )
/// Set in `init` to `10` for a testing job runner and `1` otherwise
private let maxDeferralsPerSecond : Int
/// `true` when there is at least one job in `pendingJobsQueue`
fileprivate var hasPendingJobs : Bool { ! pendingJobsQueue . wrappedValue . isEmpty }
// MARK: - Initialization
/// Creates a queue of the given type.
///
/// - Parameters:
///   - type: The kind of queue, its name is used to build the queue context.
///   - executionType: Whether jobs run serially or concurrently.
///   - qos: The quality of service for the underlying dispatch queues.
///   - isTestingJobRunner: When `true` more deferrals per second are permitted.
///   - jobVariants: The job variants this queue is responsible for.
fileprivate init(
    type: QueueType,
    executionType: ExecutionType,
    qos: DispatchQoS,
    isTestingJobRunner: Bool,
    jobVariants: [Job.Variant]
) {
    self.type = type
    self.executionType = executionType
    self.qosClass = qos
    self.jobVariants = jobVariants
    self.queueContext = " JobQueue- \( type . name ) "

    // Allow for tripping the defer loop in tests
    if isTestingJobRunner {
        self.maxDeferralsPerSecond = 10
    }
    else {
        self.maxDeferralsPerSecond = 1
    }
}
// MARK: - Hashable
/// Hashes the queue using only its unique `id`
public func hash(into hasher: inout Hasher) {
    hasher.combine(id)
}
/// Two queues are equal when they have the same unique `id`
public static func == (lhs: JobQueue, rhs: JobQueue) -> Bool {
    let hasMatchingId: Bool = (lhs.id == rhs.id)

    return hasMatchingId
}
// MARK: - Configuration
/// Registers (or replaces) the executor used to run jobs of the given `variant`
fileprivate func setExecutor(_ executor: JobExecutor.Type, for variant: Job.Variant) {
    executorMap.mutate { executors in
        executors[variant] = executor
    }
}
// MARK: - Execution
/// Appends `job` to the pending jobs when it is allowed to start now.
///
/// The job is ignored when `canStartJob` is `false`, when its behaviour is `runOnceNextLaunch`, when
/// its `nextRunTimestamp` is in the future or when it has no `id`.
///
/// - Parameters:
///   - db: The database whose next transaction completion is used to kick off a concurrent queue.
///   - job: The job to add.
///   - canStartJob: Whether the caller permits the job to be started.
///   - dependencies: The dependencies used for the current time and for running the next job.
fileprivate func add(
    _ db: Database,
    job: Job,
    canStartJob: Bool,
    using dependencies: Dependencies
) {
    // Check if the job should be added to the queue
    guard canStartJob else { return }
    guard job.behaviour != .runOnceNextLaunch else { return }
    guard job.nextRunTimestamp <= dependencies.dateNow.timeIntervalSince1970 else { return }

    if job.id == nil {
        SNLog(" [JobRunner] Prevented attempt to add \( job . variant ) job without id to queue ")
        return
    }

    pendingJobsQueue.mutate { pending in pending.append(job) }

    // Only a concurrent queue should immediately start the next job
    if executionType == .concurrent {
        // Ensure that the database commit has completed and then trigger the next job to run (need
        // to ensure any interactions have been correctly inserted first)
        db.afterNextTransactionNestedOnce(dedupeId: " JobRunner-Add: \( job . variant ) ") { [weak self] _ in
            self?.runNextJob(using: dependencies)
        }
    }
}
/// Upserts a job onto the queue: a pending job with the same id is replaced in place, otherwise the
/// job is handed to `add`.
///
/// **Note:** If the job has a `behaviour` of `runOnceNextLaunch` or the `nextRunTimestamp` is in the
/// future then `add` won't enqueue it
///
/// - Parameters:
///   - db: The database forwarded to `add`.
///   - job: The job to upsert (must have an `id`).
///   - canStartJob: Forwarded to `add` when the job wasn't already pending.
///   - dependencies: The dependencies forwarded to `add`.
fileprivate func upsert(
    _ db: Database,
    job: Job,
    canStartJob: Bool,
    using dependencies: Dependencies
) {
    guard let jobId: Int64 = job.id else {
        SNLog(" [JobRunner] Prevented attempt to upsert \( job . variant ) job without id to queue ")
        return
    }

    // The lookup and the replacement happen inside a single 'mutate' call so nothing else can modify
    // the pendingJobsQueue between the two
    //
    // Note: currently running jobs are removed from the pendingJobsQueue so we don't need to check
    // the set of running jobs
    var replacedPendingJob: Bool = false

    pendingJobsQueue.mutate { pending in
        guard let existingIndex: Int = pending.firstIndex(where: { $0.id == jobId }) else { return }

        pending[existingIndex] = job
        replacedPendingJob = true
    }

    // If we didn't update an existing job then we need to add it to the pendingJobsQueue
    if !replacedPendingJob {
        add(db, job: job, canStartJob: canStartJob, using: dependencies)
    }
}
/// Inserts `job` into the pending jobs directly before `otherJob`.
///
/// If `otherJob` isn't currently pending then both jobs are placed at the start of the pending jobs
/// (`job` first), this means the new job will run and then `otherJob` will run (or run again) once
/// it's done. A `job` without an `id` is rejected.
fileprivate func insert(_ job: Job, before otherJob: Job) {
    if job.id == nil {
        SNLog(" [JobRunner] Prevented attempt to insert \( job . variant ) job without id to queue ")
        return
    }

    pendingJobsQueue.mutate { pending in
        if let otherJobIndex: Int = pending.firstIndex(of: otherJob) {
            pending.insert(job, at: otherJobIndex)
        }
        else {
            pending.insert(contentsOf: [job, otherJob], at: 0)
        }
    }
}
/// Appends the provided launch `jobs` to the pending jobs and starts the queue when `canStart` is
/// `true` and the queue isn't already running.
fileprivate func appDidFinishLaunching(
    with jobs: [Job],
    canStart: Bool,
    using dependencies: Dependencies
) {
    pendingJobsQueue.mutate { pending in pending += jobs }

    // Start the job runner if needed
    guard canStart, !isRunning.wrappedValue else { return }

    start(using: dependencies)
}
/// Appends any of the provided `jobs` which are neither running nor already pending, then starts
/// the queue when `canStart` is `true` and the queue isn't already running.
fileprivate func appDidBecomeActive(
    with jobs: [Job],
    canStart: Bool,
    using dependencies: Dependencies
) {
    let runningJobIds: Set<Int64> = currentlyRunningJobIds.wrappedValue

    pendingJobsQueue.mutate { pending in
        // Avoid re-adding jobs to the queue that are already in it (this can happen if the user
        // sends the app to the background before the 'onActive' jobs and then brings it back to
        // the foreground)
        //
        // The ids are captured before anything is appended so every job is compared against the
        // queue as it was when this call started
        let idsAlreadyPending: Set<Int64?> = Set(pending.map { $0.id })

        for job in jobs {
            let isRunning: Bool = runningJobIds.contains(job.id ?? -1)
            let isPending: Bool = idsAlreadyPending.contains(job.id)

            if !isRunning && !isPending {
                pending.append(job)
            }
        }
    }

    // Start the job runner if needed
    guard canStart, !isRunning.wrappedValue else { return }

    start(using: dependencies)
}
/// Returns a snapshot of the info for every job this queue is currently running, keyed by job id
fileprivate func infoForAllCurrentlyRunningJobs() -> [Int64: JobRunner.JobInfo] {
    let runningJobInfo: [Int64: JobRunner.JobInfo] = currentlyRunningJobInfo.wrappedValue

    return runningJobInfo
}
/// Registers `callback` against the job with the given id.
///
/// When the job's current state contradicts the requested `state` (a `running` job was requested but
/// the job isn't running, or a `pending` job was requested but the job is running) the `callback` is
/// invoked immediately with `.notFound` and is not stored.
fileprivate func afterJob(_ jobId: Int64, state: JobRunner.JobState, callback: @escaping (JobRunner.JobResult) -> ()) {
    let jobIsRunning: Bool = currentlyRunningJobIds.wrappedValue.contains(jobId)

    /// If the current job state doesn't match the requested state then the job in the requested
    /// state can't be found so stop here
    switch state {
        case .running where !jobIsRunning: return callback(.notFound)
        case .pending where jobIsRunning: return callback(.notFound)
        default: break
    }

    jobCallbacks.mutate { callbacks in
        callbacks[jobId, default: []].append(callback)
    }
}
/// Indicates whether there is a pending or running job whose id survives all of the provided filters.
///
/// Every filter which is `nil` is skipped, when all filters are `nil` the result is `true` as long as
/// there is any running job or any pending job with an id.
///
/// - Parameters:
///   - threadId: When provided, jobs with a different `threadId` are excluded.
///   - interactionId: When provided, jobs with a different `interactionId` are excluded.
///   - detailsData: When provided, jobs with different details data are excluded.
/// - Returns: `true` when at least one job id remains after applying the filters.
fileprivate func hasPendingOrRunningJobWith(
    threadId: String? = nil,
    interactionId: Int64? = nil,
    detailsData: Data? = nil
) -> Bool {
    let queuedJobs: [Job] = pendingJobsQueue.wrappedValue
    let runningInfo: [Int64: JobRunner.JobInfo] = currentlyRunningJobInfo.wrappedValue

    // Every id which could potentially match
    var candidateIds: Set<Int64> = Set(runningInfo.keys)
    candidateIds.formUnion(queuedJobs.compactMap { $0.id })

    // Collect the ids which fail any of the provided filters
    var rejectedIds: Set<Int64> = []

    // Reject any which don't have the matching threadId (if provided)
    if let targetThreadId: String = threadId {
        rejectedIds.formUnion(queuedJobs.filter { $0.threadId != targetThreadId }.compactMap { $0.id })
        rejectedIds.formUnion(runningInfo.filter { _, info -> Bool in info.threadId != targetThreadId }.keys)
    }

    // Reject any which don't have the matching interactionId (if provided)
    if let targetInteractionId: Int64 = interactionId {
        rejectedIds.formUnion(queuedJobs.filter { $0.interactionId != targetInteractionId }.compactMap { $0.id })
        rejectedIds.formUnion(runningInfo.filter { _, info -> Bool in info.interactionId != targetInteractionId }.keys)
    }

    // Reject any which don't have the matching details (if provided)
    if let targetDetailsData: Data = detailsData {
        rejectedIds.formUnion(queuedJobs.filter { $0.details != targetDetailsData }.compactMap { $0.id })
        rejectedIds.formUnion(runningInfo.filter { _, info -> Bool in info.detailsData != targetDetailsData }.keys)
    }

    return !candidateIds.subtracting(rejectedIds).isEmpty
}
/// Removes every pending job which has the given id (running jobs are unaffected)
fileprivate func removePendingJob(_ jobId: Int64) {
    pendingJobsQueue.mutate { pending in
        pending.removeAll(where: { $0.id == jobId })
    }
}
// MARK: - Job Running
/// Starts the queue: loads any pending jobs from the database which aren't already running or
/// queued, appends them to the pending jobs and runs the next job.
///
/// The call is ignored when the `canStart` closure doesn't return `true`, or when the queue is already
/// running and `forceWhenAlreadyRunning` is `false`. When called from anywhere other than the
/// queue's own dispatch queue the call re-dispatches itself onto that queue (preserving the
/// `forceWhenAlreadyRunning` value, previously the flag was dropped by the re-dispatch which
/// silently turned a forced start into a normal one).
///
/// - Parameters:
///   - forceWhenAlreadyRunning: When `true` pending jobs are loaded even if the queue is running.
///   - dependencies: The dependencies used for storage access, dispatching and running jobs.
fileprivate func start(
    forceWhenAlreadyRunning: Bool = false,
    using dependencies: Dependencies
) {
    // Only start if the JobRunner is allowed to start the queue
    guard canStart?(self) == true else { return }
    guard forceWhenAlreadyRunning || !isRunning.wrappedValue else { return }

    // The JobRunner runs synchronously so we need to ensure this doesn't start on the main
    // thread (if we aren't on this queue's thread then swap to it)
    guard DispatchQueue.with(key: queueKey, matches: queueContext, using: dependencies) else {
        internalQueue.async(using: dependencies) { [weak self] in
            self?.start(forceWhenAlreadyRunning: forceWhenAlreadyRunning, using: dependencies)
        }
        return
    }

    // Flag the JobRunner as running (to prevent something else from trying to start it
    // and messing with the execution behaviour)
    var wasAlreadyRunning: Bool = false
    isRunning.mutate { isRunning in
        wasAlreadyRunning = isRunning
        isRunning = true
    }
    hasStartedAtLeastOnce.mutate { $0 = true }

    // Get any pending jobs
    let jobVariants: [Job.Variant] = self.jobVariants
    let jobIdsAlreadyRunning: Set<Int64> = currentlyRunningJobIds.wrappedValue
    let jobsAlreadyInQueue: Set<Int64> = pendingJobsQueue.wrappedValue.compactMap { $0.id }.asSet()
    let jobsToRun: [Job] = dependencies[singleton: .storage].read(using: dependencies) { db in
        try Job
            .filterPendingJobs(
                variants: jobVariants,
                excludeFutureJobs: true,
                includeJobsWithDependencies: false
            )
            .filter(!jobIdsAlreadyRunning.contains(Job.Columns.id)) // Exclude jobs already running
            .filter(!jobsAlreadyInQueue.contains(Job.Columns.id))   // Exclude jobs already in the queue
            .fetchAll(db)
    }
    .defaulting(to: [])

    // Determine the number of jobs to run
    var jobCount: Int = 0
    pendingJobsQueue.mutate { queue in
        queue.append(contentsOf: jobsToRun)
        jobCount = queue.count
    }

    // If there are no pending jobs and nothing in the queue then schedule the JobRunner
    // to start again when the next scheduled job should start
    guard jobCount > 0 else {
        if jobIdsAlreadyRunning.isEmpty {
            isRunning.mutate { $0 = false }
            scheduleNextSoonestJob(using: dependencies)
        }
        return
    }

    // Run the first job in the pendingJobsQueue
    if !wasAlreadyRunning {
        SNLogNotTests(" [JobRunner] Starting \( queueContext ) with ( \( jobCount ) job \( jobCount != 1 ? " s " : " " ) ) ")
    }
    runNextJob(using: dependencies)
}
/// Flags the queue as no longer running and discards all pending jobs and deferral tracking data
fileprivate func stopAndClearPendingJobs() {
    isRunning.mutate { running in running = false }
    pendingJobsQueue.mutate { pending in pending.removeAll() }
    deferLoopTracker.mutate { tracker in tracker.removeAll() }
}
/// Takes the next job from the front of the pending jobs, validates it and hands it to its executor.
///
/// The job is failed (via `handleJobFailed`) when it has no registered executor, is missing a
/// required `threadId`/`interactionId`, has no `id` or is missing some of its dependencies. It is
/// deferred (via `handleJobDeferred`) when its `nextRunTimestamp` is in the future or when it still
/// has outstanding dependencies (which get enqueued first).
///
/// When there are no pending jobs the queue attempts to schedule the next soonest job instead.
///
/// - Parameter dependencies: The dependencies used for dispatching, storage access and job execution.
private func runNextJob ( using dependencies : Dependencies ) {
// Ensure the queue is running (if we've stopped the queue then we shouldn't start the next job)
guard isRunning . wrappedValue else { return }
// Ensure this is running on the correct queue (if not then re-dispatch onto the internal queue)
guard DispatchQueue . with ( key : queueKey , matches : queueContext , using : dependencies ) else {
internalQueue . async ( using : dependencies ) { [ weak self ] in
self ? . runNextJob ( using : dependencies )
}
return
}
// Pop the first pending job and capture how many jobs remain within the same 'mutate' call
guard let ( nextJob , numJobsRemaining ) : ( Job , Int ) = pendingJobsQueue . mutate ( { queue in queue . popFirst ( ) . map { ( $0 , queue . count ) } } ) else {
// If it's a serial queue, or there are no more jobs running then update the 'isRunning' flag
if executionType != . concurrent || currentlyRunningJobIds . wrappedValue . isEmpty {
isRunning . mutate { $0 = false }
}
// Always attempt to schedule the next soonest job (otherwise if enough jobs get started in rapid
// succession then pending/failed jobs in the database may never get re-started in a concurrent queue)
scheduleNextSoonestJob ( using : dependencies )
return
}
// A job can't be run without an executor registered for its variant (permanent failure)
guard let jobExecutor : JobExecutor . Type = executorMap . wrappedValue [ nextJob . variant ] else {
SNLog ( " [JobRunner] \( queueContext ) Unable to run \( nextJob . variant ) job due to missing executor " )
handleJobFailed (
nextJob ,
error : JobRunnerError . executorMissing ,
permanentFailure : true ,
using : dependencies
)
return
}
// The executor requires a threadId but the job doesn't have one (permanent failure)
guard ! jobExecutor . requiresThreadId || nextJob . threadId != nil else {
SNLog ( " [JobRunner] \( queueContext ) Unable to run \( nextJob . variant ) job due to missing required threadId " )
handleJobFailed (
nextJob ,
error : JobRunnerError . requiredThreadIdMissing ,
permanentFailure : true ,
using : dependencies
)
return
}
// The executor requires an interactionId but the job doesn't have one (permanent failure)
guard ! jobExecutor . requiresInteractionId || nextJob . interactionId != nil else {
SNLog ( " [JobRunner] \( queueContext ) Unable to run \( nextJob . variant ) job due to missing required interactionId " )
handleJobFailed (
nextJob ,
error : JobRunnerError . requiredInteractionIdMissing ,
permanentFailure : true ,
using : dependencies
)
return
}
// A job without an id is reported as a non-permanent failure
guard nextJob . id != nil else {
SNLog ( " [JobRunner] \( queueContext ) Unable to run \( nextJob . variant ) job due to missing id " )
handleJobFailed (
nextJob ,
error : JobRunnerError . jobIdMissing ,
permanentFailure : false ,
using : dependencies
)
return
}
// If the 'nextRunTimestamp' for the job is in the future then don't run it yet
guard nextJob . nextRunTimestamp <= dependencies . dateNow . timeIntervalSince1970 else {
handleJobDeferred ( nextJob , using : dependencies )
return
}
// Check if the next job has any dependencies (fetching both the number of dependency records
// and the jobs those records reference)
let dependencyInfo : ( expectedCount : Int , jobs : Set < Job > ) = dependencies [ singleton : . storage ] . read ( using : dependencies ) { db in
let expectedDependencies : Set < JobDependencies > = try JobDependencies
. filter ( JobDependencies . Columns . jobId = = nextJob . id )
. fetchSet ( db )
let jobDependencies : Set < Job > = try Job
. filter ( ids : expectedDependencies . compactMap { $0 . dependantId } )
. fetchSet ( db )
return ( expectedDependencies . count , jobDependencies )
}
. defaulting ( to : ( 0 , [ ] ) )
// If fewer jobs were found than there are dependency records then some dependencies are missing
// and the job is permanently failed
guard dependencyInfo . jobs . count = = dependencyInfo . expectedCount else {
SNLog ( " [JobRunner] \( queueContext ) Removing \( nextJob . variant ) job due to missing dependencies " )
handleJobFailed (
nextJob ,
error : JobRunnerError . missingDependencies ,
permanentFailure : true ,
using : dependencies
)
return
}
// If the job still has outstanding dependencies then they need to run first
guard dependencyInfo . jobs . isEmpty else {
SNLog ( " [JobRunner] \( queueContext ) Deferring \( nextJob . variant ) job until \( dependencyInfo . jobs . count ) dependenc \( dependencyInfo . jobs . count = = 1 ? " y " : " ies " ) are completed " )
// Enqueue the dependencies then defer the current job
dependencies [ singleton : . jobRunner ] . enqueueDependenciesIfNeeded (
Array ( dependencyInfo . jobs ) ,
using : dependencies
)
handleJobDeferred ( nextJob , using : dependencies )
return
}
// Update the state to indicate the particular job is running
//
// Note: We need to store 'numJobsRunning' in its own variable because
// the 'SNLog' seems to dispatch to its own queue which ends up getting
// blocked by the JobRunner's queue because 'jobQueue' is Atomic
var numJobsRunning : Int = 0
nextTrigger . mutate { trigger in
trigger ? . invalidate ( ) // Need to invalidate to prevent a memory leak
trigger = nil
}
currentlyRunningJobIds . mutate { currentlyRunningJobIds in
currentlyRunningJobIds = currentlyRunningJobIds . inserting ( nextJob . id )
numJobsRunning = currentlyRunningJobIds . count
}
currentlyRunningJobInfo . mutate { currentlyRunningJobInfo in
currentlyRunningJobInfo = currentlyRunningJobInfo . setting (
nextJob . id ,
JobRunner . JobInfo (
variant : nextJob . variant ,
threadId : nextJob . threadId ,
interactionId : nextJob . interactionId ,
detailsData : nextJob . details ,
uniqueHashValue : nextJob . uniqueHashValue
)
)
}
SNLog ( " [JobRunner] \( queueContext ) started \( nextJob . variant ) job ( \( executionType = = . concurrent ? " \( numJobsRunning ) currently running, " : " " ) \( numJobsRemaining ) remaining) " )
/// As it turns out Combine doesn't play too nicely with concurrent DispatchQueues, in Combine events are dispatched asynchronously to
/// the queue which means an odd situation can occasionally occur where the `finished` event can actually run before the `output`
/// event - this can result in unexpected behaviours (for more information see https://github.com/groue/GRDB.swift/issues/1334)
///
/// Due to this if a job is meant to run on a concurrent queue then we actually want to create a temporary serial queue just for the execution
/// of that job
let targetQueue : DispatchQueue = {
guard executionType = = . concurrent else { return internalQueue }
return DispatchQueue (
label : " \( self . queueContext ) -serial " ,
qos : self . qosClass ,
attributes : [ ] ,
autoreleaseFrequency : . inherit ,
target : nil
)
} ( )
// Hand the job to its executor, the result handlers are the queue's own handler methods
jobExecutor . run (
nextJob ,
queue : targetQueue ,
success : handleJobSucceeded ,
failure : handleJobFailed ,
deferred : handleJobDeferred ,
using : dependencies
)
// If this queue executes concurrently and there are still jobs remaining then immediately attempt
// to start the next job
if executionType = = . concurrent && numJobsRemaining > 0 {
internalQueue . async ( using : dependencies ) { [ weak self ] in
self ? . runNextJob ( using : dependencies )
}
}
}
/// Looks up the `nextRunTimestamp` of the next pending job in the database (excluding running jobs)
/// and either restarts the queue immediately (when the job is already due) or sets up a `Trigger`
/// to start the queue once the job becomes due.
///
/// When there is no such job, or the `canStart` closure doesn't return `true`, the `onQueueDrained`
/// callback is triggered instead (for a concurrent queue only once no jobs are running).
///
/// - Parameter dependencies: The dependencies used for storage access, the current time and dispatching.
private func scheduleNextSoonestJob ( using dependencies : Dependencies ) {
let jobVariants : [ Job . Variant ] = self . jobVariants
let jobIdsAlreadyRunning : Set < Int64 > = currentlyRunningJobIds . wrappedValue
// Fetch the 'nextRunTimestamp' of the first pending job returned by the query (future jobs are
// included, jobs with dependencies and running jobs are not)
let nextJobTimestamp : TimeInterval ? = dependencies [ singleton : . storage ] . read ( using : dependencies ) { db in
try Job
. filterPendingJobs (
variants : jobVariants ,
excludeFutureJobs : false ,
includeJobsWithDependencies : false
)
. select ( . nextRunTimestamp )
. filter ( ! jobIdsAlreadyRunning . contains ( Job . Columns . id ) ) // Exclude jobs already running
. asRequest ( of : TimeInterval . self )
. fetchOne ( db )
}
// If there are no remaining jobs or the JobRunner isn't allowed to start any queues then trigger
// the 'onQueueDrained' callback and stop
guard let nextJobTimestamp : TimeInterval = nextJobTimestamp , canStart ? ( self ) = = true else {
if executionType != . concurrent || currentlyRunningJobIds . wrappedValue . isEmpty {
self . onQueueDrained ? ( )
}
return
}
// If the next job isn't scheduled in the future then just restart the JobRunner immediately
let secondsUntilNextJob : TimeInterval = ( nextJobTimestamp - dependencies . dateNow . timeIntervalSince1970 )
guard secondsUntilNextJob > 0 else {
// Only log that the queue is getting restarted if this queue had actually been about to stop
if executionType != . concurrent || currentlyRunningJobIds . wrappedValue . isEmpty {
let timingString : String = ( nextJobTimestamp = = 0 ?
" that should be in the queue " :
" scheduled \( Int ( ceil ( abs ( secondsUntilNextJob ) ) ) ) second \( Int ( ceil ( abs ( secondsUntilNextJob ) ) ) = = 1 ? " " : " s " ) ago "
)
SNLog ( " [JobRunner] Restarting \( queueContext ) immediately for job \( timingString ) " )
}
// Trigger the 'start' function to load in any pending jobs that aren't already in the
// queue (for concurrent queues we want to force them to load in pending jobs and add
// them to the queue regardless of whether the queue is already running)
internalQueue . async ( using : dependencies ) { [ weak self ] in
self ? . start ( forceWhenAlreadyRunning : ( self ? . executionType = = . concurrent ) , using : dependencies )
}
return
}
// Only schedule a trigger if this queue has actually completed
guard executionType != . concurrent || currentlyRunningJobIds . wrappedValue . isEmpty else { return }
// Set up a trigger
SNLog ( " [JobRunner] Stopping \( queueContext ) until next job in \( Int ( ceil ( abs ( secondsUntilNextJob ) ) ) ) second \( Int ( ceil ( abs ( secondsUntilNextJob ) ) ) = = 1 ? " " : " s " ) " )
nextTrigger . mutate { trigger in
trigger ? . invalidate ( ) // Need to invalidate the old trigger to prevent a memory leak
trigger = Trigger . create ( queue : self , timestamp : nextJobTimestamp , using : dependencies )
}
}
// MARK: - Handling Results
/// This function is called when a job succeeds.
///
/// Depending on the job's `behaviour` the job is either deleted (along with the dependency records
/// which pointed at it) or has its `failureCount`/`nextRunTimestamp` reset. Afterwards any jobs which
/// were dependant on this one are enqueued, the job is cleaned up and the next job is started.
///
/// - Parameters:
///   - job: The job which succeeded.
///   - shouldStop: When `true` a `recurring` job is deleted instead of being left to run again.
///   - dependencies: The dependencies used for storage access, the current time and dispatching.
fileprivate func handleJobSucceeded (
_ job : Job ,
shouldStop : Bool ,
using dependencies : Dependencies
) {
/// Retrieve the dependant jobs first (the `JobDependencies` table has cascading deletion when the original `Job` is
/// removed so we need to retrieve these records before that happens)
let dependantJobs : [ Job ] = dependencies [ singleton : . storage ]
. read ( using : dependencies ) { db in try job . dependantJobs . fetchAll ( db ) }
. defaulting ( to : [ ] )
switch job . behaviour {
case . runOnce , . runOnceNextLaunch :
dependencies [ singleton : . storage ] . write ( using : dependencies ) { db in
/// Since this job has been completed we can update the dependencies so other jobs that were dependant
/// on this one can be run
_ = try JobDependencies
. filter ( JobDependencies . Columns . dependantId = = job . id )
. deleteAll ( db )
_ = try job . delete ( db )
}
case . recurring where shouldStop = = true :
dependencies [ singleton : . storage ] . write ( using : dependencies ) { db in
/// Since this job has been completed we can update the dependencies so other jobs that were dependant
/// on this one can be run
_ = try JobDependencies
. filter ( JobDependencies . Columns . dependantId = = job . id )
. deleteAll ( db )
_ = try job . delete ( db )
}
/// For `recurring` jobs which have already run, they should automatically run again but we want at least 1 second
/// to pass before doing so - the job itself should really update its own `nextRunTimestamp` (this is just a safety net)
case . recurring where job . nextRunTimestamp <= dependencies . dateNow . timeIntervalSince1970 :
guard let jobId : Int64 = job . id else { break }
dependencies [ singleton : . storage ] . write ( using : dependencies ) { db in
_ = try Job
. filter ( id : jobId )
. updateAll (
db ,
Job . Columns . failureCount . set ( to : 0 ) ,
Job . Columns . nextRunTimestamp . set ( to : ( dependencies . dateNow . timeIntervalSince1970 + 1 ) )
)
}
/// For `recurringOnLaunch/Active` jobs which have already run but failed once, we need to clear their
/// `failureCount` and `nextRunTimestamp` to prevent them from endlessly running over and over again
case . recurringOnLaunch , . recurringOnActive :
guard
let jobId : Int64 = job . id ,
job . failureCount != 0 &&
job . nextRunTimestamp > TimeInterval . leastNonzeroMagnitude
else { break }
dependencies [ singleton : . storage ] . write ( using : dependencies ) { db in
_ = try Job
. filter ( id : jobId )
. updateAll (
db ,
Job . Columns . failureCount . set ( to : 0 ) ,
Job . Columns . nextRunTimestamp . set ( to : 0 )
)
}
/// Any other combination (eg. a `recurring` job whose `nextRunTimestamp` is already in the future) needs no database changes
default : break
}
/// Now that the job has been completed we want to enqueue any jobs that were dependant on it
dependencies [ singleton : . jobRunner ] . enqueueDependenciesIfNeeded (
dependantJobs ,
using : dependencies
)
// Perform job cleanup and start the next job
performCleanUp ( for : job , result : . succeeded , using : dependencies )
internalQueue . async ( using : dependencies ) { [ weak self ] in
self ? . runNextJob ( using : dependencies )
}
}
/// This function is called when a job fails, if it wasn't a permanent failure then the 'failureCount' for the job will be incremented
/// and it'll be re-run after a retry interval has passed
///
/// The behaviour depends on the state of the job and the queue:
/// - If the job no longer exists in the database it is treated as cancelled (clean up only)
/// - If this is the blocking queue and the job should block then the job is put back at the front of the pending queue so it
///   gets re-run immediately (unless the failure was a possible deferral loop, in which case it is not re-added)
/// - If the failure is permanent, or the updated failure count exceeds the executor's `maxFailureCount`, the job and all of its
///   dependant jobs are deleted
/// - Otherwise the job (and its dependant jobs) get an updated `failureCount` and `nextRunTimestamp`
///
/// - Parameters:
///   - job: The job which failed
///   - error: The error (if any) which caused the failure, this is passed through to the job callbacks
///   - permanentFailure: Flag indicating the job should not be retried (ignored for "blocking" jobs on the blocking queue)
///   - dependencies: The dependencies used to access storage and the current time
fileprivate func handleJobFailed(
    _ job: Job,
    error: Error?,
    permanentFailure: Bool,
    using dependencies: Dependencies
) {
    // If the job no longer exists in the database then there is nothing to update, just clean up and
    // move on to the next job
    guard dependencies[singleton: .storage].read(using: dependencies, { db in try Job.exists(db, id: job.id ?? -1) }) == true else {
        SNLog("[JobRunner] \(queueContext) \(job.variant) job canceled")
        performCleanUp(for: job, result: .failed(error, permanentFailure), using: dependencies)

        internalQueue.async(using: dependencies) { [weak self] in
            self?.runNextJob(using: dependencies)
        }
        return
    }

    // If this is the blocking queue and a "blocking" job failed then rerun it
    // immediately (in this case we don't trigger any job callbacks because the
    // job isn't actually done, it's going to try again immediately)
    if self.type == .blocking && job.shouldBlock {
        // If it was a possible deferral loop then we don't actually want to
        // retry the job (even if it's a blocking one, this gives a small chance
        // that the app could continue to function)
        let wasPossibleDeferralLoop: Bool = {
            if let error = error, case JobRunnerError.possibleDeferralLoop = error { return true }

            return false
        }()

        // Only log that the job is being retried when it will actually be added back to the queue
        if wasPossibleDeferralLoop {
            SNLog("[JobRunner] \(queueContext) \(job.variant) job failed; possible deferral loop detected so it won't be retried")
        }
        else {
            SNLog("[JobRunner] \(queueContext) \(job.variant) job failed; retrying immediately")
        }

        performCleanUp(
            for: job,
            result: .failed(error, permanentFailure),
            shouldTriggerCallbacks: wasPossibleDeferralLoop,
            using: dependencies
        )

        // Only add it back to the queue if it wasn't a deferral loop
        if !wasPossibleDeferralLoop {
            pendingJobsQueue.mutate { $0.insert(job, at: 0) }
        }

        internalQueue.async(using: dependencies) { [weak self] in
            self?.runNextJob(using: dependencies)
        }
        return
    }

    // Get the max failure count for the job (a value of '-1' means it will retry indefinitely)
    let maxFailureCount: Int = (executorMap.wrappedValue[job.variant]?.maxFailureCount ?? 0)
    let nextRunTimestamp: TimeInterval = (dependencies.dateNow.timeIntervalSince1970 + JobRunner.getRetryInterval(for: job))
    var dependantJobIds: [Int64] = []
    var failureText: String = "failed"

    dependencies[singleton: .storage].write(using: dependencies) { db in
        /// Retrieve a list of dependant jobs so we can clear them from the queue
        dependantJobIds = try job.dependantJobs
            .select(.id)
            .asRequest(of: Int64.self)
            .fetchAll(db)

        /// Delete/update the failed jobs and any dependencies
        let updatedFailureCount: UInt = (job.failureCount + 1)

        guard
            !permanentFailure && (
                maxFailureCount < 0 ||
                updatedFailureCount <= maxFailureCount
            )
        else {
            failureText = (maxFailureCount >= 0 && updatedFailureCount > maxFailureCount ?
                "failed permanently; too many retries" :
                "failed permanently"
            )

            // If the job permanently failed or we have performed all of our retry attempts
            // then delete the job and all of its dependant jobs (it'll probably never succeed)
            _ = try job.dependantJobs
                .deleteAll(db)

            _ = try job.delete(db)
            return
        }

        failureText = "failed; scheduling retry (failure count is \(updatedFailureCount))"

        try job
            .with(
                failureCount: updatedFailureCount,
                nextRunTimestamp: nextRunTimestamp
            )
            .upserted(db)

        // Update the failureCount and nextRunTimestamp on dependant jobs as well (update the
        // 'nextRunTimestamp' value to be 1ms later so when the queue gets regenerated they'll
        // come after the dependency)
        try job.dependantJobs
            .updateAll(
                db,
                Job.Columns.failureCount.set(to: updatedFailureCount),
                Job.Columns.nextRunTimestamp.set(to: (nextRunTimestamp + (1 / 1000)))
            )
    }

    /// Remove any dependant jobs from the queue (shouldn't be in there but filter the queue just in case so we don't try
    /// to run a deleted job or get stuck in a loop of trying to run dependencies indefinitely)
    if !dependantJobIds.isEmpty {
        // Use a `Set` so the membership check doesn't scan the full id list for every queued job
        let dependantJobIdSet: Set<Int64> = Set(dependantJobIds)

        pendingJobsQueue.mutate { queue in
            queue = queue.filter { !dependantJobIdSet.contains($0.id ?? -1) }
        }
    }

    SNLog("[JobRunner] \(queueContext) \(job.variant) job \(failureText)")
    performCleanUp(for: job, result: .failed(error, permanentFailure), using: dependencies)

    internalQueue.async(using: dependencies) { [weak self] in
        self?.runNextJob(using: dependencies)
    }
}
/// Invoked when a job finishes without either succeeding or failing (this should only occur if the job has specific logic
/// which makes it dependant on other jobs, and it should automatically manage those dependencies itself)
///
/// Every deferral is recorded in the in-memory `deferLoopTracker`; when a job appears to be deferring itself too quickly the
/// tracked record is cleared and the job is treated as failed with `JobRunnerError.possibleDeferralLoop` (non-permanent)
/// instead of being deferred again
///
/// - Parameters:
///   - job: The job which was deferred
///   - dependencies: The dependencies used to retrieve the current time and passed on to the follow-up calls
fileprivate func handleJobDeferred(
    _ job: Job,
    using dependencies: Dependencies
) {
    // It's possible (by introducing bugs) to end up with a Job which runs, immediately defers itself and
    // then runs again (an infinite loop); since this happens on a background thread it won't block the app
    // but it can result in 100% of a CPU being used (and a battery drain), so keep track of how often each
    // job has been deferred in order to detect that situation
    var isStuckInDeferLoop: Bool = false

    deferLoopTracker.mutate { tracker in
        if let previous: (count: Int, times: [TimeInterval]) = tracker[job.id] {
            let now: TimeInterval = dependencies.dateNow.timeIntervalSince1970
            let reachedThreshold: Bool = (previous.count >= JobQueue.deferralLoopThreshold)

            // NOTE(review): `count` keeps increasing while only the most recent timestamps are retained, so
            // the time window used below grows with every deferral of the job - confirm this is intended
            isStuckInDeferLoop = (
                reachedThreshold &&
                (now - previous.times[0]) < CGFloat(previous.count * maxDeferralsPerSecond)
            )

            // Only keep the most recent 'deferralLoopThreshold' timestamps (including this one)
            let recentTimes: [TimeInterval] = previous.times.suffix(JobQueue.deferralLoopThreshold - 1) + [now]
            tracker = tracker.setting(job.id, (previous.count + 1, recentTimes))
        }
        else {
            // First deferral we have seen for this job so just start tracking it
            tracker = tracker.setting(
                job.id,
                (1, [dependencies.dateNow.timeIntervalSince1970])
            )
        }
    }

    // When a possible loop is detected stop tracking the job and fail it rather than deferring it again
    if isStuckInDeferLoop {
        deferLoopTracker.mutate { $0 = $0.removingValue(forKey: job.id) }
        handleJobFailed(
            job,
            error: JobRunnerError.possibleDeferralLoop,
            permanentFailure: false,
            using: dependencies
        )
        return
    }

    // Otherwise clean up the deferred job and move on to the next one
    performCleanUp(for: job, result: .deferred, using: dependencies)

    internalQueue.async(using: dependencies) { [weak self] in
        self?.runNextJob(using: dependencies)
    }
}
/// Clears the in-memory "currently running" state for the given job and (optionally) triggers any callbacks which
/// were registered for it
///
/// - Parameters:
///   - job: The job to clean up after
///   - result: The result which gets passed to each of the registered job callbacks
///   - shouldTriggerCallbacks: When `false` the registered callbacks are left untouched and are not called
///   - dependencies: The dependencies passed through when dispatching the callbacks
fileprivate func performCleanUp(
    for job: Job,
    result: JobRunner.JobResult,
    shouldTriggerCallbacks: Bool = true,
    using dependencies: Dependencies
) {
    // Jobs get removed from the pending queue before they run, so the only state left to clear
    // is the 'currentlyRunning' id set and the associated job info
    currentlyRunningJobIds.mutate { runningIds in
        runningIds = runningIds.removing(job.id)
    }
    currentlyRunningJobInfo.mutate { runningInfo in
        runningInfo = runningInfo.removingValue(forKey: job.id)
    }

    if !shouldTriggerCallbacks { return }

    // Take the callbacks registered for this job (removing them from the store in the process)
    var pendingCallbacks: [(JobRunner.JobResult) -> ()] = []
    jobCallbacks.mutate { registeredCallbacks in
        pendingCallbacks = (registeredCallbacks[job.id] ?? [])
        registeredCallbacks = registeredCallbacks.removingValue(forKey: job.id)
    }

    // Now that the job is done run its callbacks on a global queue
    DispatchQueue.global(qos: .default).async(using: dependencies) {
        for callback in pendingCallbacks {
            callback(result)
        }
    }
}
}
// MARK: - F o r m a t t i n g
/// Custom string interpolation for optional job values so that a missing value is output as "unknown" (rather than
/// the default `Optional` description)
extension String.StringInterpolation {
    /// Appends the description of the given `Job.Variant`, or "unknown" when it is `nil`
    mutating func appendInterpolation(_ variant: Job.Variant?) {
        if let variant: Job.Variant = variant {
            appendLiteral("\(variant)") // stringlint:disable
        }
        else {
            appendLiteral("unknown") // stringlint:disable
        }
    }

    /// Appends the description of the given `Job.Behaviour`, or "unknown" when it is `nil`
    mutating func appendInterpolation(_ behaviour: Job.Behaviour?) {
        if let behaviour: Job.Behaviour = behaviour {
            appendLiteral("\(behaviour)") // stringlint:disable
        }
        else {
            appendLiteral("unknown") // stringlint:disable
        }
    }
}