Updated GRDB and refactored internal Storage operations

• Updated to GRDB 7.3.0 (from 6.29.3)
• Updated `ConfigMessageReceiveJob` and `MessageReceiveJob` to use `writeAsync` (instead of the blocking `write` function) so that they aren't subject to the `Storage.transactionDeadlockTimeoutSeconds` timeout (see the sketch below)
• Refactored `Storage.performOperation` and `Storage.performPublisherOperation` to rely on the new cancellable async/await `Task` logic that GRDB 7 supports (the other async methods apparently don't support cancellation)
• Cleaned up some `Sendable`-related warnings
• Minor tweaks to `Log.assertOnMainThread` to make it a little more readable
pull/1061/head
Morgan Pretty 1 month ago
parent 2642d3925e
commit e29758e401
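
The `writeAsync(updates:completion:)` shape the jobs now use appears in the diff below. As a rough illustration of why it avoids the synchronous deadlock timeout, here is a minimal, self-contained sketch built directly on GRDB's async `write`; `MiniStorage`, `handleConfigMessages`, and the SQL are illustrative stand-ins, not the app's actual `Storage` API:

import GRDB

/// Illustrative stand-in for the app's `Storage` wrapper: the database work runs inside a
/// cancellable `Task` via GRDB's async `write`, and the outcome is handed to a completion
/// closure, so the caller never blocks behind a deadlock-timeout semaphore.
struct MiniStorage {
    let dbWriter: any DatabaseWriter

    @discardableResult
    func writeAsync<T: Sendable>(
        updates: @escaping @Sendable (Database) throws -> T,
        completion: @escaping @Sendable (Result<T, Error>) -> Void
    ) -> Task<Void, Never> {
        Task {
            do { completion(.success(try await dbWriter.write(updates))) }
            catch { completion(.failure(error)) }
        }
    }
}

/// Usage: a job reacts to the result in `completion` instead of blocking its queue's thread.
func handleConfigMessages(storage: MiniStorage) {
    storage.writeAsync(
        updates: { db in try db.execute(sql: "UPDATE config SET handled = 1") },
        completion: { result in
            switch result {
                case .failure(let error): print("Config write failed: \(error)")
                case .success: print("Config write handled")
            }
        }
    )
}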

@@ -10262,7 +10262,7 @@
 			repositoryURL = "https://github.com/session-foundation/session-grdb-swift.git";
 			requirement = {
 				kind = upToNextMajorVersion;
-				minimumVersion = 106.29.3;
+				minimumVersion = 107.3.0;
 			};
 		};
 		FD756BEE2D06686500BD7199 /* XCRemoteSwiftPackageReference "session-lucide" */ = {

@ -96,8 +96,8 @@
"kind" : "remoteSourceControl", "kind" : "remoteSourceControl",
"location" : "https://github.com/session-foundation/session-grdb-swift.git", "location" : "https://github.com/session-foundation/session-grdb-swift.git",
"state" : { "state" : {
"revision" : "b3643613f1e0f392fa41072ee499da93b4c06b67", "revision" : "c69f8bf8a7ede8727c20f7c36eeffd3f55598487",
"version" : "106.29.3" "version" : "107.3.0"
} }
}, },
{ {

@@ -422,7 +422,10 @@ final class ConversationVC: BaseVC, LibSessionRespondingViewController, Conversa
             using: dependencies
         )
-        dependencies[singleton: .storage].addObserver(viewModel.pagedDataObserver)
+        /// Dispatch adding the database observation to a background thread
+        DispatchQueue.global(qos: .userInitiated).async { [weak viewModel] in
+            dependencies[singleton: .storage].addObserver(viewModel?.pagedDataObserver)
+        }
         
         super.init(nibName: nil, bundle: nil)
     }
@@ -704,14 +707,18 @@ final class ConversationVC: BaseVC, LibSessionRespondingViewController, Conversa
                 // Stop observing changes
                 self?.stopObservingChanges()
-                dependencies[singleton: .storage].removeObserver(self?.viewModel.pagedDataObserver)
+                DispatchQueue.global(qos: .userInitiated).async {
+                    dependencies[singleton: .storage].removeObserver(self?.viewModel.pagedDataObserver)
+                }
                 
                 // Swap the observing to the updated thread
                 let newestVisibleMessageId: Int64? = self?.fullyVisibleCellViewModels()?.last?.id
                 self?.viewModel.swapToThread(updatedThreadId: unblindedId, focussedMessageId: newestVisibleMessageId)
                 
-                // Start observing changes again
-                dependencies[singleton: .storage].addObserver(self?.viewModel.pagedDataObserver)
+                /// Start observing changes again (on a background thread)
+                DispatchQueue.global(qos: .userInitiated).async {
+                    dependencies[singleton: .storage].addObserver(self?.viewModel.pagedDataObserver)
+                }
                 self?.startObservingChanges()
                 return
             }

@@ -31,7 +31,10 @@ public final class HomeVC: BaseVC, LibSessionRespondingViewController, UITableVi
     init(using dependencies: Dependencies) {
         self.viewModel = HomeViewModel(using: dependencies)
-        dependencies[singleton: .storage].addObserver(viewModel.pagedDataObserver)
+        /// Dispatch adding the database observation to a background thread
+        DispatchQueue.global(qos: .userInitiated).async { [weak viewModel] in
+            dependencies[singleton: .storage].addObserver(viewModel?.pagedDataObserver)
+        }
         
         super.init(nibName: nil, bundle: nil)
     }

@@ -34,7 +34,11 @@ public class DocumentTileViewController: UIViewController, UITableViewDelegate,
     init(viewModel: MediaGalleryViewModel, using dependencies: Dependencies) {
         self.dependencies = dependencies
         self.viewModel = viewModel
-        dependencies[singleton: .storage].addObserver(viewModel.pagedDataObserver)
+        
+        /// Dispatch adding the database observation to a background thread
+        DispatchQueue.global(qos: .userInitiated).async { [weak viewModel] in
+            dependencies[singleton: .storage].addObserver(viewModel?.pagedDataObserver)
+        }
         
         super.init(nibName: nil, bundle: nil)
     }

@@ -44,7 +44,11 @@ public class MediaTileViewController: UIViewController, UICollectionViewDataSour
     init(viewModel: MediaGalleryViewModel, using dependencies: Dependencies) {
         self.dependencies = dependencies
         self.viewModel = viewModel
-        dependencies[singleton: .storage].addObserver(viewModel.pagedDataObserver)
+        
+        /// Dispatch adding the database observation to a background thread
+        DispatchQueue.global(qos: .userInitiated).async { [weak viewModel] in
+            dependencies[singleton: .storage].addObserver(viewModel?.pagedDataObserver)
+        }
         
         super.init(nibName: nil, bundle: nil)
     }

@@ -17,7 +17,10 @@ protocol PagedObservationSource {
 extension PagedObservationSource {
     public func didInit(using dependencies: Dependencies) {
-        dependencies[singleton: .storage].addObserver(pagedDataObserver)
+        /// Dispatch adding the database observation to a background thread
+        DispatchQueue.global(qos: .userInitiated).async { [weak pagedDataObserver] in
+            dependencies[singleton: .storage].addObserver(pagedDataObserver)
+        }
     }
 }

@@ -55,11 +55,8 @@ public enum ConfigMessageReceiveJob: JobExecutor {
             return failure(job, JobRunnerError.missingRequiredDetails, true)
         }
         
-        var lastError: Error?
-        
-        dependencies[singleton: .storage].write { db in
-            // Send any SharedConfigMessages to the LibSession to handle it
-            do {
-                try dependencies.mutate(cache: .libSession) { cache in
-                    try cache.handleConfigMessages(
-                        db,
+        dependencies[singleton: .storage].writeAsync(
+            updates: { db in
+                try dependencies.mutate(cache: .libSession) { cache in
+                    try cache.handleConfigMessages(
+                        db,
@@ -67,19 +64,19 @@ public enum ConfigMessageReceiveJob: JobExecutor {
                         messages: details.messages
                     )
                 }
-            }
-            catch { lastError = error }
-        }
-        
-        // Handle the result
-        switch lastError {
-            case .some(let error):
-                Log.error(.cat, "Couldn't receive config message due to error: \(error)")
-                removeDependencyOnMessageReceiveJobs()
-                failure(job, error, true)
-                
-            case .none: success(job, false)
-        }
+            },
+            completion: { result in
+                // Handle the result
+                switch result {
+                    case .failure(let error):
+                        Log.error(.cat, "Couldn't receive config message due to error: \(error)")
+                        removeDependencyOnMessageReceiveJobs()
+                        failure(job, error, true)
+                        
+                    case .success: success(job, false)
+                }
+            }
+        )
     }
 }

@@ -51,65 +51,74 @@ public enum MessageReceiveJob: JobExecutor {
             }
         }
         
-        dependencies[singleton: .storage].write { db in
-            for (messageInfo, protoContent) in messageData {
-                do {
-                    try MessageReceiver.handle(
-                        db,
-                        threadId: threadId,
-                        threadVariant: messageInfo.threadVariant,
-                        message: messageInfo.message,
-                        serverExpirationTimestamp: messageInfo.serverExpirationTimestamp,
-                        associatedWithProto: protoContent,
-                        using: dependencies
-                    )
-                }
-                catch {
-                    // If the current message is a permanent failure then override it with the
-                    // new error (we want to retry if there is a single non-permanent error)
-                    switch error {
-                        // Ignore duplicate and self-send errors (these will usually be caught during
-                        // parsing but sometimes can get past and conflict at database insertion - eg.
-                        // for open group messages) we also don't bother logging as it results in
-                        // excessive logging which isn't useful)
-                        case DatabaseError.SQLITE_CONSTRAINT_UNIQUE,
-                            DatabaseError.SQLITE_CONSTRAINT,    // Sometimes thrown for UNIQUE
-                            MessageReceiverError.duplicateMessage,
-                            MessageReceiverError.duplicateControlMessage,
-                            MessageReceiverError.selfSend:
-                            break
-                        
-                        case let receiverError as MessageReceiverError where !receiverError.isRetryable:
-                            Log.error(.cat, "Permanently failed message due to error: \(error)")
-                            continue
-                            
-                        default:
-                            Log.error(.cat, "Couldn't receive message due to error: \(error)")
-                            lastError = error
-                            
-                            // We failed to process this message but it is a retryable error
-                            // so add it to the list to re-process
-                            remainingMessagesToProcess.append(messageInfo)
-                    }
-                }
-            }
-            
-            // If any messages failed to process then we want to update the job to only include
-            // those failed messages
-            guard !remainingMessagesToProcess.isEmpty else { return }
-            
-            updatedJob = try job
-                .with(details: Details(messages: remainingMessagesToProcess))
-                .defaulting(to: job)
-                .upserted(db)
-        }
-        
-        // Handle the result
-        switch lastError {
-            case let error as MessageReceiverError where !error.isRetryable: failure(updatedJob, error, true)
-            case .some(let error): failure(updatedJob, error, false)
-            case .none: success(updatedJob, false)
-        }
+        dependencies[singleton: .storage].writeAsync(
+            updates: { db -> Error? in
+                for (messageInfo, protoContent) in messageData {
+                    do {
+                        try MessageReceiver.handle(
+                            db,
+                            threadId: threadId,
+                            threadVariant: messageInfo.threadVariant,
+                            message: messageInfo.message,
+                            serverExpirationTimestamp: messageInfo.serverExpirationTimestamp,
+                            associatedWithProto: protoContent,
+                            using: dependencies
+                        )
+                    }
+                    catch {
+                        // If the current message is a permanent failure then override it with the
+                        // new error (we want to retry if there is a single non-permanent error)
+                        switch error {
+                            // Ignore duplicate and self-send errors (these will usually be caught during
+                            // parsing but sometimes can get past and conflict at database insertion - eg.
+                            // for open group messages) we also don't bother logging as it results in
+                            // excessive logging which isn't useful)
+                            case DatabaseError.SQLITE_CONSTRAINT_UNIQUE,
+                                DatabaseError.SQLITE_CONSTRAINT,    // Sometimes thrown for UNIQUE
+                                MessageReceiverError.duplicateMessage,
+                                MessageReceiverError.duplicateControlMessage,
+                                MessageReceiverError.selfSend:
+                                break
+                            
+                            case let receiverError as MessageReceiverError where !receiverError.isRetryable:
+                                Log.error(.cat, "Permanently failed message due to error: \(error)")
+                                continue
+                                
+                            default:
+                                Log.error(.cat, "Couldn't receive message due to error: \(error)")
+                                lastError = error
+                                
+                                // We failed to process this message but it is a retryable error
+                                // so add it to the list to re-process
+                                remainingMessagesToProcess.append(messageInfo)
+                        }
+                    }
+                }
+                
+                // If any messages failed to process then we want to update the job to only include
+                // those failed messages
+                guard !remainingMessagesToProcess.isEmpty else { return nil }
+                
+                updatedJob = try job
+                    .with(details: Details(messages: remainingMessagesToProcess))
+                    .defaulting(to: job)
+                    .upserted(db)
+                
+                return lastError
+            },
+            completion: { result in
+                // TODO: [REFACTOR] Need to test this!!!
+                // Handle the result
+                switch result {
+                    case .failure(let error): failure(updatedJob, error, false)
+                    case .success(.some(let error as MessageReceiverError)) where !error.isRetryable:
+                        failure(updatedJob, error, true)
+                    case .success(.some(let error)): failure(updatedJob, error, false)
+                    case .success: success(updatedJob, false)
+                }
+            }
+        )
     }
 }

@@ -53,6 +53,9 @@ open class Storage {
     private static let writeTransactionStartTimeout: TimeInterval = 5
     
     /// If a transaction takes longer than this duration then we should fail the transaction rather than keep hanging
+    ///
+    /// **Note:** This timeout only applies to synchronous operations (the assumption being that if we know an operation is
+    /// going to take a long time then we should probably be handling it asynchronously rather than in a synchronous way)
     private static let transactionDeadlockTimeoutSeconds: Int = 5
     
     private static var sharedDatabaseDirectoryPath: String { "\(SessionFileManager.nonInjectedAppSharedDataDirectoryPath)/database" }
@@ -142,11 +145,6 @@ open class Storage {
         var config = Configuration()
         config.label = Storage.queuePrefix
         config.maximumReaderCount = 10  /// Increase the max read connection limit - Default is 5
-        
-        /// It seems we should do this per https://github.com/groue/GRDB.swift/pull/1485 but with this change
-        /// we then need to define how long a write transaction should wait for before timing out (read transactions always run
-        /// in `DEFERRED` mode so won't be affected by these settings)
-        config.defaultTransactionKind = .immediate
         config.busyMode = .timeout(Storage.writeTransactionStartTimeout)
         
         /// Load in the SQLCipher keys
@@ -551,6 +549,13 @@ open class Storage {
         case valid(DatabaseWriter)
         case invalid(Error)
         
+        var forcedError: Error {
+            switch self {
+                case .valid: return StorageError.validStorageIncorrectlyHandledAsError
+                case .invalid(let error): return error
+            }
+        }
+        
         init(_ storage: Storage?) {
             switch (storage?.isSuspended, storage?.isValid, storage?.dbWriter) {
                 case (true, _, _): self = .invalid(StorageError.databaseSuspended)
@@ -559,38 +564,46 @@ open class Storage {
             }
         }
         
-        static func logIfNeeded(_ error: Error, isWrite: Bool) {
+        fileprivate static func logIfNeeded(_ error: Error, info: Storage.CallInfo) {
+            let action: String = (info.isWrite ? "write" : "read")
+            
             switch error {
                 case DatabaseError.SQLITE_ABORT, DatabaseError.SQLITE_INTERRUPT, DatabaseError.SQLITE_ERROR:
                     let message: String = ((error as? DatabaseError)?.message ?? "Unknown")
-                    Log.error(.storage, "Database \(isWrite ? "write" : "read") failed due to error: \(message)")
+                    Log.error(.storage, "Database \(action) failed due to error: \(message) - [ \(info.callInfo) ]")
                     
                 case StorageError.databaseInvalid:
-                    Log.error(.storage, "Database \(isWrite ? "write" : "read") failed as the database is invalid.")
+                    Log.error(.storage, "Database \(action) failed as the database is invalid - [ \(info.callInfo) ]")
                     
                 case StorageError.databaseSuspended:
-                    Log.error(.storage, "Database \(isWrite ? "write" : "read") failed as the database is suspended.")
+                    Log.error(.storage, "Database \(action) failed as the database is suspended - [ \(info.callInfo) ]")
                     
                 case StorageError.transactionDeadlockTimeout:
-                    Log.critical("[Storage] Database \(isWrite ? "write" : "read") failed due to a potential synchronous query deadlock timeout.")
+                    Log.critical(.storage, "Database \(action) failed due to a potential synchronous query deadlock timeout - [ \(info.callInfo) ]")
                     
                 default: break
             }
         }
         
-        static func logIfNeeded<T>(_ error: Error, isWrite: Bool) -> T? {
-            logIfNeeded(error, isWrite: isWrite)
+        fileprivate static func logIfNeeded<T>(_ error: Error, info: Storage.CallInfo) -> T? {
+            logIfNeeded(error, info: info)
             return nil
         }
         
-        static func logIfNeeded<T>(_ error: Error, isWrite: Bool) -> AnyPublisher<T, Error> {
-            logIfNeeded(error, isWrite: isWrite)
+        fileprivate static func logIfNeeded<T>(_ error: Error, info: Storage.CallInfo) -> AnyPublisher<T, Error> {
+            logIfNeeded(error, info: info)
             return Fail<T, Error>(error: error).eraseToAnyPublisher()
         }
     }
     
     // MARK: - Operations
     
+    /// Internal type to wrap the database operation `Task` so it can be cancelled when used with `Combine`
+    /// (since GRDB doesn't actually handle cancellation for its publishers)
+    final class TaskHolder {
+        var task: Task<(), Never>?
+    }
+    
     private static func track<T>(
         _ db: Database,
         _ info: CallInfo,
@@ -634,122 +647,108 @@ open class Storage {
         _ dependencies: Dependencies,
         _ operation: @escaping (Database) throws -> T,
         _ asyncCompletion: ((Result<T, Error>) -> Void)? = nil
-    ) -> Result<T, Error> {
-        // A serial queue for synchronizing completion updates.
-        let syncQueue = DispatchQueue(label: "com.session.performOperation.syncQueue")
-        
-        weak var queryDb: Database?
-        var didTimeout: Bool = false
-        var operationResult: Result<T, Error>?
-        let semaphore: DispatchSemaphore? = (info.isAsync ? nil : DispatchSemaphore(value: 0))
-        let logErrorIfNeeded: (Result<T, Error>) -> Result<T, Error> = { result in
-            switch result {
-                case .success: break
-                case .failure(let error): StorageState.logIfNeeded(error, isWrite: info.isWrite)
-            }
-            
-            return result
-        }
-        
-        func completeOperation(with result: Result<T, Error>) {
-            syncQueue.sync {
-                guard !didTimeout && operationResult == nil else { return }
-                operationResult = result
-                semaphore?.signal()
-                
-                // For async operations, log and invoke the completion closure.
-                if info.isAsync {
-                    asyncCompletion?(logErrorIfNeeded(result))
-                }
-            }
-        }
-        
-        /// Perform the actual operation
-        switch (StorageState(info.storage), info.isWrite) {
-            case (.invalid(let error), _): completeOperation(with: .failure(error))
-            
-            case (.valid(let dbWriter), true):
-                dbWriter.asyncWrite(
-                    { db in
-                        syncQueue.sync { queryDb = db }
-                        defer { syncQueue.sync { queryDb = nil } }
-                        
-                        if dependencies[feature: .forceSlowDatabaseQueries] {
-                            Thread.sleep(forTimeInterval: 1)
-                        }
-                        
-                        return try Storage.track(db, info, operation)
-                    },
-                    completion: { _, dbResult in completeOperation(with: dbResult) }
-                )
-            
-            case (.valid(let dbWriter), false):
-                dbWriter.asyncRead { dbResult in
-                    do {
-                        switch dbResult {
-                            case .failure(let error): throw error
-                            case .success(let db):
-                                syncQueue.sync { queryDb = db }
-                                defer { syncQueue.sync { queryDb = nil } }
-                                
-                                if dependencies[feature: .forceSlowDatabaseQueries] {
-                                    Thread.sleep(forTimeInterval: 1)
-                                }
-                                
-                                completeOperation(with: .success(try Storage.track(db, info, operation)))
-                        }
-                    } catch {
-                        completeOperation(with: .failure(error))
-                    }
-                }
-        }
-        
-        /// If this is a synchronous operation then `semaphore` will exist and will block here waiting on the signal from one of the
-        /// above closures to be sent
-        ///
-        /// **Note:** Unfortunately this timeout can be really annoying when debugging because the semaphore timeout is based on
-        /// system time which doesn't get paused when stopping on a breakpoint (which means if you break in the middle of a database
-        /// query it's pretty much guaranteed to timeout)
-        ///
-        /// To try to avoid this we have the below code to try to replicate the behaviour of the proper semaphore timeout while the debugger
-        /// is attached as this approach does seem to get paused (or at least only perform a single iteration per debugger step)
-        if let semaphore: DispatchSemaphore = semaphore {
-            var semaphoreResult: DispatchTimeoutResult
-            
-            #if DEBUG
-            if isDebuggerAttached() {
-                semaphoreResult = debugWait(semaphore: semaphore, info: info)
-            }
-            else {
-                semaphoreResult = semaphore.wait(timeout: .now() + .seconds(Storage.transactionDeadlockTimeoutSeconds))
-            }
-            #else
-            /// This if statement is redundant **but** it means when we get symbolicated crash logs we can distinguish
-            /// between the database threads which are reading and writing
-            if info.isWrite {
-                semaphoreResult = semaphore.wait(timeout: .now() + .seconds(Storage.transactionDeadlockTimeoutSeconds))
-            }
-            else {
-                semaphoreResult = semaphore.wait(timeout: .now() + .seconds(Storage.transactionDeadlockTimeoutSeconds))
-            }
-            #endif
-            
-            /// Check if the query timed out in the `syncQueue` to ensure that we don't run into a race condition between handling
-            /// the timeout and handling the query completion
-            ///
-            /// If it did timeout then we should interrupt the query (don't want the query thread to remain blocked when we've
-            /// already handled it as a failure)
-            syncQueue.sync {
-                guard semaphoreResult == .timedOut && operationResult == nil else { return }
-                
-                didTimeout = true
-                queryDb?.interrupt()
-            }
-            
-            return logErrorIfNeeded(operationResult ?? .failure(StorageError.transactionDeadlockTimeout))
-        }
-        
-        /// For the `async` operation the returned value should be ignored so just return the `invalidQueryResult` error
-        return .failure(StorageError.invalidQueryResult)
+    ) -> (result: Result<T, Error>, task: Task<(), Never>?) {
+        /// Ensure we are in a valid state
+        let storageState: StorageState = StorageState(info.storage)
+        
+        guard case .valid(let dbWriter) = storageState else {
+            if info.isAsync { asyncCompletion?(.failure(storageState.forcedError)) }
+            return (.failure(storageState.forcedError), nil)
+        }
+        
+        /// Setup required variables
+        let syncQueue = DispatchQueue(label: "com.session.performOperation.syncQueue")
+        let semaphore: DispatchSemaphore = DispatchSemaphore(value: 0)
+        var operationResult: Result<T, Error>?
+        let logErrorIfNeeded: (Result<T, Error>) -> Result<T, Error> = { result in
+            switch result {
+                case .success: break
+                case .failure(let error): StorageState.logIfNeeded(error, info: info)
+            }
+            
+            return result
+        }
+        
+        /// Convenience function to remove duplication
+        func completeOperation(with result: Result<T, Error>) {
+            syncQueue.sync {
+                guard operationResult == nil else { return }
+                operationResult = result
+                semaphore.signal()
+                
+                /// For async operations, log and invoke the completion closure.
+                if info.isAsync {
+                    asyncCompletion?(logErrorIfNeeded(result))
+                }
+            }
+        }
+        
+        let task: Task<(), Never> = Task {
+            return await withThrowingTaskGroup(of: T.self) { group in
+                /// Add the task to perform the actual database operation
+                group.addTask {
+                    let trackedOperation: @Sendable (Database) throws -> T = { db in
+                        if dependencies[feature: .forceSlowDatabaseQueries] {
+                            Thread.sleep(forTimeInterval: 1)
+                        }
+                        
+                        return try Storage.track(db, info, operation)
+                    }
+                    
+                    return (info.isWrite ?
+                        try await dbWriter.write(trackedOperation) :
+                        try await dbWriter.read(trackedOperation)
+                    )
+                }
+                
+                /// If this is a synchronous task then we want the operation to time out to ensure we don't unintentionally
+                /// create a deadlock
+                if !info.isAsync {
+                    group.addTask {
+                        let timeoutNanoseconds: UInt64 = UInt64(Storage.transactionDeadlockTimeoutSeconds * 1_000_000_000)
+                        
+                        /// If the debugger is attached then we want to have a lot of shorter sleep iterations as the clock doesn't get
+                        /// paused when stopped on a breakpoint (and we don't want to end up having a bunch of false positive
+                        /// database timeouts while debugging code)
+                        ///
+                        /// **Note:** `isDebuggerAttached` will always return `false` in production builds
+                        if isDebuggerAttached() {
+                            let numIterations: UInt64 = 50
+                            
+                            for _ in (0..<numIterations) {
+                                try await Task.sleep(nanoseconds: (timeoutNanoseconds / numIterations))
+                            }
+                        }
+                        else if info.isWrite {
+                            /// This if statement is redundant **but** it means when we get symbolicated crash logs we can distinguish
+                            /// between the database threads which are reading and writing
+                            try await Task.sleep(nanoseconds: timeoutNanoseconds)
+                        }
+                        else {
+                            try await Task.sleep(nanoseconds: timeoutNanoseconds)
+                        }
+                        
+                        throw StorageError.transactionDeadlockTimeout
+                    }
+                }
+                
+                /// Wait for the first task to finish
+                ///
+                /// **Note:** The case where `nextResult` returns `nil` is only meant to happen when the group has no
+                /// tasks, so shouldn't be considered a valid case (hence the `invalidQueryResult` fallback)
+                let result: Result<T, Error>? = await group.nextResult()
+                group.cancelAll()
+                completeOperation(with: result ?? .failure(StorageError.invalidQueryResult))
+            }
+        }
+        
+        /// For the `async` operation the returned value should be ignored so just return the `invalidQueryResult` error
+        guard !info.isAsync else {
+            return (.failure(StorageError.invalidQueryResult), task)
+        }
+        
+        /// Block until we have a result
+        semaphore.wait()
+        
+        return (logErrorIfNeeded(operationResult ?? .failure(StorageError.transactionDeadlockTimeout)), task)
     }
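
The refactored `performOperation` above races the real database work against a timeout child task in a `withThrowingTaskGroup`, takes whichever child finishes first, and cancels the rest. A standalone sketch of that racing pattern, with a plain async closure standing in for the GRDB call (`withDeadlockTimeout` and `TimeoutError` are illustrative names, not Session's API):

import Foundation

enum TimeoutError: Error { case deadlockTimeout }

/// Race an async operation against a timeout: the first child task to finish wins and the
/// group is cancelled, so the loser (slow query or still-sleeping timer) stops early.
func withDeadlockTimeout<T: Sendable>(
    seconds: UInt64,
    operation: @escaping @Sendable () async throws -> T
) async -> Result<T, Error> {
    await withThrowingTaskGroup(of: T.self) { group in
        group.addTask { try await operation() }
        group.addTask {
            try await Task.sleep(nanoseconds: seconds * 1_000_000_000)
            throw TimeoutError.deadlockTimeout
        }

        // `nextResult()` yields the outcome of whichever child completed first.
        let result = await group.nextResult() ?? .failure(TimeoutError.deadlockTimeout)
        group.cancelAll()
        return result
    }
}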
@@ -759,59 +758,33 @@
     private func performPublisherOperation<T>(
         isWrite: Bool,
         _ operation: @escaping (Database) throws -> T
     ) -> AnyPublisher<T, Error> {
+        let info: CallInfo = CallInfo(self, fileName, functionName, lineNumber, (isWrite ? .asyncWrite : .asyncRead))
+        
         switch StorageState(self) {
-            case .invalid(let error): return StorageState.logIfNeeded(error, isWrite: false)
+            case .invalid(let error): return StorageState.logIfNeeded(error, info: info)
             case .valid:
                 /// **Note:** GRDB does have `readPublisher`/`writePublisher` functions but it appears to asynchronously
                 /// trigger both the `output` and `complete` closures at the same time which causes a lot of unexpected
                 /// behaviours (this behaviour is apparently expected but still causes a number of odd behaviours in our code
                 /// for more information see https://github.com/groue/GRDB.swift/issues/1334)
                 ///
-                /// Instead of this we are just using `Deferred { Future {} }` which is executed on the specified scheduled
-                /// which behaves in a much more expected way than the GRDB `readPublisher`/`writePublisher` does
-                let info: CallInfo = CallInfo(self, fileName, functionName, lineNumber, .syncWrite)
+                /// Instead of this we are just using `Deferred { Future {} }` which is executed on the specified scheduler
+                /// (which behaves in a much more expected way than the GRDB `readPublisher`/`writePublisher` does)
+                /// and hooking that into our `performOperation` function which uses the GRDB async/await functions that support
+                /// cancellation (as we want to support cancellation as well)
+                let holder: TaskHolder = TaskHolder()
+                
                 return Deferred { [dependencies] in
                     Future { resolver in
-                        resolver(Storage.performOperation(info, dependencies, operation))
+                        let (_, task) = Storage.performOperation(info, dependencies, operation) { result in
+                            resolver(result)
+                        }
+                        holder.task = task
                     }
-                }.eraseToAnyPublisher()
+                }
+                .handleEvents(receiveCancel: { holder.task?.cancel() })
+                .eraseToAnyPublisher()
         }
     }
-    
-    private static func debugWait(semaphore: DispatchSemaphore, info: CallInfo) -> DispatchTimeoutResult {
-        let pollQueue: DispatchQueue = DispatchQueue(label: "com.session.debugWaitTimer.\(UUID().uuidString)")
-        let standardPollInterval: DispatchTimeInterval = .milliseconds(100)
-        var iterations: Int = 0
-        let maxIterations: Int = ((Storage.transactionDeadlockTimeoutSeconds * 1000) / standardPollInterval.milliseconds)
-        let pollCompletionSemaphore: DispatchSemaphore = DispatchSemaphore(value: 0)
-        
-        /// Stagger the size of the `pollIntervals` to avoid holding up the thread in case the query resolves very quickly (this
-        /// means the timeout will occur ~500ms early but helps prevent false main thread lag appearing when debugging that wouldn't
-        /// affect production)
-        let pollIntervals: [DispatchTimeInterval] = [
-            .milliseconds(5), .milliseconds(5), .milliseconds(10), .milliseconds(10), .milliseconds(10),
-            standardPollInterval
-        ]
-        
-        func pollSemaphore() {
-            iterations += 1
-            
-            guard iterations < maxIterations && semaphore.wait(timeout: .now()) != .success else {
-                pollCompletionSemaphore.signal()
-                return
-            }
-            
-            let nextInterval: DispatchTimeInterval = pollIntervals[min(iterations, pollIntervals.count - 1)]
-            pollQueue.asyncAfter(deadline: .now() + nextInterval) {
-                pollSemaphore()
-            }
-        }
-        
-        /// Poll the semaphore in a background queue
-        pollQueue.asyncAfter(deadline: .now() + pollIntervals[0]) { pollSemaphore() }
-        pollCompletionSemaphore.wait() // Wait indefinitely for the timer semaphore
-        return (iterations >= 50 ? .timedOut : .success)
-    }
     
     // MARK: - Functions
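
The cancellation hook in `performPublisherOperation` above follows a general Combine pattern: wrap the async work in `Deferred { Future { } }`, keep a reference to the backing `Task`, and cancel it from `handleEvents(receiveCancel:)`. A minimal sketch of that pattern (`TaskBox` and `publisher(for:)` are illustrative names, not the app's `TaskHolder`):

import Combine

/// Holds the backing task so the subscription's cancel event can reach it.
final class TaskBox {
    var task: Task<Void, Never>?
}

/// Bridge an async operation into a Combine publisher that forwards cancellation.
func publisher<T>(for work: @escaping @Sendable () async throws -> T) -> AnyPublisher<T, Error> {
    let box = TaskBox()

    return Deferred {
        Future<T, Error> { resolver in
            box.task = Task {
                do { resolver(.success(try await work())) }
                catch { resolver(.failure(error)) }
            }
        }
    }
    .handleEvents(receiveCancel: { box.task?.cancel() })
    .eraseToAnyPublisher()
}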
@@ -823,8 +796,8 @@ open class Storage {
         updates: @escaping (Database) throws -> T?
     ) -> T? {
         switch Storage.performOperation(CallInfo(self, file, funcN, line, .syncWrite), dependencies, updates) {
-            case .failure: return nil
-            case .success(let result): return result
+            case (.failure, _): return nil
+            case (.success(let result), _): return result
         }
     }
@@ -854,8 +827,8 @@ open class Storage {
         _ value: @escaping (Database) throws -> T?
     ) -> T? {
         switch Storage.performOperation(CallInfo(self, file, funcN, line, .syncRead), dependencies, value) {
-            case .failure: return nil
-            case .success(let result): return result
+            case (.failure, _): return nil
+            case (.success(let result), _): return result
         }
     }
@@ -896,30 +869,32 @@ open class Storage {
         )
     }
     
+    /// Add a database observation
+    ///
+    /// **Note:** This function **MUST NOT** be called from the main thread
     public func addObserver(_ observer: TransactionObserver?) {
         guard isValid, let dbWriter: DatabaseWriter = dbWriter else { return }
         guard let observer: TransactionObserver = observer else { return }
         
-        // Note: This actually triggers a write to the database so can be blocked by other
-        // writes, since it's usually called on the main thread when creating a view controller
-        // this can result in the UI hanging - to avoid this we dispatch (and hope there isn't
-        // negative impact)
-        DispatchQueue.global(qos: .default).async {
-            dbWriter.add(transactionObserver: observer)
-        }
+        /// This actually triggers a write to the database so can be blocked by other writes and shouldn't be called on the
+        /// main thread; we don't dispatch to an async thread in here because `TransactionObserver` isn't `Sendable` so
+        /// instead we just require that it isn't called on the main thread
+        Log.assertNotOnMainThread()
+        dbWriter.add(transactionObserver: observer)
     }
     
+    /// Remove a database observation
+    ///
+    /// **Note:** This function **MUST NOT** be called from the main thread
    public func removeObserver(_ observer: TransactionObserver?) {
         guard isValid, let dbWriter: DatabaseWriter = dbWriter else { return }
         guard let observer: TransactionObserver = observer else { return }
         
-        // Note: This actually triggers a write to the database so can be blocked by other
-        // writes, since it's usually called on the main thread when creating a view controller
-        // this can result in the UI hanging - to avoid this we dispatch (and hope there isn't
-        // negative impact)
-        DispatchQueue.global(qos: .default).async {
-            dbWriter.remove(transactionObserver: observer)
-        }
+        /// This actually triggers a write to the database so can be blocked by other writes and shouldn't be called on the
+        /// main thread; we don't dispatch to an async thread in here because `TransactionObserver` isn't `Sendable` so
+        /// instead we just require that it isn't called on the main thread
+        Log.assertNotOnMainThread()
+        dbWriter.remove(transactionObserver: observer)
     }
 }
@ -1028,7 +1003,7 @@ private extension Storage {
result?.timer = nil result?.timer = nil
let action: String = (info.isWrite ? "write" : "read") let action: String = (info.isWrite ? "write" : "read")
Log.warn("[Storage] Slow \(action) taking longer than \(Storage.slowTransactionThreshold, format: ".2", omitZeroDecimal: true)s - [ \(info.callInfo) ]") Log.warn(.storage, "Slow \(action) taking longer than \(Storage.slowTransactionThreshold, format: ".2", omitZeroDecimal: true)s - [ \(info.callInfo) ]")
result?.wasSlowTransaction = true result?.wasSlowTransaction = true
} }
result.timer?.resume() result.timer?.resume()
@@ -1044,7 +1019,7 @@ private extension Storage {
             let end: CFTimeInterval = CACurrentMediaTime()
             let action: String = (info.isWrite ? "write" : "read")
-            Log.warn("[Storage] Slow \(action) completed after \(end - start, format: ".2", omitZeroDecimal: true)s - [ \(info.callInfo) ]")
+            Log.warn(.storage, "Slow \(action) completed after \(end - start, format: ".2", omitZeroDecimal: true)s - [ \(info.callInfo) ]")
         }
     }
 }
@@ -1159,13 +1134,18 @@
     }
 }
 
-#if DEBUG
+/// Function to determine if the debugger is attached
+///
+/// **Note:** Only contains logic when `DEBUG` is defined, otherwise it always returns false
 func isDebuggerAttached() -> Bool {
+    #if DEBUG
     var info = kinfo_proc()
     var size = MemoryLayout<kinfo_proc>.stride
     var mib: [Int32] = [CTL_KERN, KERN_PROC, KERN_PROC_PID, getpid()]
     let sysctlResult = sysctl(&mib, UInt32(mib.count), &info, &size, nil, 0)
     guard sysctlResult == 0 else { return false }
     return (info.kp_proc.p_flag & P_TRACED) != 0
-}
+    #else
+    return false
     #endif
+}

@@ -14,7 +14,12 @@ public enum StorageError: Error {
     case keySpecInaccessible
     case decodingFailed
     case invalidQueryResult
+    
+    /// This error is thrown when a synchronous operation takes longer than `Storage.transactionDeadlockTimeoutSeconds`,
+    /// the assumption being that if we know an operation is going to take a long time then we should probably be handling it
+    /// asynchronously rather than in a synchronous way
     case transactionDeadlockTimeout
+    case validStorageIncorrectlyHandledAsError
     
     case failedToSave
     case objectNotFound

@@ -335,12 +335,29 @@ public enum Log {
         function: StaticString = #function,
         line: UInt = #line
     ) {
-        guard !Thread.isMainThread else { return }
-        
-        let filename: String = URL(fileURLWithPath: "\(file)").lastPathComponent
-        let formattedMessage: String = "[\(filename):\(line) \(function)] Must be on main thread."
-        custom(.critical, [], formattedMessage, file: file, function: function, line: line)
-        assertionFailure(formattedMessage)
+        switch Thread.isMainThread {
+            case true: return
+            case false:
+                let filename: String = URL(fileURLWithPath: "\(file)").lastPathComponent
+                let formattedMessage: String = "[\(filename):\(line) \(function)] Must be on main thread."
+                custom(.critical, [], formattedMessage, file: file, function: function, line: line)
+                assertionFailure(formattedMessage)
+        }
+    }
+    
+    public static func assertNotOnMainThread(
+        file: StaticString = #file,
+        function: StaticString = #function,
+        line: UInt = #line
+    ) {
+        switch Thread.isMainThread {
+            case false: return
+            case true:
+                let filename: String = URL(fileURLWithPath: "\(file)").lastPathComponent
+                let formattedMessage: String = "[\(filename):\(line) \(function)] Must NOT be on main thread."
+                custom(.critical, [], formattedMessage, file: file, function: function, line: line)
+                assertionFailure(formattedMessage)
+        }
     }
     
     public static func custom(
