@ -670,14 +670,16 @@ open class Storage {
// / a n d j u s t b l o c k t h e t h r e a d w h e n w e w a n t t o p e r f o r m a s y n c h r o n o u s o p e r a t i o n
@ discardableResult private static func performOperation < T > (
_ info : CallInfo ,
_ dependencies : Dependencies ,
_ operation : @ escaping ( Database ) throws -> T ,
_ completion : ( ( Result < T , Error > ) -> Void ) ? = nil
) -> Result < T , Error > {
let queryDbLock = NSLock ( )
// A s e r i a l q u e u e f o r s y n c h r o n i z i n g c o m p l e t i o n u p d a t e s .
let syncQueue = DispatchQueue ( label : " com.session.performOperation.syncQueue " )
var queryDb : Database ?
let completionLock = NSLock ( )
var didComplete : Bool = false
var r esult: Result < T , Error > = . failure ( StorageError . invalidQueryResult )
var finalR esult: Result < T , Error > = . failure ( StorageError . invalidQueryResult )
let semaphore : DispatchSemaphore ? = ( info . isAsync ? nil : DispatchSemaphore ( value : 0 ) )
let logErrorIfNeeded : ( Result < T , Error > ) -> ( ) = { result in
switch result {
@ -685,46 +687,37 @@ open class Storage {
case . failure ( let error ) : StorageState . logIfNeeded ( error , isWrite : info . isWrite )
}
}
let completeOperation : ( Result < T , Error > ) -> Void = { operationResult in
completionLock . lock ( )
defer { completionLock . unlock ( ) }
guard ! didComplete else { return }
// / I f t h e q u e r y t i m e d o u t t h e n w e s h o u l d i n t e r r u p t t h e q u e r y ( d o n ' t w a n t t h e q u e r y t h r e a d t o r e m a i n b l o c k e d w h e n w e ' v e
// / a l r e a d y h a n d l e d i t a s a f a i l u r e )
switch operationResult {
case . failure ( let error ) where error as ? StorageError = = StorageError . transactionDeadlockTimeout :
queryDbLock . lock ( )
defer { queryDbLock . unlock ( ) }
queryDb ? . interrupt ( )
default : break
}
func completeOperation ( with result : Result < T , Error > ) {
syncQueue . sync {
if didComplete { return }
didComplete = true
result = operationResult
finalResult = result
semaphore ? . signal ( )
/ / / F o r a s y n c o p e r a t i o n s , l o g th e e r r o r a n d c a l l t h e c o m p l e t i o n c l o s u r e
// F o r a s y n c o p e r a t i o n s , l o g a n d i n v o k e t h e c o m p l e t i o n c l o s u r e .
if info . isAsync {
logErrorIfNeeded ( result )
completion ? ( result )
}
}
}
// / P e r f o r m t h e a c t u a l o p e r a t i o n
switch ( StorageState ( info . storage ) , info . isWrite ) {
case ( . invalid ( let error ) , _ ) : completeOperation ( . failure ( error ) )
case ( . invalid ( let error ) , _ ) : completeOperation ( with : . failure ( error ) )
case ( . valid ( let dbWriter ) , true ) :
dbWriter . asyncWrite (
{ db in
queryDbLock . lock ( )
defer { queryDbLock . unlock ( ) }
syncQueue . sync { queryDb = db }
if dependencies [ feature : . forceSlowDatabaseQueries ] {
Thread . sleep ( forTimeInterval : 1 )
}
queryDb = db
return try Storage . track ( db , info , operation )
} ,
completion : { _ , dbResult in completeOperation ( dbResult) }
completion : { _ , dbResult in completeOperation ( with: dbResult) }
)
case ( . valid ( let dbWriter ) , false ) :
@ -733,14 +726,16 @@ open class Storage {
switch dbResult {
case . failure ( let error ) : throw error
case . success ( let db ) :
queryDbLock . lock ( )
defer { queryDbLock . unlock ( ) }
syncQueue . sync { queryDb = db }
queryDb = db
completeOperation ( . success ( try Storage . track ( db , info , operation ) ) )
if dependencies [ feature : . forceSlowDatabaseQueries ] {
Thread . sleep ( forTimeInterval : 1 )
}
completeOperation ( with : . success ( try Storage . track ( db , info , operation ) ) )
}
} catch {
completeOperation ( . failure ( error ) )
completeOperation ( with : . failure ( error ) )
}
}
}
@ -760,38 +755,50 @@ open class Storage {
if ! isDebuggerAttached ( ) {
semaphoreResult = semaphore ? . wait ( timeout : . now ( ) + . seconds ( Storage . transactionDeadlockTimeoutSeconds ) )
}
else if ! info . isAsync {
let timerSemaphore : DispatchSemaphore = DispatchSemaphore ( value : 0 )
let timerQueue = DispatchQueue ( label : " org.session.debugSemaphoreTimer " , qos : . userInteractive )
let timer = DispatchSource . makeTimerSource ( queue : timerQueue )
var iterations : UInt64 = 0
else if ! info . isAsync , let semaphore : DispatchSemaphore = semaphore {
let pollQueue : DispatchQueue = DispatchQueue . global ( qos : . userInitiated )
var iterations : Int = 0
let maxIterations : Int = 50
let pollCompletionSemaphore : DispatchSemaphore = DispatchSemaphore ( value : 0 )
// / S t a g g e r t h e s i z e o f t h e ` p o l l I n t e r v a l s ` t o a v o i d h o l d i n g u p t h e t h r e a d i n c a s e t h e q u e r y r e s o l v e s v e r y q u i c k l y
let pollIntervals : [ DispatchTimeInterval ] = [
. milliseconds ( 5 ) , . milliseconds ( 5 ) , . milliseconds ( 10 ) , . milliseconds ( 10 ) , . milliseconds ( 10 ) ,
. milliseconds ( 100 )
]
// / E v e r y t i c k o f t h e t i m e r c h e c k i f t h e s e m a p h o r e h a s c o m p l e t e d o r w e h a v e t i m e d o u t
timer . schedule ( deadline : . now ( ) , repeating : . milliseconds ( 100 ) )
timer . setEventHandler {
func pollSemaphore ( ) {
iterations += 1
if iterations >= 50 || semaphore ? . wait ( timeout : . now ( ) ) = = . success {
timer . cancel ( )
timerSemaphore . signal ( )
guard iterations < maxIterations && semaphore . wait ( timeout : . now ( ) ) != . success else {
pollCompletionSemaphore . signal ( )
return
}
let nextInterval : DispatchTimeInterval = pollIntervals [ min ( iterations , pollIntervals . count - 1 ) ]
pollQueue . asyncAfter ( deadline : . now ( ) + nextInterval ) {
pollSemaphore ( )
}
}
timer . resume ( )
timerSemaphore . wait ( ) // W a i t i n d e f i n i t e l y f o r t h e t i m e r s e m a p h o r e
// / P o l l t h e s e m a p h o r e i n a b a c k g r o u n d q u e u e
pollQueue . asyncAfter ( deadline : . now ( ) + pollIntervals [ 0 ] ) { pollSemaphore ( ) }
pollCompletionSemaphore . wait ( ) // W a i t i n d e f i n i t e l y f o r t h e t i m e r s e m a p h o r e
semaphoreResult = ( iterations >= 50 ? . timedOut : . success )
}
#else
semaphoreResult = semaphore ? . wait ( timeout : . now ( ) + . seconds ( Storage . transactionDeadlockTimeoutSeconds ) )
#endif
// / I f t h e t r a n s a c t i o n t i m e d o u t t h e n l o g t h e e r r o r a n d r e p o r t a f a i l u r e , o t h e r w i s e h a n d l e w h a t e v e r t h e r e s u l t w a s
completeOperation ( semaphoreResult != . timedOut ?
result :
. failure ( StorageError . transactionDeadlockTimeout )
)
// / I f t h e q u e r y t i m e d o u t t h e n w e s h o u l d i n t e r r u p t t h e q u e r y ( d o n ' t w a n t t h e q u e r y t h r e a d t o r e m a i n b l o c k e d w h e n w e ' v e
// / a l r e a d y h a n d l e d i t a s a f a i l u r e ) a n d n e e d t o c a l l ` c o m p l e t e O p e r a t i o n ` a s i t w o u l d n ' t h a v e b e e n c a l l e d w i t h i n t h e
// / d b t r a n s a c t i o n y e t
if semaphoreResult = = . timedOut {
syncQueue . sync { queryDb ? . interrupt ( ) }
completeOperation ( with : . failure ( StorageError . transactionDeadlockTimeout ) )
}
return result
return finalR esult
}
private func performPublisherOperation < T > (
@ -812,9 +819,9 @@ open class Storage {
// / I n s t e a d o f t h i s w e a r e j u s t u s i n g ` D e f e r r e d { F u t u r e { } } ` w h i c h i s e x e c u t e d o n t h e s p e c i f i e d s c h e d u l e d
// / w h i c h b e h a v e s i n a m u c h m o r e e x p e c t e d w a y t h a n t h e G R D B ` r e a d P u b l i s h e r ` / ` w r i t e P u b l i s h e r ` d o e s
let info : CallInfo = CallInfo ( self , fileName , functionName , lineNumber , . syncWrite )
return Deferred {
return Deferred { [ dependencies ] in
Future { resolver in
resolver ( Storage . performOperation ( info , operation) )
resolver ( Storage . performOperation ( info , dependencies, operation) )
}
} . eraseToAnyPublisher ( )
}
@ -828,7 +835,7 @@ open class Storage {
lineNumber line : Int = #line ,
updates : @ escaping ( Database ) throws -> T ?
) -> T ? {
switch Storage . performOperation ( CallInfo ( self , file , funcN , line , . syncWrite ) , updates) {
switch Storage . performOperation ( CallInfo ( self , file , funcN , line , . syncWrite ) , dependencies, updates) {
case . failure : return nil
case . success ( let result ) : return result
}
@ -841,7 +848,7 @@ open class Storage {
updates : @ escaping ( Database ) throws -> T ,
completion : @ escaping ( Result < T , Error > ) -> Void = { _ in }
) {
Storage . performOperation ( CallInfo ( self , file , funcN , line , . asyncWrite ) , updates, completion )
Storage . performOperation ( CallInfo ( self , file , funcN , line , . asyncWrite ) , dependencies, updates, completion )
}
open func writePublisher < T > (
@ -859,7 +866,7 @@ open class Storage {
lineNumber line : Int = #line ,
_ value : @ escaping ( Database ) throws -> T ?
) -> T ? {
switch Storage . performOperation ( CallInfo ( self , file , funcN , line , . syncRead ) , value) {
switch Storage . performOperation ( CallInfo ( self , file , funcN , line , . syncRead ) , dependencies, value) {
case . failure : return nil
case . success ( let result ) : return result
}