@ -42,6 +42,12 @@ public final class JobRunner {
case notFound
}
public struct JobInfo {
public let threadId : String ?
public let interactionId : Int64 ?
public let detailsData : Data ?
}
private static let blockingQueue : Atomic < JobQueue ? > = Atomic (
JobQueue (
type : . blocking ,
@ -80,7 +86,8 @@ public final class JobRunner {
executionType : . serial ,
qos : . default ,
jobVariants : [
jobVariants . remove ( . messageReceive )
jobVariants . remove ( . messageReceive ) ,
jobVariants . remove ( . configMessageReceive )
] . compactMap { $0 }
)
let attachmentDownloadQueue : JobQueue = JobQueue (
@ -127,26 +134,30 @@ public final class JobRunner {
// /
// / * * N o t e : * * I f t h e j o b h a s a ` b e h a v i o u r ` o f ` r u n O n c e N e x t L a u n c h ` o r t h e ` n e x t R u n T i m e s t a m p `
// / i s i n t h e f u t u r e t h e n t h e j o b w o n ' t b e s t a r t e d
public static func add ( _ db : Database , job : Job ? , canStartJob : Bool = true ) {
@ discardableResult public static func add ( _ db : Database , job : Job ? , canStartJob : Bool = true ) -> Job ? {
// S t o r e t h e j o b i n t o t h e d a t a b a s e ( g e t t i n g a n i d f o r i t )
guard let updatedJob : Job = try ? job ? . inserted ( db ) else {
SNLog ( " [JobRunner] Unable to add \( job . map { " \( $0 . variant ) " } ? ? " unknown " ) job " )
return
return nil
}
guard ! canStartJob || updatedJob . id != nil else {
SNLog ( " [JobRunner] Not starting \( job . map { " \( $0 . variant ) " } ? ? " unknown " ) job due to missing id " )
return
return nil
}
queues . wrappedValue [ updatedJob . variant ] ? . add ( db , job : updatedJob , canStartJob : canStartJob )
// D o n ' t s t a r t t h e q u e u e i f t h e j o b c a n ' t b e s t a r t e d
guard canStartJob else { return }
// S t a r t t h e j o b r u n n e r i f n e e d e d
db . afterNextTransactionNestedOnce ( dedupeId : " JobRunner-Start: \( updatedJob . variant ) " ) { _ in
// W a i t u n t i l t h e t r a n s a c t i o n h a s b e e n c o m p l e t e d b e f o r e u p d a t i n g t h e q u e u e ( t o e n s u r e a n y t h i n g
// c r e a t e d d u r i n g t h e t r a n s a c t i o n h a s b e e n s a v e d t o t h e d a t a b a s e b e f o r e a n y c o r r e s p o n d i n g j o b s
// a r e r u n )
db . afterNextTransactionNested { _ in
queues . wrappedValue [ updatedJob . variant ] ? . add ( updatedJob , canStartJob : canStartJob )
// D o n ' t s t a r t t h e q u e u e i f t h e j o b c a n ' t b e s t a r t e d
guard canStartJob else { return }
queues . wrappedValue [ updatedJob . variant ] ? . start ( )
}
return updatedJob
}
// / U p s e r t a j o b o n t o t h e q u e u e , i f t h e q u e u e i s n ' t c u r r e n t l y r u n n i n g a n d ' c a n S t a r t J o b ' i s t r u e t h e n t h i s w i l l s t a r t
@ -161,17 +172,22 @@ public final class JobRunner {
return
}
queues . wrappedValue [ job . variant ] ? . upsert ( db , job : job , canStartJob : canStartJob )
// D o n ' t s t a r t t h e q u e u e i f t h e j o b c a n ' t b e s t a r t e d
guard canStartJob else { return }
// S t a r t t h e j o b r u n n e r i f n e e d e d
db . afterNextTransactionNestedOnce ( dedupeId : " JobRunner-Start: \( job . variant ) " ) { _ in
// W a i t u n t i l t h e t r a n s a c t i o n h a s b e e n c o m p l e t e d b e f o r e u p d a t i n g t h e q u e u e ( t o e n s u r e a n y t h i n g
// c r e a t e d d u r i n g t h e t r a n s a c t i o n h a s b e e n s a v e d t o t h e d a t a b a s e b e f o r e a n y c o r r e s p o n d i n g j o b s
// a r e r u n )
db . afterNextTransactionNested { _ in
queues . wrappedValue [ job . variant ] ? . upsert ( job , canStartJob : canStartJob )
// D o n ' t s t a r t t h e q u e u e i f t h e j o b c a n ' t b e s t a r t e d
guard canStartJob else { return }
queues . wrappedValue [ job . variant ] ? . start ( )
}
}
// / I n s e r t a j o b b e f o r e a n o t h e r j o b i n t h e q u e u e
// /
// / * * N o t e : * * T h i s f u n c t i o n a s s u m e s t h e r e l e v a n t j o b q u e u e i s a l r e a d y r u n n i n g a n d a s s u c h * * w i l l n o t * * s t a r t t h e q u e u e i f i t i s n ' t r u n n i n g
@ discardableResult public static func insert ( _ db : Database , job : Job ? , before otherJob : Job ) -> ( Int64 , Job ) ? {
switch job ? . behaviour {
case . recurringOnActive , . recurringOnLaunch , . runOnceNextLaunch :
@ -191,7 +207,12 @@ public final class JobRunner {
return nil
}
queues . wrappedValue [ updatedJob . variant ] ? . insert ( updatedJob , before : otherJob )
// W a i t u n t i l t h e t r a n s a c t i o n h a s b e e n c o m p l e t e d b e f o r e u p d a t i n g t h e q u e u e ( t o e n s u r e a n y t h i n g
// c r e a t e d d u r i n g t h e t r a n s a c t i o n h a s b e e n s a v e d t o t h e d a t a b a s e b e f o r e a n y c o r r e s p o n d i n g j o b s
// a r e r u n )
db . afterNextTransactionNested { _ in
queues . wrappedValue [ updatedJob . variant ] ? . insert ( updatedJob , before : otherJob )
}
return ( jobId , updatedJob )
}
@ -366,8 +387,8 @@ public final class JobRunner {
return ( queues . wrappedValue [ job . variant ] ? . isCurrentlyRunning ( jobId ) = = true )
}
public static func defails ForCurrentlyRunningJobs( of variant : Job . Variant ) -> [ Int64 : Data? ] {
return ( queues . wrappedValue [ variant ] ? . details ForAllCurrentlyRunningJobs( ) )
public static func info ForCurrentlyRunningJobs( of variant : Job . Variant ) -> [ Int64 : JobInfo ] {
return ( queues . wrappedValue [ variant ] ? . info ForAllCurrentlyRunningJobs( ) )
. defaulting ( to : [ : ] )
}
@ -380,11 +401,24 @@ public final class JobRunner {
queue . afterCurrentlyRunningJob ( jobId , callback : callback )
}
public static func hasPendingOrRunningJob < T : Encodable > ( with variant : Job . Variant , details : T ) -> Bool {
public static func hasPendingOrRunningJob < T : Encodable > (
with variant : Job . Variant ,
threadId : String ? = nil ,
interactionId : Int64 ? = nil ,
details : T ? = nil
) -> Bool {
guard let targetQueue : JobQueue = queues . wrappedValue [ variant ] else { return false }
guard let detailsData : Data = try ? JSONEncoder ( ) . encode ( details ) else { return false }
return targetQueue . hasPendingOrRunningJob ( with : detailsData )
// E n s u r e w e c a n e n c o d e t h e d e t a i l s ( i f p r o v i d e d )
let detailsData : Data ? = details . map { try ? JSONEncoder ( ) . encode ( $0 ) }
guard details = = nil || detailsData != nil else { return false }
return targetQueue . hasPendingOrRunningJobWith (
threadId : threadId ,
interactionId : interactionId ,
detailsData : detailsData
)
}
public static func removePendingJob ( _ job : Job ? ) {
@ -498,9 +532,9 @@ private final class JobQueue {
private var nextTrigger : Atomic < Trigger ? > = Atomic ( nil )
fileprivate var isRunning : Atomic < Bool > = Atomic ( false )
private var queue : Atomic < [ Job ] > = Atomic ( [ ] )
private var jobsCurrentlyRunning : Atomic < Set < Int64 > > = Atomic ( [ ] )
private var jobCallbacks : Atomic < [ Int64 : [ ( JobRunner . JobResult ) -> ( ) ] ] > = Atomic ( [ : ] )
private var detailsForCurrentlyRunningJobs : Atomic < [ Int64 : Data ? ] > = Atomic ( [ : ] )
private var currentlyRunningJobIds : Atomic < Set < Int64 > > = Atomic ( [ ] )
private var currentlyRunningJobInfo : Atomic < [ Int64 : JobRunner . JobInfo ] > = Atomic ( [ : ] )
private var deferLoopTracker : Atomic < [ Int64 : ( count : Int , times : [ TimeInterval ] ) ] > = Atomic ( [ : ] )
fileprivate var hasPendingJobs : Bool { ! queue . wrappedValue . isEmpty }
@ -524,7 +558,7 @@ private final class JobQueue {
// MARK: - E x e c u t i o n
fileprivate func add ( _ db: Database , job: Job , canStartJob : Bool = true ) {
fileprivate func add ( _ job: Job , canStartJob : Bool = true ) {
// C h e c k i f t h e j o b s h o u l d b e a d d e d t o t h e q u e u e
guard
canStartJob ,
@ -541,11 +575,7 @@ private final class JobQueue {
// I f t h i s i s a c o n c u r r e n t q u e u e t h e n w e s h o u l d i m m e d i a t e l y s t a r t t h e n e x t j o b
guard executionType = = . concurrent else { return }
// E n s u r e t h a t t h e d a t a b a s e c o m m i t h a s c o m p l e t e d a n d t h e n t r i g g e r t h e n e x t j o b t o r u n ( n e e d
// t o e n s u r e a n y i n t e r a c t i o n s h a v e b e e n c o r r e c t l y i n s e r t e d f i r s t )
db . afterNextTransactionNestedOnce ( dedupeId : " JobRunner-Add: \( job . variant ) " ) { [ weak self ] _ in
self ? . runNextJob ( )
}
runNextJob ( )
}
// / U p s e r t a j o b o n t o t h e q u e u e , i f t h e q u e u e i s n ' t c u r r e n t l y r u n n i n g a n d ' c a n S t a r t J o b ' i s t r u e t h e n t h i s w i l l s t a r t
@ -553,7 +583,7 @@ private final class JobQueue {
// /
// / * * N o t e : * * I f t h e j o b h a s a ` b e h a v i o u r ` o f ` r u n O n c e N e x t L a u n c h ` o r t h e ` n e x t R u n T i m e s t a m p `
// / i s i n t h e f u t u r e t h e n t h e j o b w o n ' t b e s t a r t e d
fileprivate func upsert ( _ db: Database , job: Job , canStartJob : Bool = true ) {
fileprivate func upsert ( _ job: Job , canStartJob : Bool = true ) {
guard let jobId : Int64 = job . id else {
SNLog ( " [JobRunner] Prevented attempt to upsert \( job . variant ) job without id to queue " )
return
@ -576,7 +606,7 @@ private final class JobQueue {
// I f w e d i d n ' t u p d a t e a n e x i s t i n g j o b t h e n w e n e e d t o a d d i t t o t h e q u e u e
guard ! didUpdateExistingJob else { return }
add ( db, job : job, canStartJob : canStartJob )
add ( job, canStartJob : canStartJob )
}
fileprivate func insert ( _ job : Job , before otherJob : Job ) {
@ -609,7 +639,7 @@ private final class JobQueue {
}
fileprivate func appDidBecomeActive ( with jobs : [ Job ] , canStart : Bool ) {
let currentlyRunningJobIds : Set < Int64 > = jobsCurrentlyRunning . wrappedValue
let currentlyRunningJobIds : Set < Int64 > = currentlyRunningJobIds . wrappedValue
queue . mutate { queue in
// A v o i d r e - a d d i n g j o b s t o t h e q u e u e t h a t a r e a l r e a d y i n i t ( t h i s c a n
@ -631,11 +661,11 @@ private final class JobQueue {
}
fileprivate func isCurrentlyRunning ( _ jobId : Int64 ) -> Bool {
return jobsCurrentlyRunning . wrappedValue . contains ( jobId )
return currentlyRunningJobIds . wrappedValue . contains ( jobId )
}
fileprivate func detailsForAllCurrentlyRunningJobs( ) -> [ Int64 : Data ? ] {
return detailsForCurrentlyRunningJobs . wrappedValue
fileprivate func infoForAllCurrentlyRunningJobs( ) -> [ Int64 : JobRunner . JobInfo ] {
return currentlyRunningJobInfo . wrappedValue
}
fileprivate func afterCurrentlyRunningJob ( _ jobId : Int64 , callback : @ escaping ( JobRunner . JobResult ) -> ( ) ) {
@ -649,14 +679,65 @@ private final class JobQueue {
}
}
fileprivate func hasPendingOrRunningJob ( with detailsData : Data ? ) -> Bool {
guard let detailsData : Data = detailsData else { return false }
fileprivate func hasPendingOrRunningJobWith (
threadId : String ? = nil ,
interactionId : Int64 ? = nil ,
detailsData : Data ? = nil
) -> Bool {
let pendingJobs : [ Job ] = queue . wrappedValue
let currentlyRunningJobInfo : [ Int64 : JobRunner . JobInfo ] = currentlyRunningJobInfo . wrappedValue
var possibleJobIds : Set < Int64 > = Set ( currentlyRunningJobInfo . keys )
. inserting ( contentsOf : pendingJobs . compactMap { $0 . id } . asSet ( ) )
// R e m o v e a n y w h i c h d o n ' t h a v e t h e m a t c h i n g t h r e a d I d ( i f p r o v i d e d )
if let targetThreadId : String = threadId {
let pendingJobIdsWithWrongThreadId : Set < Int64 > = pendingJobs
. filter { $0 . threadId != targetThreadId }
. compactMap { $0 . id }
. asSet ( )
let runningJobIdsWithWrongThreadId : Set < Int64 > = currentlyRunningJobInfo
. filter { _ , info -> Bool in info . threadId != targetThreadId }
. map { key , _ in key }
. asSet ( )
possibleJobIds = possibleJobIds
. subtracting ( pendingJobIdsWithWrongThreadId )
. subtracting ( runningJobIdsWithWrongThreadId )
}
// R e m o v e a n y w h i c h d o n ' t h a v e t h e m a t c h i n g i n t e r a c t i o n I d ( i f p r o v i d e d )
if let targetInteractionId : Int64 = interactionId {
let pendingJobIdsWithWrongInteractionId : Set < Int64 > = pendingJobs
. filter { $0 . interactionId != targetInteractionId }
. compactMap { $0 . id }
. asSet ( )
let runningJobIdsWithWrongInteractionId : Set < Int64 > = currentlyRunningJobInfo
. filter { _ , info -> Bool in info . interactionId != targetInteractionId }
. map { key , _ in key }
. asSet ( )
possibleJobIds = possibleJobIds
. subtracting ( pendingJobIdsWithWrongInteractionId )
. subtracting ( runningJobIdsWithWrongInteractionId )
}
// R e m o v e a n y w h i c h d o n ' t h a v e t h e m a t c h i n g d e t a i l s ( i f p r o v i d e d )
if let targetDetailsData : Data = detailsData {
let pendingJobIdsWithWrongDetailsData : Set < Int64 > = pendingJobs
. filter { $0 . details != targetDetailsData }
. compactMap { $0 . id }
. asSet ( )
let runningJobIdsWithWrongDetailsData : Set < Int64 > = currentlyRunningJobInfo
. filter { _ , info -> Bool in info . detailsData != detailsData }
. map { key , _ in key }
. asSet ( )
possibleJobIds = possibleJobIds
. subtracting ( pendingJobIdsWithWrongDetailsData )
. subtracting ( runningJobIdsWithWrongDetailsData )
}
guard ! pendingJobs . contains ( where : { job in job . details = = detailsData } ) else { return true }
return detailsForCurrentlyRunningJobs . wrappedValue . values . contains ( detailsData )
return ! possibleJobIds . isEmpty
}
fileprivate func removePendingJob ( _ jobId : Int64 ) {
@ -695,7 +776,7 @@ private final class JobQueue {
}
// G e t a n y p e n d i n g j o b s
let jobIdsAlreadyRunning : Set < Int64 > = jobsCurrentlyRunning . wrappedValue
let jobIdsAlreadyRunning : Set < Int64 > = currentlyRunningJobIds . wrappedValue
let jobsAlreadyInQueue : Set < Int64 > = queue . wrappedValue . compactMap { $0 . id } . asSet ( )
let jobsToRun : [ Job ] = Storage . shared . read { db in
try Job
@ -754,7 +835,7 @@ private final class JobQueue {
}
guard let ( nextJob , numJobsRemaining ) : ( Job , Int ) = queue . mutate ( { queue in queue . popFirst ( ) . map { ( $0 , queue . count ) } } ) else {
// I f i t ' s a s e r i a l q u e u e , o r t h e r e a r e n o m o r e j o b s r u n n i n g t h e n u p d a t e t h e ' i s R u n n i n g ' f l a g
if executionType != . concurrent || jobsCurrentlyRunning . wrappedValue . isEmpty {
if executionType != . concurrent || currentlyRunningJobIds . wrappedValue . isEmpty {
isRunning . mutate { $0 = false }
}
@ -816,7 +897,7 @@ private final class JobQueue {
// /
// / * * N o t e : * * W e d o n ' t a d d t h e c u r r e n t j o b b a c k t h e t h e q u e u e b e c a u s e i t s h o u l d o n l y b e r e - a d d e d i f i t ' s d e p e n d e n c i e s
// / a r e s u c c e s s f u l l y c o m p l e t e d
let currentlyRunningJobIds : [ Int64 ] = Array ( detailsForCurrentlyRunningJobs. wrappedValue . keys )
let currentlyRunningJobIds : [ Int64 ] = Array ( currentlyRunningJobIds. wrappedValue )
let dependencyJobsNotCurrentlyRunning : [ Job ] = dependencyInfo . jobs
. filter { job in ! currentlyRunningJobIds . contains ( job . id ? ? - 1 ) }
. sorted { lhs , rhs in ( lhs . id ? ? - 1 ) < ( rhs . id ? ? - 1 ) }
@ -840,11 +921,20 @@ private final class JobQueue {
trigger ? . invalidate ( ) // N e e d t o i n v a l i d a t e t o p r e v e n t a m e m o r y l e a k
trigger = nil
}
jobsCurrentlyRunning . mutate { jobsCurrentlyRunning in
jobsCurrentlyRunning = jobsCurrentlyRunning . inserting ( nextJob . id )
numJobsRunning = jobsCurrentlyRunning . count
currentlyRunningJobIds . mutate { currentlyRunningJobIds in
currentlyRunningJobIds = currentlyRunningJobIds . inserting ( nextJob . id )
numJobsRunning = currentlyRunningJobIds . count
}
currentlyRunningJobInfo . mutate { currentlyRunningJobInfo in
currentlyRunningJobInfo = currentlyRunningJobInfo . setting (
nextJob . id ,
JobRunner . JobInfo (
threadId : nextJob . threadId ,
interactionId : nextJob . interactionId ,
detailsData : nextJob . details
)
)
}
detailsForCurrentlyRunningJobs . mutate { $0 = $0 . setting ( nextJob . id , nextJob . details ) }
SNLog ( " [JobRunner] \( queueContext ) started \( nextJob . variant ) job ( \( executionType = = . concurrent ? " \( numJobsRunning ) currently running, " : " " ) \( numJobsRemaining ) remaining) " )
// / A s i t t u r n s o u t C o m b i n e d o e s n ' t p l a t t o o n i c e l y w i t h c o n c u r r e n t D i s p a t c h Q u e u e s , i n C o m b i n e e v e n t s a r e d i s p a t c h e d a s y n c h r o n o u s l y t o
@ -883,7 +973,7 @@ private final class JobQueue {
}
private func scheduleNextSoonestJob ( ) {
let jobIdsAlreadyRunning : Set < Int64 > = jobsCurrentlyRunning . wrappedValue
let jobIdsAlreadyRunning : Set < Int64 > = currentlyRunningJobIds . wrappedValue
let nextJobTimestamp : TimeInterval ? = Storage . shared . read { db in
try Job
. filterPendingJobs (
@ -900,7 +990,7 @@ private final class JobQueue {
// I f t h e r e a r e n o r e m a i n i n g j o b s o r t h e J o b R u n n e r i s n ' t a l l o w e d t o s t a r t a n y q u e u e s t h e n t r i g g e r
// t h e ' o n Q u e u e D r a i n e d ' c a l l b a c k a n d s t o p
guard let nextJobTimestamp : TimeInterval = nextJobTimestamp , JobRunner . canStartQueues . wrappedValue else {
if executionType != . concurrent || jobsCurrentlyRunning . wrappedValue . isEmpty {
if executionType != . concurrent || currentlyRunningJobIds . wrappedValue . isEmpty {
self . onQueueDrained ? ( )
}
return
@ -911,7 +1001,7 @@ private final class JobQueue {
guard secondsUntilNextJob > 0 else {
// O n l y l o g t h a t t h e q u e u e i s g e t t i n g r e s t a r t e d i f t h i s q u e u e h a d a c t u a l l y b e e n a b o u t t o s t o p
if executionType != . concurrent || jobsCurrentlyRunning . wrappedValue . isEmpty {
if executionType != . concurrent || currentlyRunningJobIds . wrappedValue . isEmpty {
let timingString : String = ( nextJobTimestamp = = 0 ?
" that should be in the queue " :
" scheduled \( Int ( ceil ( abs ( secondsUntilNextJob ) ) ) ) second \( Int ( ceil ( abs ( secondsUntilNextJob ) ) ) = = 1 ? " " : " s " ) ago "
@ -929,7 +1019,7 @@ private final class JobQueue {
}
// O n l y s c h e d u l e a t r i g g e r i f t h i s q u e u e h a s a c t u a l l y c o m p l e t e d
guard executionType != . concurrent || jobsCurrentlyRunning . wrappedValue . isEmpty else { return }
guard executionType != . concurrent || currentlyRunningJobIds . wrappedValue . isEmpty else { return }
// S e t u p a t r i g g e r
SNLog ( " [JobRunner] Stopping \( queueContext ) until next job in \( Int ( ceil ( abs ( secondsUntilNextJob ) ) ) ) second \( Int ( ceil ( abs ( secondsUntilNextJob ) ) ) = = 1 ? " " : " s " ) " )
@ -1018,7 +1108,7 @@ private final class JobQueue {
// / * * N o t e : * * I f a n y o f t h e s e ` d e p e n d a n t J o b s ` h a v e o t h e r d e p e n d e n c i e s t h e n w h e n t h e y a t t e m p t t o s t a r t t h e y w i l l b e
// / r e m o v e d f r o m t h e q u e u e , r e p l a c e d b y t h e i r d e p e n d e n c i e s
if ! dependantJobs . isEmpty {
let currentlyRunningJobIds : [ Int64 ] = Array ( detailsForCurrentlyRunningJobs. wrappedValue . keys )
let currentlyRunningJobIds : [ Int64 ] = Array ( currentlyRunningJobIds. wrappedValue )
let dependantJobsNotCurrentlyRunning : [ Job ] = dependantJobs
. filter { job in ! currentlyRunningJobIds . contains ( job . id ? ? - 1 ) }
. sorted { lhs , rhs in ( lhs . id ? ? - 1 ) < ( rhs . id ? ? - 1 ) }
@ -1200,8 +1290,8 @@ private final class JobQueue {
private func performCleanUp ( for job : Job , result : JobRunner . JobResult , shouldTriggerCallbacks : Bool = true ) {
// T h e j o b i s r e m o v e d f r o m t h e q u e u e b e f o r e i t r u n s s o a l l w e n e e d t o t o i s r e m o v e i t
// f r o m t h e ' c u r r e n t l y R u n n i n g ' s e t
jobsCurrentlyRunning . mutate { $0 = $0 . removing ( job . id ) }
detailsForCurrentlyRunningJobs . mutate { $0 = $0 . removingValue ( forKey : job . id ) }
currentlyRunningJobIds . mutate { $0 = $0 . removing ( job . id ) }
currentlyRunningJobInfo . mutate { $0 = $0 . removingValue ( forKey : job . id ) }
guard shouldTriggerCallbacks else { return }