// C o p y r i g h t © 2 0 2 2 R a n g e p r o o f P t y L t d . A l l r i g h t s r e s e r v e d .
import Foundation
import GRDB
import SignalCoreKit
public protocol JobExecutor {
// / T h e m a x i m u m n u m b e r o f t i m e s t h e j o b c a n f a i l b e f o r e i t f a i l s p e r m a n e n t l y
// /
// / * * N o t e : * * A v a l u e o f ` - 1 ` m e a n s i t w i l l r e t r y i n d e f i n i t e l y
static var maxFailureCount : Int { get }
static var requiresThreadId : Bool { get }
static var requiresInteractionId : Bool { get }
// / T h i s m e t h o d c o n t a i n s t h e l o g i c n e e d e d t o c o m p l e t e a j o b
// /
// / * * N o t e : * * T h e c o d e i n t h i s m e t h o d s h o u l d r u n s y n c h r o n o u s l y a n d t h e v a r i o u s
// / " r e s u l t " b l o c k s s h o u l d n o t b e c a l l e d w i t h i n a d a t a b a s e c l o s u r e
// /
// / - P a r a m e t e r s :
// / - j o b : T h e j o b w h i c h i s b e i n g r u n
// / - s u c c e s s : T h e c l o s u r e w h i c h i s c a l l e d w h e n t h e j o b s u c c e e d s ( w i t h a n
// / u p d a t e d ` j o b ` a n d a f l a g i n d i c a t i n g w h e t h e r t h e j o b s h o u l d f o r c i b l y s t o p r u n n i n g )
// / - f a i l u r e : T h e c l o s u r e w h i c h i s c a l l e d w h e n t h e j o b f a i l s ( w i t h a n u p d a t e d
// / ` j o b ` , a n ` E r r o r ` ( i f a p p l i c a b l e ) a n d a f l a g i n d i c a t i n g w h e t h e r i t w a s a p e r m a n e n t
// / f a i l u r e )
// / - d e f e r r e d : T h e c l o s u r e w h i c h i s c a l l e d w h e n t h e j o b i s d e f e r r e d ( w i t h a n
// / u p d a t e d ` j o b ` )
static func run (
_ job : Job ,
queue : DispatchQueue ,
success : @ escaping ( Job , Bool ) -> ( ) ,
failure : @ escaping ( Job , Error ? , Bool ) -> ( ) ,
deferred : @ escaping ( Job ) -> ( )
)
}
public final class JobRunner {
private static let blockingQueue : Atomic < JobQueue ? > = Atomic (
JobQueue (
type : . blocking ,
qos : . default ,
jobVariants : [ ] ,
onQueueDrained : {
// O n c e a l l b l o c k i n g j o b s h a v e b e e n c o m p l e t e d w e w a n t t o s t a r t r u n n i n g
// t h e r e m a i n i n g j o b q u e u e s
queues . wrappedValue . forEach { _ , queue in queue . start ( ) }
}
)
)
private static let queues : Atomic < [ Job . Variant : JobQueue ] > = {
var jobVariants : Set < Job . Variant > = Job . Variant . allCases . asSet ( )
let messageSendQueue : JobQueue = JobQueue (
type : . messageSend ,
executionType : . concurrent , // A l l o w a s m a n y j o b s t o r u n a t o n c e a s s u p p o r t e d b y t h e d e v i c e
qos : . default ,
jobVariants : [
jobVariants . remove ( . attachmentUpload ) ,
jobVariants . remove ( . messageSend ) ,
jobVariants . remove ( . notifyPushServer ) ,
jobVariants . remove ( . sendReadReceipts )
] . compactMap { $0 }
)
let messageReceiveQueue : JobQueue = JobQueue (
type : . messageReceive ,
// E x p l i c i t l y s e r i a l a s e x e c u t i n g c o n c u r r e n t l y m e a n s m e s s a g e r e c e i v e s g e t t i n g p r o c e s s e d a t
// d i f f e r e n t s p e e d s w h i c h c a n r e s u l t i n :
// • S m a l l b a t c h e s o f m e s s a g e s a p p e a r i n g i n t h e U I b e f o r e l a r g e r b a t c h e s
// • C l o s e d g r o u p m e s s a g e s e n c r y p t e d w i t h u p d a t e d k e y s c o u l d s t a r t p a r s i n g b e f o r e i t ' s k e y
// u p d a t e m e s s a g e h a s b e e n p r o c e s s e d ( i e . g u a r a n t e e d t o f a i l )
executionType : . serial ,
qos : . default ,
jobVariants : [
jobVariants . remove ( . messageReceive )
] . compactMap { $0 }
)
let attachmentDownloadQueue : JobQueue = JobQueue (
type : . attachmentDownload ,
qos : . utility ,
jobVariants : [
jobVariants . remove ( . attachmentDownload )
] . compactMap { $0 }
)
let generalQueue : JobQueue = JobQueue (
type : . general ( number : 0 ) ,
qos : . utility ,
jobVariants : Array ( jobVariants )
)
return Atomic ( [
messageSendQueue ,
messageReceiveQueue ,
attachmentDownloadQueue ,
generalQueue
] . reduce ( into : [ : ] ) { prev , next in
next . jobVariants . forEach { variant in
prev [ variant ] = next
}
} )
} ( )
internal static var executorMap : Atomic < [ Job . Variant : JobExecutor . Type ] > = Atomic ( [ : ] )
fileprivate static var perSessionJobsCompleted : Atomic < Set < Int64 > > = Atomic ( [ ] )
private static var hasCompletedInitialBecomeActive : Atomic < Bool > = Atomic ( false )
// MARK: - C o n f i g u r a t i o n
public static func add ( executor : JobExecutor . Type , for variant : Job . Variant ) {
executorMap . mutate { $0 [ variant ] = executor }
}
// MARK: - E x e c u t i o n
// / A d d a j o b o n t o t h e q u e u e , i f t h e q u e u e i s n ' t c u r r e n t l y r u n n i n g a n d ' c a n S t a r t J o b ' i s t r u e t h e n t h i s w i l l s t a r t
// / t h e J o b R u n n e r
// /
// / * * N o t e : * * I f t h e j o b h a s a ` b e h a v i o u r ` o f ` r u n O n c e N e x t L a u n c h ` o r t h e ` n e x t R u n T i m e s t a m p `
// / i s i n t h e f u t u r e t h e n t h e j o b w o n ' t b e s t a r t e d
public static func add ( _ db : Database , job : Job ? , canStartJob : Bool = true ) {
// S t o r e t h e j o b i n t o t h e d a t a b a s e ( g e t t i n g a n i d f o r i t )
guard let updatedJob : Job = try ? job ? . inserted ( db ) else {
SNLog ( " [JobRunner] Unable to add \( job . map { " \( $0 . variant ) " } ? ? " unknown " ) job " )
return
}
queues . mutate { $0 [ updatedJob . variant ] ? . add ( updatedJob , canStartJob : canStartJob ) }
// S t a r t t h e j o b r u n n e r i f n e e d e d
db . afterNextTransactionCommit { _ in
queues . wrappedValue [ updatedJob . variant ] ? . start ( )
}
}
// / U p s e r t a j o b o n t o t h e q u e u e , i f t h e q u e u e i s n ' t c u r r e n t l y r u n n i n g a n d ' c a n S t a r t J o b ' i s t r u e t h e n t h i s w i l l s t a r t
// / t h e J o b R u n n e r
// /
// / * * N o t e : * * I f t h e j o b h a s a ` b e h a v i o u r ` o f ` r u n O n c e N e x t L a u n c h ` o r t h e ` n e x t R u n T i m e s t a m p `
// / i s i n t h e f u t u r e t h e n t h e j o b w o n ' t b e s t a r t e d
public static func upsert ( _ db : Database , job : Job ? , canStartJob : Bool = true ) {
guard let job : Job = job else { return } // I g n o r e n u l l j o b s
queues . wrappedValue [ job . variant ] ? . upsert ( job , canStartJob : canStartJob )
// S t a r t t h e j o b r u n n e r i f n e e d e d
db . afterNextTransactionCommit { _ in
queues . wrappedValue [ job . variant ] ? . start ( )
}
}
@ discardableResult public static func insert ( _ db : Database , job : Job ? , before otherJob : Job ) -> Job ? {
switch job ? . behaviour {
case . recurringOnActive , . recurringOnLaunch , . runOnceNextLaunch :
SNLog ( " [JobRunner] Attempted to insert \( job . map { " \( $0 . variant ) " } ? ? " unknown " ) job before the current one even though it's behaviour is \( job . map { " \( $0 . behaviour ) " } ? ? " unknown " ) " )
return nil
default : break
}
// S t o r e t h e j o b i n t o t h e d a t a b a s e ( g e t t i n g a n i d f o r i t )
guard let updatedJob : Job = try ? job ? . inserted ( db ) else {
SNLog ( " [JobRunner] Unable to add \( job . map { " \( $0 . variant ) " } ? ? " unknown " ) job " )
return nil
}
queues . wrappedValue [ updatedJob . variant ] ? . insert ( updatedJob , before : otherJob )
// S t a r t t h e j o b r u n n e r i f n e e d e d
db . afterNextTransactionCommit { _ in
queues . wrappedValue [ updatedJob . variant ] ? . start ( )
}
return updatedJob
}
public static func appDidFinishLaunching ( ) {
// N o t e : ' a p p D i d B e c o m e A c t i v e ' w i l l r u n o n f i r s t l a u n c h a n y w a y s o w e c a n
// l e a v e t h o s e j o b s o u t a n d c a n w a i t u n t i l t h e n t o s t a r t t h e J o b R u n n e r
let jobsToRun : ( blocking : [ Job ] , nonBlocking : [ Job ] ) = Storage . shared
. read { db in
let blockingJobs : [ Job ] = try Job
. filter (
[
Job . Behaviour . recurringOnLaunch ,
Job . Behaviour . runOnceNextLaunch
] . contains ( Job . Columns . behaviour )
)
. filter ( Job . Columns . shouldBlock = = true )
. order ( Job . Columns . id )
. fetchAll ( db )
let nonblockingJobs : [ Job ] = try Job
. filter (
[
Job . Behaviour . recurringOnLaunch ,
Job . Behaviour . runOnceNextLaunch
] . contains ( Job . Columns . behaviour )
)
. filter ( Job . Columns . shouldBlock = = false )
. order ( Job . Columns . id )
. fetchAll ( db )
return ( blockingJobs , nonblockingJobs )
}
. defaulting ( to : ( [ ] , [ ] ) )
guard ! jobsToRun . blocking . isEmpty || ! jobsToRun . nonBlocking . isEmpty else { return }
// A d d a n d s t a r t a n y b l o c k i n g j o b s
blockingQueue . wrappedValue ? . appDidFinishLaunching ( with : jobsToRun . blocking , canStart : true )
// A d d a n y n o n - b l o c k i n g j o b s ( w e d o n ' t s t a r t t h e s e i n c a s e t h e r e a r e b l o c k i n g " o n a c t i v e "
// j o b s a s w e l l )
let jobsByVariant : [ Job . Variant : [ Job ] ] = jobsToRun . nonBlocking . grouped ( by : \ . variant )
let jobQueues : [ Job . Variant : JobQueue ] = queues . wrappedValue
jobsByVariant . forEach { variant , jobs in
jobQueues [ variant ] ? . appDidFinishLaunching ( with : jobs , canStart : false )
}
}
public static func appDidBecomeActive ( ) {
let hasCompletedInitialBecomeActive : Bool = JobRunner . hasCompletedInitialBecomeActive . wrappedValue
let jobsToRun : [ Job ] = Storage . shared
. read { db in
return try Job
. filter ( Job . Columns . behaviour = = Job . Behaviour . recurringOnActive )
. order ( Job . Columns . id )
. fetchAll ( db )
}
. defaulting ( to : [ ] )
. filter { hasCompletedInitialBecomeActive || ! $0 . shouldSkipLaunchBecomeActive }
// S t o r e t h e c u r r e n t q u e u e s t a t e l o c a l l y t o a v o i d m u l t i p l e a t o m i c r e t r i e v a l s
let jobQueues : [ Job . Variant : JobQueue ] = queues . wrappedValue
let blockingQueueIsRunning : Bool = ( blockingQueue . wrappedValue ? . isRunning . wrappedValue = = true )
guard ! jobsToRun . isEmpty else {
if ! blockingQueueIsRunning {
jobQueues . forEach { _ , queue in queue . start ( ) }
}
return
}
// A d d a n d s t a r t a n y n o n - b l o c k i n g j o b s ( i f t h e r e a r e n o b l o c k i n g j o b s )
let jobsByVariant : [ Job . Variant : [ Job ] ] = jobsToRun . grouped ( by : \ . variant )
jobQueues . forEach { variant , queue in
queue . appDidBecomeActive (
with : ( jobsByVariant [ variant ] ? ? [ ] ) ,
canStart : ! blockingQueueIsRunning
)
}
JobRunner . hasCompletedInitialBecomeActive . mutate { $0 = true }
}
public static func isCurrentlyRunning ( _ job : Job ? ) -> Bool {
guard let job : Job = job , let jobId : Int64 = job . id else { return false }
return ( queues . wrappedValue [ job . variant ] ? . isCurrentlyRunning ( jobId ) = = true )
}
public static func defailsForCurrentlyRunningJobs ( of variant : Job . Variant ) -> [ Int64 : Data ? ] {
return ( queues . wrappedValue [ variant ] ? . detailsForAllCurrentlyRunningJobs ( ) )
. defaulting ( to : [ : ] )
}
public static func hasPendingOrRunningJob < T : Encodable > ( with variant : Job . Variant , details : T ) -> Bool {
guard let targetQueue : JobQueue = queues . wrappedValue [ variant ] else { return false }
guard let detailsData : Data = try ? JSONEncoder ( ) . encode ( details ) else { return false }
return targetQueue . hasPendingOrRunningJob ( with : detailsData )
}
// MARK: - C o n v e n i e n c e
fileprivate static func getRetryInterval ( for job : Job ) -> TimeInterval {
// A r b i t r a r y b a c k o f f f a c t o r . . .
// t r y 1 d e l a y : 0 . 5 s
// t r y 2 d e l a y : 1 s
// . . .
// t r y 5 d e l a y : 1 6 s
// . . .
// t r y 1 1 d e l a y : 5 1 2 s
let maxBackoff : Double = 10 * 60 // 1 0 m i n u t e s
return 0.25 * min ( maxBackoff , pow ( 2 , Double ( job . failureCount ) ) )
}
}
// MARK: - J o b Q u e u e
private final class JobQueue {
fileprivate enum QueueType : Hashable {
case blocking
case general ( number : Int )
case messageSend
case messageReceive
case attachmentDownload
var name : String {
switch self {
case . blocking : return " Blocking "
case . general ( let number ) : return " General- \( number ) "
case . messageSend : return " MessageSend "
case . messageReceive : return " MessageReceive "
case . attachmentDownload : return " AttachmentDownload "
}
}
}
fileprivate enum ExecutionType {
// / A s e r i a l q u e u e w i l l e x e c u t e o n e j o b a t a t i m e u n t i l t h e q u e u e i s e m p t y , t h e n w i l l l o a d a n y n e w / d e f e r r e d
// / j o b s a n d r u n t h o s e o n e a t a t i m e
case serial
// / A c o n c u r r e n t q u e u e w i l l e x e c u t e a s m a n y j o b s a s t h e d e v i c e s u p p o r t s a t o n c e u n t i l t h e q u e u e i s e m p t y ,
// / t h e n w i l l l o a d a n y n e w / d e f e r r e d j o b s a n d t r y t o s t a r t t h e m a l l
case concurrent
}
private class Trigger {
private var timer : Timer ?
fileprivate var fireTimestamp : TimeInterval = 0
static func create ( queue : JobQueue , timestamp : TimeInterval ) -> Trigger ? {
// / S e t u p t h e t r i g g e r ( w a i t a t l e a s t 1 s e c o n d b e f o r e t r i g g e r i n g )
// /
// / * * N o t e : * * W e u s e t h e ` T i m e r . s c h e d u l e d T i m e r O n M a i n T h r e a d ` m e t h o d b e c a u s e r u n n i n g a t i m e r
// / o n o u r r a n d o m q u e u e t h r e a d s r e s u l t s i n t h e t i m e r n e v e r f i r i n g , t h e ` s t a r t ` m e t h o d w i l l r e d i r e c t i t s e l f t o
// / t h e c o r r e c t t h r e a d
let trigger : Trigger = Trigger ( )
trigger . fireTimestamp = max ( 1 , ( timestamp - Date ( ) . timeIntervalSince1970 ) )
trigger . timer = Timer . scheduledTimerOnMainThread (
withTimeInterval : trigger . fireTimestamp ,
repeats : false ,
block : { [ weak queue ] _ in
queue ? . start ( )
}
)
return trigger
}
func invalidate ( ) {
// N e e d t o d o t h i s t o p r e v e n t a s t r o n g r e f e r e n c e c y c l e
timer ? . invalidate ( )
timer = nil
}
}
private let type : QueueType
private let executionType : ExecutionType
private let qosClass : DispatchQoS
private let queueKey : DispatchSpecificKey = DispatchSpecificKey < String > ( )
private let queueContext : String
// / T h e s p e c i f i c t y p e s o f j o b s t h i s q u e u e m a n a g e s , i f t h i s i s l e f t e m p t y i t w i l l h a n d l e a l l j o b s n o t h a n d l e d b y o t h e r q u e u e s
fileprivate let jobVariants : [ Job . Variant ]
private let onQueueDrained : ( ( ) -> ( ) ) ?
private lazy var internalQueue : DispatchQueue = {
let result : DispatchQueue = DispatchQueue (
label : self . queueContext ,
qos : self . qosClass ,
attributes : ( self . executionType = = . concurrent ? [ . concurrent ] : [ ] ) ,
autoreleaseFrequency : . inherit ,
target : nil
)
result . setSpecific ( key : queueKey , value : queueContext )
return result
} ( )
private var nextTrigger : Atomic < Trigger ? > = Atomic ( nil )
fileprivate var isRunning : Atomic < Bool > = Atomic ( false )
private var queue : Atomic < [ Job ] > = Atomic ( [ ] )
private var jobsCurrentlyRunning : Atomic < Set < Int64 > > = Atomic ( [ ] )
private var detailsForCurrentlyRunningJobs : Atomic < [ Int64 : Data ? ] > = Atomic ( [ : ] )
fileprivate var hasPendingJobs : Bool { ! queue . wrappedValue . isEmpty }
// MARK: - I n i t i a l i z a t i o n
init (
type : QueueType ,
executionType : ExecutionType = . serial ,
qos : DispatchQoS ,
jobVariants : [ Job . Variant ] ,
onQueueDrained : ( ( ) -> ( ) ) ? = nil
) {
self . type = type
self . executionType = executionType
self . queueContext = " JobQueue- \( type . name ) "
self . qosClass = qos
self . jobVariants = jobVariants
self . onQueueDrained = onQueueDrained
}
// MARK: - E x e c u t i o n
fileprivate func add ( _ job : Job , canStartJob : Bool = true ) {
// C h e c k i f t h e j o b s h o u l d b e a d d e d t o t h e q u e u e
guard
canStartJob ,
job . behaviour != . runOnceNextLaunch ,
job . nextRunTimestamp <= Date ( ) . timeIntervalSince1970
else { return }
queue . mutate { $0 . append ( job ) }
}
// / U p s e r t a j o b o n t o t h e q u e u e , i f t h e q u e u e i s n ' t c u r r e n t l y r u n n i n g a n d ' c a n S t a r t J o b ' i s t r u e t h e n t h i s w i l l s t a r t
// / t h e J o b R u n n e r
// /
// / * * N o t e : * * I f t h e j o b h a s a ` b e h a v i o u r ` o f ` r u n O n c e N e x t L a u n c h ` o r t h e ` n e x t R u n T i m e s t a m p `
// / i s i n t h e f u t u r e t h e n t h e j o b w o n ' t b e s t a r t e d
fileprivate func upsert ( _ job : Job , canStartJob : Bool = true ) {
guard let jobId : Int64 = job . id else {
add ( job , canStartJob : canStartJob )
return
}
// L o c k t h e q u e u e w h i l e c h e c k i n g t h e i n d e x a n d i n s e r t i n g t o e n s u r e w e d o n ' t r u n i n t o
// a n y m u l t i - t h r e a d i n g s h e n a n i g a n s
//
// N o t e : c u r r e n t l y r u n n i n g j o b s a r e r e m o v e d f r o m t h e q u e u e s o w e d o n ' t n e e d t o c h e c k
// t h e ' j o b s C u r r e n t l y R u n n i n g ' s e t
var didUpdateExistingJob : Bool = false
queue . mutate { queue in
if let jobIndex : Array < Job > . Index = queue . firstIndex ( where : { $0 . id = = jobId } ) {
queue [ jobIndex ] = job
didUpdateExistingJob = true
}
}
// I f w e d i d n ' t u p d a t e a n e x i s t i n g j o b t h e n w e n e e d t o a d d i t t o t h e q u e u e
guard ! didUpdateExistingJob else { return }
add ( job , canStartJob : canStartJob )
}
fileprivate func insert ( _ job : Job , before otherJob : Job ) {
// I n s e r t t h e j o b b e f o r e t h e c u r r e n t j o b ( r e - a d d i n g t h e c u r r e n t j o b t o
// t h e s t a r t o f t h e q u e u e i f i t ' s n o t i n t h e r e ) - t h i s w i l l m e a n t h e n e w
// j o b w i l l r u n a n d t h e n t h e o t h e r J o b w i l l r u n ( o r r u n a g a i n ) o n c e i t ' s
// d o n e
queue . mutate {
guard let otherJobIndex : Int = $0 . firstIndex ( of : otherJob ) else {
$0 . insert ( contentsOf : [ job , otherJob ] , at : 0 )
return
}
$0 . insert ( job , at : otherJobIndex )
}
}
fileprivate func appDidFinishLaunching ( with jobs : [ Job ] , canStart : Bool ) {
queue . mutate { $0 . append ( contentsOf : jobs ) }
// S t a r t t h e j o b r u n n e r i f n e e d e d
if canStart && ! isRunning . wrappedValue {
start ( )
}
}
fileprivate func appDidBecomeActive ( with jobs : [ Job ] , canStart : Bool ) {
queue . mutate { queue in
// A v o i d r e - a d d i n g j o b s t o t h e q u e u e t h a t a r e a l r e a d y i n i t ( t h i s c a n
// h a p p e n i f t h e u s e r s e n d s t h e a p p t o t h e b a c k g r o u n d b e f o r e t h e ' o n A c t i v e '
// j o b s a n d t h e n b r i n g s i t b a c k t o t h e f o r e g r o u n d )
let jobsNotAlreadyInQueue : [ Job ] = jobs
. filter { job in ! queue . contains ( where : { $0 . id = = job . id } ) }
queue . append ( contentsOf : jobsNotAlreadyInQueue )
}
// S t a r t t h e j o b r u n n e r i f n e e d e d
if canStart && ! isRunning . wrappedValue {
start ( )
}
}
fileprivate func isCurrentlyRunning ( _ jobId : Int64 ) -> Bool {
return jobsCurrentlyRunning . wrappedValue . contains ( jobId )
}
fileprivate func detailsForAllCurrentlyRunningJobs ( ) -> [ Int64 : Data ? ] {
return detailsForCurrentlyRunningJobs . wrappedValue
}
fileprivate func hasPendingOrRunningJob ( with detailsData : Data ? ) -> Bool {
let pendingJobs : [ Job ] = queue . wrappedValue
return pendingJobs . contains { job in job . details = = detailsData }
}
// MARK: - J o b R u n n i n g
fileprivate func start ( force : Bool = false ) {
// W e o n l y w a n t t h e J o b R u n n e r t o r u n i n t h e m a i n a p p
guard CurrentAppContext ( ) . isMainApp else { return }
guard force || ! isRunning . wrappedValue else { return }
// T h e J o b R u n n e r r u n s s y n c h r o n o u s l y w e n e e d t o e n s u r e t h i s d o e s n ' t s t a r t
// o n t h e m a i n t h r e a d ( i f i t i s o n t h e m a i n t h r e a d t h e n s w a p t o a d i f f e r e n t t h r e a d )
guard DispatchQueue . getSpecific ( key : queueKey ) = = queueContext else {
internalQueue . async { [ weak self ] in
self ? . start ( )
}
return
}
// F l a g t h e J o b R u n n e r a s r u n n i n g ( t o p r e v e n t s o m e t h i n g e l s e f r o m t r y i n g t o s t a r t i t
// a n d m e s s i n g w i t h t h e e x e c u t i o n b e h a v i o u r )
var wasAlreadyRunning : Bool = false
isRunning . mutate { isRunning in
wasAlreadyRunning = isRunning
isRunning = true
}
// G e t a n y p e n d i n g j o b s
let jobIdsAlreadyRunning : Set < Int64 > = jobsCurrentlyRunning . wrappedValue
let jobsAlreadyInQueue : Set < Int64 > = queue . wrappedValue . compactMap { $0 . id } . asSet ( )
let jobsToRun : [ Job ] = Storage . shared . read { db in
try Job . filterPendingJobs ( variants : jobVariants )
. filter ( ! jobIdsAlreadyRunning . contains ( Job . Columns . id ) ) // E x c l u d e j o b s a l r e a d y r u n n i n g
. filter ( ! jobsAlreadyInQueue . contains ( Job . Columns . id ) ) // E x c l u d e j o b s a l r e a d y i n t h e q u e u e
. fetchAll ( db )
}
. defaulting ( to : [ ] )
// D e t e r m i n e t h e n u m b e r o f j o b s t o r u n
var jobCount : Int = 0
queue . mutate { queue in
queue . append ( contentsOf : jobsToRun )
jobCount = queue . count
}
// I f t h e r e a r e n o p e n d i n g j o b s a n d n o t h i n g i n t h e q u e u e t h e n s c h e d u l e t h e J o b R u n n e r
// t o s t a r t a g a i n w h e n t h e n e x t s c h e d u l e d j o b s h o u l d s t a r t
guard jobCount > 0 else {
if jobIdsAlreadyRunning . isEmpty {
isRunning . mutate { $0 = false }
scheduleNextSoonestJob ( )
}
return
}
// R u n t h e f i r s t j o b i n t h e q u e u e
if ! wasAlreadyRunning {
SNLog ( " [JobRunner] Starting \( queueContext ) with ( \( jobCount ) job \( jobCount != 1 ? " s " : " " ) ) " )
}
runNextJob ( )
}
private func runNextJob ( ) {
// E n s u r e t h i s i s r u n n i n g o n t h e c o r r e c t q u e u e
guard DispatchQueue . getSpecific ( key : queueKey ) = = queueContext else {
internalQueue . async { [ weak self ] in
self ? . runNextJob ( )
}
return
}
guard let ( nextJob , numJobsRemaining ) : ( Job , Int ) = queue . mutate ( { queue in queue . popFirst ( ) . map { ( $0 , queue . count ) } } ) else {
// I f i t ' s a s e r i a l q u e u e , o r t h e r e a r e n o m o r e j o b s r u n n i n g t h e n u p d a t e t h e ' i s R u n n i n g ' f l a g
if executionType != . concurrent || jobsCurrentlyRunning . wrappedValue . isEmpty {
isRunning . mutate { $0 = false }
}
// A l w a y s a t t e m p t t o s c h e d u l e t h e n e x t s o o n e s t j o b ( o t h e r w i s e i f e n o u g h j o b s g e t s t a r t e d i n r a p i d
// s u c c e s s i o n t h e n p e n d i n g / f a i l e d j o b s i n t h e d a t a b a s e m a y n e v e r g e t r e - s t a r t e d i n a c o n c u r r e n t q u e u e )
scheduleNextSoonestJob ( )
return
}
guard let jobExecutor : JobExecutor . Type = JobRunner . executorMap . wrappedValue [ nextJob . variant ] else {
SNLog ( " [JobRunner] \( queueContext ) Unable to run \( nextJob . variant ) job due to missing executor " )
handleJobFailed ( nextJob , error : JobRunnerError . executorMissing , permanentFailure : true )
return
}
guard ! jobExecutor . requiresThreadId || nextJob . threadId != nil else {
SNLog ( " [JobRunner] \( queueContext ) Unable to run \( nextJob . variant ) job due to missing required threadId " )
handleJobFailed ( nextJob , error : JobRunnerError . requiredThreadIdMissing , permanentFailure : true )
return
}
guard ! jobExecutor . requiresInteractionId || nextJob . interactionId != nil else {
SNLog ( " [JobRunner] \( queueContext ) Unable to run \( nextJob . variant ) job due to missing required interactionId " )
handleJobFailed ( nextJob , error : JobRunnerError . requiredInteractionIdMissing , permanentFailure : true )
return
}
// I f t h e ' n e x t R u n T i m e s t a m p ' f o r t h e j o b i s i n t h e f u t u r e t h e n d o n ' t r u n i t y e t
guard nextJob . nextRunTimestamp <= Date ( ) . timeIntervalSince1970 else {
handleJobDeferred ( nextJob )
return
}
// C h e c k i f t h e n e x t j o b h a s a n y d e p e n d e n c i e s
let dependencyInfo : ( expectedCount : Int , jobs : [ Job ] ) = Storage . shared . read { db in
let numExpectedDependencies : Int = try JobDependencies
. filter ( JobDependencies . Columns . jobId = = nextJob . id )
. fetchCount ( db )
let jobDependencies : [ Job ] = try nextJob . dependencies . fetchAll ( db )
return ( numExpectedDependencies , jobDependencies )
}
. defaulting ( to : ( 0 , [ ] ) )
guard dependencyInfo . jobs . count = = dependencyInfo . expectedCount else {
SNLog ( " [JobRunner] \( queueContext ) found job with missing dependencies, removing the job " )
handleJobFailed ( nextJob , error : JobRunnerError . missingDependencies , permanentFailure : true )
return
}
guard dependencyInfo . jobs . isEmpty else {
SNLog ( " [JobRunner] \( queueContext ) found job with \( dependencyInfo . jobs . count ) dependencies, running those first " )
let jobDependencyIds : [ Int64 ] = dependencyInfo . jobs
. compactMap { $0 . id }
let jobIdsNotInQueue : Set < Int64 > = jobDependencyIds
. asSet ( )
. subtracting ( queue . wrappedValue . compactMap { $0 . id } )
// I f t h e r e a r e d e p e n d e n c i e s w h i c h a r e n ' t i n t h e q u e u e w e s h o u l d j u s t a p p e n d t h e m
guard ! jobIdsNotInQueue . isEmpty else {
queue . mutate { queue in
queue . append (
contentsOf : dependencyInfo . jobs
. filter { jobIdsNotInQueue . contains ( $0 . id ? ? - 1 ) }
)
queue . append ( nextJob )
}
handleJobDeferred ( nextJob )
return
}
// O t h e r w i s e r e - a d d t h e c u r r e n t j o b a f t e r i t ' s d e p e n d e n c i e s ( i f t h i s i s n ' t a c o n c u r r e n t
// q u e u e - d o n ' t w a n t t o i m m e d i a t e l y t r y t o s t a r t t h e j o b a g a i n o n l y f o r i t t o e n d u p b a c k
// i n h e r e )
if executionType != . concurrent {
queue . mutate { queue in
guard let lastDependencyIndex : Int = queue . lastIndex ( where : { jobDependencyIds . contains ( $0 . id ? ? - 1 ) } ) else {
queue . append ( nextJob )
return
}
queue . insert ( nextJob , at : lastDependencyIndex + 1 )
}
}
handleJobDeferred ( nextJob )
return
}
// U p d a t e t h e s t a t e t o i n d i c a t e i t ' s r u n n i n g
//
// N o t e : W e n e e d t o s t o r e ' n u m J o b s R e m a i n i n g ' i n i t ' s o w n v a r i a b l e b e c a u s e
// t h e ' S N L o g ' s e e m s t o d i s p a t c h t o i t ' s o w n q u e u e w h i c h e n d s u p g e t t i n g
// b l o c k e d b y t h e J o b R u n n e r ' s q u e u e b e c u a s e ' j o b Q u e u e ' i s A t o m i c
var numJobsRunning : Int = 0
nextTrigger . mutate { trigger in
trigger ? . invalidate ( ) // N e e d t o i n v a l i d a t e t o p r e v e n t a m e m o r y l e a k
trigger = nil
}
isRunning . mutate { $0 = true }
jobsCurrentlyRunning . mutate { jobsCurrentlyRunning in
jobsCurrentlyRunning = jobsCurrentlyRunning . inserting ( nextJob . id )
numJobsRunning = jobsCurrentlyRunning . count
}
detailsForCurrentlyRunningJobs . mutate { $0 = $0 . setting ( nextJob . id , nextJob . details ) }
SNLog ( " [JobRunner] \( queueContext ) started job ( \( executionType = = . concurrent ? " \( numJobsRunning ) currently running, " : " " ) \( numJobsRemaining ) remaining) " )
jobExecutor . run (
nextJob ,
queue : internalQueue ,
success : handleJobSucceeded ,
failure : handleJobFailed ,
deferred : handleJobDeferred
)
// I f t h i s q u e u e e x e c u t e s c o n c u r r e n t l y a n d t h e r e a r e s t i l l j o b s r e m a i n i n g t h e n i m m e d i a t e l y a t t e m p t
// t o s t a r t t h e n e x t j o b
if executionType = = . concurrent && numJobsRemaining > 0 {
internalQueue . async { [ weak self ] in
self ? . runNextJob ( )
}
}
}
private func scheduleNextSoonestJob ( ) {
let nextJobTimestamp : TimeInterval ? = Storage . shared . read { db in
try Job . filterPendingJobs ( variants : jobVariants , excludeFutureJobs : false )
. select ( . nextRunTimestamp )
. asRequest ( of : TimeInterval . self )
. fetchOne ( db )
}
// I f t h e r e a r e n o r e m a i n i n g j o b s t h e t r i g g e r t h e ' o n Q u e u e D r a i n e d ' c a l l b a c k a n d s t o p
guard let nextJobTimestamp : TimeInterval = nextJobTimestamp else {
if executionType != . concurrent || jobsCurrentlyRunning . wrappedValue . isEmpty {
self . onQueueDrained ? ( )
}
return
}
// I f t h e n e x t j o b i s n ' t s c h e d u l e d i n t h e f u t u r e t h e n j u s t r e s t a r t t h e J o b R u n n e r i m m e d i a t e l y
let secondsUntilNextJob : TimeInterval = ( nextJobTimestamp - Date ( ) . timeIntervalSince1970 )
guard secondsUntilNextJob > 0 else {
// O n l y l o g t h a t t h e q u e u e i s g e t t i n g r e s t a r t e d i f t h i s q u e u e h a d a c t u a l l y b e e n a b o u t t o s t o p
if executionType != . concurrent || jobsCurrentlyRunning . wrappedValue . isEmpty {
let timingString : String = ( nextJobTimestamp = = 0 ?
" that should be in the queue " :
" scheduled \( Int ( ceil ( abs ( secondsUntilNextJob ) ) ) ) second \( Int ( ceil ( abs ( secondsUntilNextJob ) ) ) = = 1 ? " " : " s " ) ago "
)
SNLog ( " [JobRunner] Restarting \( queueContext ) immediately for job \( timingString ) " )
}
// T r i g g e r t h e ' s t a r t ' f u n c t i o n t o l o a d i n a n y p e n d i n g j o b s t h a t a r e n ' t a l r e a d y i n t h e
// q u e u e ( f o r c o n c u r r e n t q u e u e s w e w a n t t o f o r c e t h e m t o l o a d i n p e n d i n g j o b s a n d a d d
// t h e m t o t h e q u e u e r e g a r d l e s s o f w h e t h e r t h e q u e u e i s a l r e a d y r u n n i n g )
internalQueue . async { [ weak self ] in
self ? . start ( force : ( self ? . executionType = = . concurrent ) )
}
return
}
// O n l y s c h e d u l e a t r i g g e r i f t h i s q u e u e h a s a c t u a l l y c o m p l e t e d
guard executionType != . concurrent || jobsCurrentlyRunning . wrappedValue . isEmpty else { return }
// S e t u p a t r i g g e r
SNLog ( " [JobRunner] Stopping \( queueContext ) until next job in \( Int ( ceil ( abs ( secondsUntilNextJob ) ) ) ) second \( Int ( ceil ( abs ( secondsUntilNextJob ) ) ) = = 1 ? " " : " s " ) " )
nextTrigger . mutate { trigger in
trigger ? . invalidate ( ) // N e e d t o i n v a l i d a t e t h e o l d t r i g g e r t o p r e v e n t a m e m o r y l e a k
trigger = Trigger . create ( queue : self , timestamp : nextJobTimestamp )
}
}
// MARK: - H a n d l i n g R e s u l t s
// / T h i s f u n c t i o n i s c a l l e d w h e n a j o b s u c c e e d s
private func handleJobSucceeded ( _ job : Job , shouldStop : Bool ) {
switch job . behaviour {
case . runOnce , . runOnceNextLaunch :
Storage . shared . write { db in
// F i r s t r e m o v e a n y J o b D e p e n d e n c i e s r e q u i r i n g t h i s j o b t o b e c o m p l e t e d ( i f
// w e d o n ' t t h e n t h e d e p e n d a n t j o b s w i l l a u t o m a t i c a l l y b e d e l e t e d )
_ = try JobDependencies
. filter ( JobDependencies . Columns . dependantId = = job . id )
. deleteAll ( db )
_ = try job . delete ( db )
}
case . recurring where shouldStop = = true :
Storage . shared . write { db in
// F i r s t r e m o v e a n y J o b D e p e n d e n c i e s r e q u i r i n g t h i s j o b t o b e c o m p l e t e d ( i f
// w e d o n ' t t h e n t h e d e p e n d a n t j o b s w i l l a u t o m a t i c a l l y b e d e l e t e d )
_ = try JobDependencies
. filter ( JobDependencies . Columns . dependantId = = job . id )
. deleteAll ( db )
_ = try job . delete ( db )
}
// F o r ` r e c u r r i n g ` j o b s w h i c h h a v e a l r e a d y r u n , t h e y s h o u l d a u t o m a t i c a l l y r u n a g a i n
// b u t w e w a n t a t l e a s t 1 s e c o n d t o p a s s b e f o r e d o i n g s o - t h e j o b i t s e l f s h o u l d
// r e a l l y u p d a t e i t ' s o w n ' n e x t R u n T i m e s t a m p ' ( t h i s i s j u s t a s a f e t y n e t )
case . recurring where job . nextRunTimestamp <= Date ( ) . timeIntervalSince1970 :
Storage . shared . write { db in
_ = try job
. with ( nextRunTimestamp : ( Date ( ) . timeIntervalSince1970 + 1 ) )
. saved ( db )
}
default : break
}
// F o r c o n c u r r e n t q u e u e s r e t r i e v e a n y ' d e p e n d a n t ' j o b s a n d r e - a d d t h e m h e r e ( i f t h e y h a v e o t h e r
// d e p e n d e n c i e s t h e y w i l l b e r e m o v e d a g a i n w h e n t h e y t r y t o e x e c u t e )
if executionType = = . concurrent {
let dependantJobs : [ Job ] = Storage . shared
. read { db in try job . dependantJobs . fetchAll ( db ) }
. defaulting ( to : [ ] )
let dependantJobIds : [ Int64 ] = dependantJobs
. compactMap { $0 . id }
let jobIdsNotInQueue : Set < Int64 > = dependantJobIds
. asSet ( )
. subtracting ( queue . wrappedValue . compactMap { $0 . id } )
// I f t h e r e a r e d e p e n d a n t j o b s w h i c h a r e n ' t i n t h e q u e u e w e s h o u l d j u s t a p p e n d t h e m
if ! jobIdsNotInQueue . isEmpty {
queue . mutate { queue in
queue . append (
contentsOf : dependantJobs
. filter { jobIdsNotInQueue . contains ( $0 . id ? ? - 1 ) }
)
}
}
}
// T h e j o b i s r e m o v e d f r o m t h e q u e u e b e f o r e i t r u n s s o a l l w e n e e d t o t o i s r e m o v e i t
// f r o m t h e ' c u r r e n t l y R u n n i n g ' s e t a n d s t a r t t h e n e x t o n e
jobsCurrentlyRunning . mutate { $0 = $0 . removing ( job . id ) }
detailsForCurrentlyRunningJobs . mutate { $0 = $0 . removingValue ( forKey : job . id ) }
internalQueue . async { [ weak self ] in
self ? . runNextJob ( )
}
}
// / T h i s f u n c t i o n i s c a l l e d w h e n a j o b f a i l s , i f i t ' s w a s n ' t a p e r m a n e n t f a i l u r e t h e n t h e ' f a i l u r e C o u n t ' f o r t h e j o b w i l l b e i n c r e m e n t e d a n d i t ' l l
// / b e r e - r u n a f t e r a r e t r y i n t e r v a l h a s p a s s e d
private func handleJobFailed ( _ job : Job , error : Error ? , permanentFailure : Bool ) {
guard Storage . shared . read ( { db in try Job . exists ( db , id : job . id ? ? - 1 ) } ) = = true else {
SNLog ( " [JobRunner] \( queueContext ) \( job . variant ) job canceled " )
jobsCurrentlyRunning . mutate { $0 = $0 . removing ( job . id ) }
detailsForCurrentlyRunningJobs . mutate { $0 = $0 . removingValue ( forKey : job . id ) }
internalQueue . async { [ weak self ] in
self ? . runNextJob ( )
}
return
}
// I f t h i s i s t h e b l o c k i n g q u e u e a n d a " b l o c k i n g " j o b f a i l e d t h e n r e r u n i t i m m e d i a t e l y
if self . type = = . blocking && job . shouldBlock {
SNLog ( " [JobRunner] \( queueContext ) \( job . variant ) job failed; retrying immediately " )
jobsCurrentlyRunning . mutate { $0 = $0 . removing ( job . id ) }
detailsForCurrentlyRunningJobs . mutate { $0 = $0 . removingValue ( forKey : job . id ) }
queue . mutate { $0 . insert ( job , at : 0 ) }
internalQueue . async { [ weak self ] in
self ? . runNextJob ( )
}
return
}
// G e t t h e m a x f a i l u r e c o u n t f o r t h e j o b ( a v a l u e o f ' - 1 ' m e a n s i t w i l l r e t r y i n d e f i n i t e l y )
let maxFailureCount : Int = ( JobRunner . executorMap . wrappedValue [ job . variant ] ? . maxFailureCount ? ? 0 )
let nextRunTimestamp : TimeInterval = ( Date ( ) . timeIntervalSince1970 + JobRunner . getRetryInterval ( for : job ) )
Storage . shared . write { db in
guard
! permanentFailure && (
maxFailureCount < 0 ||
job . failureCount + 1 < maxFailureCount
)
else {
SNLog ( " [JobRunner] \( queueContext ) \( job . variant ) failed permanently \( maxFailureCount >= 0 ? " ; too many retries " : " " ) " )
let dependantJobIds : [ Int64 ] = try job . dependantJobs
. select ( . id )
. asRequest ( of : Int64 . self )
. fetchAll ( db )
// I f t h e j o b p e r m a n e n t l y f a i l e d o r w e h a v e p e r f o r m e d a l l o f o u r r e t r y a t t e m p t s
// t h e n d e l e t e t h e j o b a n d a l l o f i t ' s d e p e n d a n t j o b s ( i t ' l l p r o b a b l y n e v e r s u c c e e d )
_ = try job . dependantJobs
. deleteAll ( db )
_ = try job . delete ( db )
// R e m o v e t h e d e p e n d a n t j o b s f r o m t h e q u e u e ( s o w e d o n ' t t r y t o r u n a d e l e t e d j o b )
if ! dependantJobIds . isEmpty {
queue . mutate { queue in
queue = queue . filter { ! dependantJobIds . contains ( $0 . id ? ? - 1 ) }
}
}
return
}
SNLog ( " [JobRunner] \( queueContext ) \( job . variant ) job failed; scheduling retry (failure count is \( job . failureCount + 1 ) ) " )
_ = try job
. with (
failureCount : ( job . failureCount + 1 ) ,
nextRunTimestamp : nextRunTimestamp
)
. saved ( db )
// U p d a t e t h e f a i l u r e C o u n t a n d n e x t R u n T i m e s t a m p o n d e p e n d a n t j o b s a s w e l l ( u p d a t e t h e
// ' n e x t R u n T i m e s t a m p ' v a l u e t o b e 1 m s l a t e r s o w h e n t h e q u e u e g e t s r e g e n e r a t e d i t ' l l
// c o m e a f t e r t h e d e p e n d e n c y )
try job . dependantJobs
. updateAll (
db ,
Job . Columns . failureCount . set ( to : job . failureCount ) ,
Job . Columns . nextRunTimestamp . set ( to : ( nextRunTimestamp + ( 1 / 1000 ) ) )
)
let dependantJobIds : [ Int64 ] = try job . dependantJobs
. select ( . id )
. asRequest ( of : Int64 . self )
. fetchAll ( db )
// R e m o v e t h e d e p e n d a n t j o b s f r o m t h e q u e u e ( s o w e d o n ' t g e t s t u c k i n a l o o p o f t r y i n g
// t o r u n d e p e n d e c i e s i n d e f i n i t e l y )
if ! dependantJobIds . isEmpty {
queue . mutate { queue in
queue = queue . filter { ! dependantJobIds . contains ( $0 . id ? ? - 1 ) }
}
}
}
jobsCurrentlyRunning . mutate { $0 = $0 . removing ( job . id ) }
detailsForCurrentlyRunningJobs . mutate { $0 = $0 . removingValue ( forKey : job . id ) }
internalQueue . async { [ weak self ] in
self ? . runNextJob ( )
}
}
// / T h i s f u n c t i o n i s c a l l e d w h e n a j o b n e i t h e r s u c c e e d s o r f a i l s ( t h i s s h o u l d o n l y o c c u r i f t h e j o b h a s s p e c i f i c l o g i c t h a t m a k e s i t d e p e n d a n t
// / o n o t h e r j o b s , a n d i t s h o u l d a u t o m a t i c a l l y m a n a g e t h o s e d e p e n d e n c i e s )
private func handleJobDeferred ( _ job : Job ) {
jobsCurrentlyRunning . mutate { $0 = $0 . removing ( job . id ) }
detailsForCurrentlyRunningJobs . mutate { $0 = $0 . removingValue ( forKey : job . id ) }
internalQueue . async { [ weak self ] in
self ? . runNextJob ( )
}
}
}