Moved some logic outside of DBWrite closures to prevent hangs

Updated the SessionApp.presentConversation function from using the DBWrite thread if it didn't need to
Updated the PagedDatabaseObserver to process database commits async on a serial queue to avoid holding up the DBWrite thread
Moved another Atomic mutation outside of a DBWrite closure
Refactored the PagedDatabaseObserver 'databaseDidCommit' logic to be much more straightforward
Tweaked a couple of flaky unit tests
pull/751/head
Morgan Pretty 2 years ago
parent a7761697a9
commit 6f4bdcdccb

@ -6599,7 +6599,7 @@
"CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer"; "CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer";
CODE_SIGN_STYLE = Automatic; CODE_SIGN_STYLE = Automatic;
COPY_PHASE_STRIP = NO; COPY_PHASE_STRIP = NO;
CURRENT_PROJECT_VERSION = 416; CURRENT_PROJECT_VERSION = 417;
DEBUG_INFORMATION_FORMAT = "dwarf-with-dsym"; DEBUG_INFORMATION_FORMAT = "dwarf-with-dsym";
DEVELOPMENT_TEAM = SUQ8J2PCT7; DEVELOPMENT_TEAM = SUQ8J2PCT7;
FRAMEWORK_SEARCH_PATHS = "$(inherited)"; FRAMEWORK_SEARCH_PATHS = "$(inherited)";
@ -6671,7 +6671,7 @@
"CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer"; "CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer";
CODE_SIGN_STYLE = Automatic; CODE_SIGN_STYLE = Automatic;
COPY_PHASE_STRIP = NO; COPY_PHASE_STRIP = NO;
CURRENT_PROJECT_VERSION = 416; CURRENT_PROJECT_VERSION = 417;
DEBUG_INFORMATION_FORMAT = "dwarf-with-dsym"; DEBUG_INFORMATION_FORMAT = "dwarf-with-dsym";
DEVELOPMENT_TEAM = SUQ8J2PCT7; DEVELOPMENT_TEAM = SUQ8J2PCT7;
ENABLE_NS_ASSERTIONS = NO; ENABLE_NS_ASSERTIONS = NO;
@ -6736,7 +6736,7 @@
"CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer"; "CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer";
CODE_SIGN_STYLE = Automatic; CODE_SIGN_STYLE = Automatic;
COPY_PHASE_STRIP = NO; COPY_PHASE_STRIP = NO;
CURRENT_PROJECT_VERSION = 416; CURRENT_PROJECT_VERSION = 417;
DEBUG_INFORMATION_FORMAT = "dwarf-with-dsym"; DEBUG_INFORMATION_FORMAT = "dwarf-with-dsym";
DEVELOPMENT_TEAM = SUQ8J2PCT7; DEVELOPMENT_TEAM = SUQ8J2PCT7;
FRAMEWORK_SEARCH_PATHS = "$(inherited)"; FRAMEWORK_SEARCH_PATHS = "$(inherited)";
@ -6810,7 +6810,7 @@
"CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer"; "CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer";
CODE_SIGN_STYLE = Automatic; CODE_SIGN_STYLE = Automatic;
COPY_PHASE_STRIP = NO; COPY_PHASE_STRIP = NO;
CURRENT_PROJECT_VERSION = 416; CURRENT_PROJECT_VERSION = 417;
DEBUG_INFORMATION_FORMAT = "dwarf-with-dsym"; DEBUG_INFORMATION_FORMAT = "dwarf-with-dsym";
DEVELOPMENT_TEAM = SUQ8J2PCT7; DEVELOPMENT_TEAM = SUQ8J2PCT7;
ENABLE_NS_ASSERTIONS = NO; ENABLE_NS_ASSERTIONS = NO;
@ -7718,7 +7718,7 @@
CODE_SIGN_ENTITLEMENTS = Session/Meta/Signal.entitlements; CODE_SIGN_ENTITLEMENTS = Session/Meta/Signal.entitlements;
CODE_SIGN_IDENTITY = "iPhone Developer"; CODE_SIGN_IDENTITY = "iPhone Developer";
"CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer"; "CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer";
CURRENT_PROJECT_VERSION = 416; CURRENT_PROJECT_VERSION = 417;
DEVELOPMENT_TEAM = SUQ8J2PCT7; DEVELOPMENT_TEAM = SUQ8J2PCT7;
FRAMEWORK_SEARCH_PATHS = ( FRAMEWORK_SEARCH_PATHS = (
"$(inherited)", "$(inherited)",
@ -7789,7 +7789,7 @@
CODE_SIGN_ENTITLEMENTS = Session/Meta/Signal.entitlements; CODE_SIGN_ENTITLEMENTS = Session/Meta/Signal.entitlements;
CODE_SIGN_IDENTITY = "iPhone Developer"; CODE_SIGN_IDENTITY = "iPhone Developer";
"CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer"; "CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer";
CURRENT_PROJECT_VERSION = 416; CURRENT_PROJECT_VERSION = 417;
DEVELOPMENT_TEAM = SUQ8J2PCT7; DEVELOPMENT_TEAM = SUQ8J2PCT7;
FRAMEWORK_SEARCH_PATHS = ( FRAMEWORK_SEARCH_PATHS = (
"$(inherited)", "$(inherited)",

@ -356,8 +356,12 @@ final class NewClosedGroupVC: BaseVC, UITableViewDataSource, UITableViewDelegate
} }
}, },
receiveValue: { thread in receiveValue: { thread in
self?.presentingViewController?.dismiss(animated: true, completion: nil) SessionApp.presentConversationCreatingIfNeeded(
SessionApp.presentConversation(for: thread.id, action: .compose, animated: false) for: thread.id,
variant: thread.variant,
dismissing: self?.presentingViewController,
animated: false
)
} }
) )
} }

@ -179,16 +179,13 @@ final class NewConversationVC: BaseVC, ThemedNavigation, UITableViewDelegate, UI
tableView.deselectRow(at: indexPath, animated: true) tableView.deselectRow(at: indexPath, animated: true)
let sessionId = newConversationViewModel.sectionData[indexPath.section].contacts[indexPath.row].id let sessionId = newConversationViewModel.sectionData[indexPath.section].contacts[indexPath.row].id
let maybeThread: SessionThread? = Storage.shared.write { db in
try SessionThread
.fetchOrCreate(db, id: sessionId, variant: .contact, shouldBeVisible: nil)
}
guard maybeThread != nil else { return }
self.navigationController?.dismiss(animated: true, completion: nil) SessionApp.presentConversationCreatingIfNeeded(
for: sessionId,
SessionApp.presentConversation(for: sessionId, action: .compose, animated: false) variant: .contact,
dismissing: navigationController,
animated: false
)
} }
func tableView(_ tableView: UITableView, willDisplayHeaderView view: UIView, forSection section: Int) { func tableView(_ tableView: UITableView, willDisplayHeaderView view: UIView, forSection section: Int) {

@ -260,16 +260,12 @@ final class NewDMVC: BaseVC, UIPageViewControllerDataSource, UIPageViewControlle
} }
private func startNewDM(with sessionId: String) { private func startNewDM(with sessionId: String) {
let maybeThread: SessionThread? = Storage.shared.write { db in SessionApp.presentConversationCreatingIfNeeded(
try SessionThread for: sessionId,
.fetchOrCreate(db, id: sessionId, variant: .contact, shouldBeVisible: nil) variant: .contact,
} dismissing: presentingViewController,
animated: false
guard maybeThread != nil else { return } )
presentingViewController?.dismiss(animated: true, completion: nil)
SessionApp.presentConversation(for: sessionId, action: .compose, animated: false)
} }
} }

@ -35,59 +35,78 @@ public struct SessionApp {
// MARK: - View Convenience Methods // MARK: - View Convenience Methods
public static func presentConversation(for threadId: String, action: ConversationViewModel.Action = .none, animated: Bool) { public static func presentConversationCreatingIfNeeded(
let maybeThreadInfo: (thread: SessionThread, isMessageRequest: Bool)? = Storage.shared.write { db in for threadId: String,
let thread: SessionThread = try SessionThread variant: SessionThread.Variant,
.fetchOrCreate(db, id: threadId, variant: .contact, shouldBeVisible: nil) action: ConversationViewModel.Action = .none,
dismissing presentingViewController: UIViewController?,
animated: Bool
) {
let threadInfo: (threadExists: Bool, isMessageRequest: Bool)? = Storage.shared.read { db in
let isMessageRequest: Bool = {
switch variant {
case .contact:
return SessionThread
.isMessageRequest(
id: threadId,
variant: .contact,
currentUserPublicKey: getUserHexEncodedPublicKey(db),
shouldBeVisible: nil,
contactIsApproved: (try? Contact
.filter(id: threadId)
.select(.isApproved)
.asRequest(of: Bool.self)
.fetchOne(db))
.defaulting(to: false),
includeNonVisible: true
)
default: return false
}
}()
return (thread, thread.isMessageRequest(db)) return (SessionThread.filter(id: threadId).isNotEmpty(db), isMessageRequest)
} }
guard // Store the post-creation logic in a closure to avoid duplication
let variant: SessionThread.Variant = maybeThreadInfo?.thread.variant, let afterThreadCreated: () -> () = {
let isMessageRequest: Bool = maybeThreadInfo?.isMessageRequest presentingViewController?.dismiss(animated: true, completion: nil)
else { return }
homeViewController.wrappedValue?.show(
threadId,
variant: variant,
isMessageRequest: (threadInfo?.isMessageRequest == true),
with: action,
focusedInteractionInfo: nil,
animated: animated
)
}
self.presentConversation( /// The thread should generally exist at the time of calling this method, but on the off change it doesn't then we need to `fetchOrCreate` it and
for: threadId, /// should do it on a background thread just in case something is keeping the DBWrite thread busy as in the past this could cause the app to hang
threadVariant: variant, guard threadInfo?.threadExists == true else {
isMessageRequest: isMessageRequest, DispatchQueue.global(qos: .userInitiated).async {
action: action, Storage.shared.write { db in
focusInteractionInfo: nil, try SessionThread.fetchOrCreate(db, id: threadId, variant: variant, shouldBeVisible: nil)
animated: animated }
)
} // Send back to main thread for UI transitions
DispatchQueue.main.async {
public static func presentConversation( afterThreadCreated()
for threadId: String, }
threadVariant: SessionThread.Variant, }
isMessageRequest: Bool, return
action: ConversationViewModel.Action, }
focusInteractionInfo: Interaction.TimestampInfo?,
animated: Bool // Send to main thread if needed
) {
guard Thread.isMainThread else { guard Thread.isMainThread else {
DispatchQueue.main.async { DispatchQueue.main.async {
self.presentConversation( afterThreadCreated()
for: threadId,
threadVariant: threadVariant,
isMessageRequest: isMessageRequest,
action: action,
focusInteractionInfo: focusInteractionInfo,
animated: animated
)
} }
return return
} }
homeViewController.wrappedValue?.show( afterThreadCreated()
threadId,
variant: threadVariant,
isMessageRequest: isMessageRequest,
with: action,
focusedInteractionInfo: focusInteractionInfo,
animated: animated
)
} }
// MARK: - Functions // MARK: - Functions

@ -37,6 +37,7 @@ enum AppNotificationAction: CaseIterable {
struct AppNotificationUserInfoKey { struct AppNotificationUserInfoKey {
static let threadId = "Signal.AppNotificationsUserInfoKey.threadId" static let threadId = "Signal.AppNotificationsUserInfoKey.threadId"
static let threadVariantRaw = "Signal.AppNotificationsUserInfoKey.threadVariantRaw"
static let callBackNumber = "Signal.AppNotificationsUserInfoKey.callBackNumber" static let callBackNumber = "Signal.AppNotificationsUserInfoKey.callBackNumber"
static let localCallId = "Signal.AppNotificationsUserInfoKey.localCallId" static let localCallId = "Signal.AppNotificationsUserInfoKey.localCallId"
static let threadNotificationCounter = "Session.AppNotificationsUserInfoKey.threadNotificationCounter" static let threadNotificationCounter = "Session.AppNotificationsUserInfoKey.threadNotificationCounter"
@ -232,8 +233,9 @@ public class NotificationPresenter: NotificationsProtocol {
// "no longer verified". // "no longer verified".
let category = AppNotificationCategory.incomingMessage let category = AppNotificationCategory.incomingMessage
let userInfo = [ let userInfo: [AnyHashable: Any] = [
AppNotificationUserInfoKey.threadId: thread.id AppNotificationUserInfoKey.threadId: thread.id,
AppNotificationUserInfoKey.threadVariantRaw: thread.variant.rawValue
] ]
let userPublicKey: String = getUserHexEncodedPublicKey(db) let userPublicKey: String = getUserHexEncodedPublicKey(db)
@ -301,8 +303,9 @@ public class NotificationPresenter: NotificationsProtocol {
let previewType: Preferences.NotificationPreviewType = db[.preferencesNotificationPreviewType] let previewType: Preferences.NotificationPreviewType = db[.preferencesNotificationPreviewType]
.defaulting(to: .nameAndPreview) .defaulting(to: .nameAndPreview)
let userInfo = [ let userInfo: [AnyHashable: Any] = [
AppNotificationUserInfoKey.threadId: thread.id AppNotificationUserInfoKey.threadId: thread.id,
AppNotificationUserInfoKey.threadVariantRaw: thread.variant.rawValue
] ]
let notificationTitle: String = "Session" let notificationTitle: String = "Session"
@ -378,8 +381,9 @@ public class NotificationPresenter: NotificationsProtocol {
let category = AppNotificationCategory.incomingMessage let category = AppNotificationCategory.incomingMessage
let userInfo = [ let userInfo: [AnyHashable: Any] = [
AppNotificationUserInfoKey.threadId: thread.id AppNotificationUserInfoKey.threadId: thread.id,
AppNotificationUserInfoKey.threadVariantRaw: thread.variant.rawValue
] ]
let threadName: String = SessionThread.displayName( let threadName: String = SessionThread.displayName(
@ -440,8 +444,9 @@ public class NotificationPresenter: NotificationsProtocol {
let notificationBody = NotificationStrings.failedToSendBody let notificationBody = NotificationStrings.failedToSendBody
let userInfo = [ let userInfo: [AnyHashable: Any] = [
AppNotificationUserInfoKey.threadId: thread.id AppNotificationUserInfoKey.threadId: thread.id,
AppNotificationUserInfoKey.threadVariantRaw: thread.variant.rawValue
] ]
let fallbackSound: Preferences.Sound = db[.defaultNotificationSound] let fallbackSound: Preferences.Sound = db[.defaultNotificationSound]
.defaulting(to: Preferences.Sound.defaultNotificationSound) .defaulting(to: Preferences.Sound.defaultNotificationSound)
@ -609,15 +614,22 @@ class NotificationActionHandler {
} }
func showThread(userInfo: [AnyHashable: Any]) -> AnyPublisher<Void, Never> { func showThread(userInfo: [AnyHashable: Any]) -> AnyPublisher<Void, Never> {
guard let threadId = userInfo[AppNotificationUserInfoKey.threadId] as? String else { guard
return showHomeVC() let threadId = userInfo[AppNotificationUserInfoKey.threadId] as? String,
} let threadVariantRaw = userInfo[AppNotificationUserInfoKey.threadVariantRaw] as? Int,
let threadVariant: SessionThread.Variant = SessionThread.Variant(rawValue: threadVariantRaw)
else { return showHomeVC() }
// If this happens when the the app is not, visible we skip the animation so the thread // If this happens when the the app is not, visible we skip the animation so the thread
// can be visible to the user immediately upon opening the app, rather than having to watch // can be visible to the user immediately upon opening the app, rather than having to watch
// it animate in from the homescreen. // it animate in from the homescreen.
let shouldAnimate: Bool = (UIApplication.shared.applicationState == .active) SessionApp.presentConversationCreatingIfNeeded(
SessionApp.presentConversation(for: threadId, animated: shouldAnimate) for: threadId,
variant: threadVariant,
dismissing: nil,
animated: (UIApplication.shared.applicationState == .active)
)
return Just(()) return Just(())
.eraseToAnyPublisher() .eraseToAnyPublisher()
} }

@ -217,12 +217,10 @@ final class JoinOpenGroupVC: BaseVC, UIPageViewControllerDataSource, UIPageViewC
self?.presentingViewController?.dismiss(animated: true, completion: nil) self?.presentingViewController?.dismiss(animated: true, completion: nil)
if shouldOpenCommunity { if shouldOpenCommunity {
SessionApp.presentConversation( SessionApp.presentConversationCreatingIfNeeded(
for: OpenGroup.idFor(roomToken: roomToken, server: server), for: OpenGroup.idFor(roomToken: roomToken, server: server),
threadVariant: .community, variant: .community,
isMessageRequest: false, dismissing: nil,
action: .compose,
focusInteractionInfo: nil,
animated: false animated: false
) )
} }

@ -138,16 +138,12 @@ final class QRCodeVC : BaseVC, UIPageViewControllerDataSource, UIPageViewControl
self.present(modal, animated: true) self.present(modal, animated: true)
} }
else { else {
let maybeThread: SessionThread? = Storage.shared.write { db in SessionApp.presentConversationCreatingIfNeeded(
try SessionThread for: hexEncodedPublicKey,
.fetchOrCreate(db, id: hexEncodedPublicKey, variant: .contact, shouldBeVisible: nil) variant: .contact,
} dismissing: presentingViewController,
animated: false
guard maybeThread != nil else { return } )
presentingViewController?.dismiss(animated: true, completion: nil)
SessionApp.presentConversation(for: hexEncodedPublicKey, action: .compose, animated: false)
} }
} }
} }

@ -192,20 +192,6 @@ public extension SessionThread {
) )
} }
func isMessageRequest(_ db: Database, includeNonVisible: Bool = false) -> Bool {
return (
(includeNonVisible || shouldBeVisible) &&
variant == .contact &&
id != getUserHexEncodedPublicKey(db) && // Note to self
(try? Contact
.filter(id: id)
.select(.isApproved)
.asRequest(of: Bool.self)
.fetchOne(db))
.defaulting(to: false) == false
)
}
static func canSendReadReceipt( static func canSendReadReceipt(
_ db: Database, _ db: Database,
threadId: String, threadId: String,
@ -431,6 +417,38 @@ public extension SessionThread {
).sqlExpression ).sqlExpression
} }
func isMessageRequest(_ db: Database, includeNonVisible: Bool = false) -> Bool {
return SessionThread.isMessageRequest(
id: id,
variant: variant,
currentUserPublicKey: getUserHexEncodedPublicKey(db),
shouldBeVisible: shouldBeVisible,
contactIsApproved: (try? Contact
.filter(id: id)
.select(.isApproved)
.asRequest(of: Bool.self)
.fetchOne(db))
.defaulting(to: false),
includeNonVisible: includeNonVisible
)
}
static func isMessageRequest(
id: String,
variant: SessionThread.Variant?,
currentUserPublicKey: String,
shouldBeVisible: Bool?,
contactIsApproved: Bool?,
includeNonVisible: Bool = false
) -> Bool {
return (
(includeNonVisible || shouldBeVisible == true) &&
variant == .contact &&
id != currentUserPublicKey && // Note to self
((contactIsApproved ?? false) == false)
)
}
func isNoteToSelf(_ db: Database? = nil) -> Bool { func isNoteToSelf(_ db: Database? = nil) -> Bool {
return ( return (
variant == .contact && variant == .contact &&

@ -265,12 +265,15 @@ public struct ProfileManager {
return return
} }
// Update the cache first (in case the DBWrite thread is blocked, this way other threads
// can retrieve from the cache and avoid triggering a download)
profileAvatarCache.mutate { $0[fileName] = decryptedData }
// Store the updated 'profilePictureFileName' // Store the updated 'profilePictureFileName'
Storage.shared.write { db in Storage.shared.write { db in
_ = try? Profile _ = try? Profile
.filter(id: profile.id) .filter(id: profile.id)
.updateAll(db, Profile.Columns.profilePictureFileName.set(to: fileName)) .updateAll(db, Profile.Columns.profilePictureFileName.set(to: fileName))
profileAvatarCache.mutate { $0[fileName] = decryptedData }
} }
} }
) )

@ -438,14 +438,6 @@ class OpenGroupManagerSpec: QuickSpec {
mockOGMCache.when { $0.isPolling }.thenReturn(true) mockOGMCache.when { $0.isPolling }.thenReturn(true)
mockOGMCache.when { $0.pollers }.thenReturn(["testserver": OpenGroupAPI.Poller(for: "testserver")]) mockOGMCache.when { $0.pollers }.thenReturn(["testserver": OpenGroupAPI.Poller(for: "testserver")])
mockUserDefaults
.when { (defaults: inout any UserDefaultsType) -> Any? in
defaults.object(forKey: SNUserDefaults.Date.lastOpen.rawValue)
}
.thenReturn(Date(timeIntervalSince1970: 1234567890))
openGroupManager.startPolling(using: dependencies)
} }
it("removes all pollers") { it("removes all pollers") {

@ -246,16 +246,9 @@ class ThreadDisappearingMessagesSettingsViewModelSpec: QuickSpec {
try DisappearingMessagesConfiguration.fetchOne(db, id: "TestId") try DisappearingMessagesConfiguration.fetchOne(db, id: "TestId")
} }
expect(updatedConfig?.isEnabled) expect(updatedConfig?.isEnabled).to(beTrue())
.toEventually(
beTrue(),
timeout: .milliseconds(100)
)
expect(updatedConfig?.durationSeconds) expect(updatedConfig?.durationSeconds)
.toEventually( .to(equal(DisappearingMessagesConfiguration.validDurationsSeconds.last ?? -1))
equal(DisappearingMessagesConfiguration.validDurationsSeconds.last ?? -1),
timeout: .milliseconds(100)
)
} }
} }
} }

@ -10,6 +10,12 @@ import DifferenceKit
/// ///
/// **Note:** We **MUST** have accurate `filterSQL` and `orderSQL` values otherwise the indexing won't work /// **Note:** We **MUST** have accurate `filterSQL` and `orderSQL` values otherwise the indexing won't work
public class PagedDatabaseObserver<ObservedTable, T>: TransactionObserver where ObservedTable: TableRecord & ColumnExpressible & Identifiable, T: FetchableRecordWithRowId & Identifiable { public class PagedDatabaseObserver<ObservedTable, T>: TransactionObserver where ObservedTable: TableRecord & ColumnExpressible & Identifiable, T: FetchableRecordWithRowId & Identifiable {
private let commitProcessingQueue: DispatchQueue = DispatchQueue(
label: "PagedDatabaseObserver.commitProcessingQueue",
qos: .userInitiated,
attributes: [] // Must be serial in order to avoid updates getting processed in the wrong order
)
// MARK: - Variables // MARK: - Variables
private let pagedTableName: String private let pagedTableName: String
@ -145,74 +151,58 @@ public class PagedDatabaseObserver<ObservedTable, T>: TransactionObserver where
changesInCommit.mutate { $0.insert(trackedChange) } changesInCommit.mutate { $0.insert(trackedChange) }
} }
// Note: We will process all updates which come through this method even if /// We will process all updates which come through this method even if 'onChange' is null because if the UI stops observing and then starts
// 'onChange' is null because if the UI stops observing and then starts again /// again later we don't want to have missed any changes which happened while the UI wasn't subscribed (and doing a full re-query seems painful...)
// later we don't want to have missed any changes which happened while the UI ///
// wasn't subscribed (and doing a full re-query seems painful...) /// **Note:** This function is generally called within the DBWrite thread but we don't actually need write access to process the commit, in order
/// to avoid blocking the DBWrite thread we dispatch to a serial `commitProcessingQueue` to process the incoming changes (in the past not doing
/// so was resulting in hanging when there was a lot of activity happening)
public func databaseDidCommit(_ db: Database) { public func databaseDidCommit(_ db: Database) {
// Since we can't be sure the behaviours of 'databaseDidChange' and 'databaseDidCommit' won't change in
// the future we extract and clear the values in 'changesInCommit' since it's 'Atomic<T>' so will different
// threads modifying the data resulting in us missing a change
var committedChanges: Set<PagedData.TrackedChange> = [] var committedChanges: Set<PagedData.TrackedChange> = []
self.changesInCommit.mutate { cachedChanges in self.changesInCommit.mutate { cachedChanges in
committedChanges = cachedChanges committedChanges = cachedChanges
cachedChanges.removeAll() cachedChanges.removeAll()
} }
// Note: This method will be called regardless of whether there were actually changes commitProcessingQueue.async { [weak self] in
// in the areas we are observing so we want to early-out if there aren't any relevant self?.processDatabaseCommit(committedChanges: committedChanges)
// updated rows }
}
private func processDatabaseCommit(committedChanges: Set<PagedData.TrackedChange>) {
// Do nothing when there are no changes
guard !committedChanges.isEmpty else { return } guard !committedChanges.isEmpty else { return }
typealias AssociatedDataInfo = [(hasChanges: Bool, data: ErasedAssociatedRecord)]
typealias UpdatedData = (cache: DataCache<T>, pageInfo: PagedData.PageInfo, hasChanges: Bool, associatedData: AssociatedDataInfo)
// Store the instance variables locally to avoid unwrapping
let dataCache: DataCache<T> = self.dataCache.wrappedValue
let pageInfo: PagedData.PageInfo = self.pageInfo.wrappedValue
let joinSQL: SQL? = self.joinSQL let joinSQL: SQL? = self.joinSQL
let orderSQL: SQL = self.orderSQL let orderSQL: SQL = self.orderSQL
let filterSQL: SQL = self.filterSQL let filterSQL: SQL = self.filterSQL
let associatedRecords: [ErasedAssociatedRecord] = self.associatedRecords let associatedRecords: [ErasedAssociatedRecord] = self.associatedRecords
let getAssociatedDataInfo: (Database, PagedData.PageInfo) -> AssociatedDataInfo = { db, updatedPageInfo in
let updateDataAndCallbackIfNeeded: (DataCache<T>, PagedData.PageInfo, Bool) -> () = { [weak self] updatedDataCache, updatedPageInfo, cacheHasChanges in associatedRecords.map { associatedRecord in
let associatedDataInfo: [(hasChanges: Bool, data: ErasedAssociatedRecord)] = associatedRecords let hasChanges: Bool = associatedRecord.tryUpdateForDatabaseCommit(
.map { associatedRecord in db,
let hasChanges: Bool = associatedRecord.tryUpdateForDatabaseCommit( changes: committedChanges,
db, joinSQL: joinSQL,
changes: committedChanges, orderSQL: orderSQL,
joinSQL: joinSQL, filterSQL: filterSQL,
orderSQL: orderSQL, pageInfo: updatedPageInfo
filterSQL: filterSQL, )
pageInfo: updatedPageInfo
)
return (hasChanges, associatedRecord)
}
// Check if we need to trigger a change callback
guard cacheHasChanges || associatedDataInfo.contains(where: { hasChanges, _ in hasChanges }) else {
return
}
// If the associated data changed then update the updatedCachedData with the
// updated associated data
var finalUpdatedDataCache: DataCache<T> = updatedDataCache
associatedDataInfo.forEach { hasChanges, associatedData in
guard cacheHasChanges || hasChanges else { return }
finalUpdatedDataCache = associatedData.updateAssociatedData(to: finalUpdatedDataCache) return (hasChanges, associatedRecord)
} }
// Update the cache, pageInfo and the change callback
self?.dataCache.mutate { $0 = finalUpdatedDataCache }
self?.pageInfo.mutate { $0 = updatedPageInfo }
// Make sure the updates run on the main thread
guard Thread.isMainThread else {
DispatchQueue.main.async { [weak self] in
self?.onChangeUnsorted(finalUpdatedDataCache.values, updatedPageInfo)
}
return
}
self?.onChangeUnsorted(finalUpdatedDataCache.values, updatedPageInfo)
} }
// Determing if there were any direct or related data changes // Determine if there were any direct or related data changes
let directChanges: Set<PagedData.TrackedChange> = committedChanges let directChanges: Set<PagedData.TrackedChange> = committedChanges
.filter { $0.tableName == pagedTableName } .filter { $0.tableName == pagedTableName }
let relatedChanges: [String: [PagedData.TrackedChange]] = committedChanges let relatedChanges: [String: [PagedData.TrackedChange]] = committedChanges
@ -227,215 +217,248 @@ public class PagedDatabaseObserver<ObservedTable, T>: TransactionObserver where
.filter { $0.tableName != pagedTableName } .filter { $0.tableName != pagedTableName }
.filter { $0.kind == .delete } .filter { $0.kind == .delete }
guard !directChanges.isEmpty || !relatedChanges.isEmpty || !relatedDeletions.isEmpty else { // Process and retrieve the updated data
updateDataAndCallbackIfNeeded(self.dataCache.wrappedValue, self.pageInfo.wrappedValue, false) let updatedData: UpdatedData = Storage.shared
return .read { db -> UpdatedData in
} // If there aren't any direct or related changes then early-out
guard !directChanges.isEmpty || !relatedChanges.isEmpty || !relatedDeletions.isEmpty else {
var updatedPageInfo: PagedData.PageInfo = self.pageInfo.wrappedValue return (dataCache, pageInfo, false, getAssociatedDataInfo(db, pageInfo))
var updatedDataCache: DataCache<T> = self.dataCache.wrappedValue }
let deletionChanges: [Int64] = directChanges
.filter { $0.kind == .delete }
.map { $0.rowId }
let oldDataCount: Int = dataCache.wrappedValue.count
// First remove any items which have been deleted
if !deletionChanges.isEmpty {
updatedDataCache = updatedDataCache.deleting(rowIds: deletionChanges)
// Make sure there were actually changes
if updatedDataCache.count != oldDataCount {
let dataSizeDiff: Int = (updatedDataCache.count - oldDataCount)
updatedPageInfo = PagedData.PageInfo( // Store a mutable copies of the dataCache and pageInfo for updating
pageSize: updatedPageInfo.pageSize, var updatedDataCache: DataCache<T> = dataCache
pageOffset: updatedPageInfo.pageOffset, var updatedPageInfo: PagedData.PageInfo = pageInfo
currentCount: (updatedPageInfo.currentCount + dataSizeDiff), let deletionChanges: [Int64] = directChanges
totalCount: (updatedPageInfo.totalCount + dataSizeDiff) .filter { $0.kind == .delete }
) .map { $0.rowId }
} let oldDataCount: Int = dataCache.count
}
// First remove any items which have been deleted
// If there are no inserted/updated rows then trigger the update callback and stop here if !deletionChanges.isEmpty {
let changesToQuery: [PagedData.TrackedChange] = directChanges updatedDataCache = updatedDataCache.deleting(rowIds: deletionChanges)
.filter { $0.kind != .delete }
guard !changesToQuery.isEmpty || !relatedChanges.isEmpty || !relatedDeletions.isEmpty else {
updateDataAndCallbackIfNeeded(updatedDataCache, updatedPageInfo, !deletionChanges.isEmpty)
return
}
// First we need to get the rowIds for the paged data connected to any of the related changes
let pagedRowIdsForRelatedChanges: Set<Int64> = {
guard !relatedChanges.isEmpty else { return [] }
return relatedChanges
.reduce(into: []) { result, next in
guard
let observedChange: PagedData.ObservedChanges = observedTableChangeTypes[next.key],
let joinToPagedType: SQL = observedChange.joinToPagedType
else { return }
let pagedRowIds: [Int64] = PagedData.pagedRowIdsForRelatedRowIds( // Make sure there were actually changes
db, if updatedDataCache.count != oldDataCount {
tableName: next.key, let dataSizeDiff: Int = (updatedDataCache.count - oldDataCount)
pagedTableName: pagedTableName,
relatedRowIds: Array(next.value.map { $0.rowId }.asSet()), updatedPageInfo = PagedData.PageInfo(
joinToPagedType: joinToPagedType pageSize: updatedPageInfo.pageSize,
) pageOffset: updatedPageInfo.pageOffset,
currentCount: (updatedPageInfo.currentCount + dataSizeDiff),
totalCount: (updatedPageInfo.totalCount + dataSizeDiff)
)
}
}
// If there are no inserted/updated rows then trigger then early-out
let changesToQuery: [PagedData.TrackedChange] = directChanges
.filter { $0.kind != .delete }
guard !changesToQuery.isEmpty || !relatedChanges.isEmpty || !relatedDeletions.isEmpty else {
let associatedData: AssociatedDataInfo = getAssociatedDataInfo(db, updatedPageInfo)
return (updatedDataCache, updatedPageInfo, !deletionChanges.isEmpty, associatedData)
}
// Next we need to determine if any related changes were associated to the pagedData we are
// observing, if they aren't (and there were no other direct changes) we can early-out
let pagedRowIdsForRelatedChanges: Set<Int64> = {
guard !relatedChanges.isEmpty else { return [] }
result.append(contentsOf: pagedRowIds) return relatedChanges
.reduce(into: []) { result, next in
guard
let observedChange: PagedData.ObservedChanges = observedTableChangeTypes[next.key],
let joinToPagedType: SQL = observedChange.joinToPagedType
else { return }
let pagedRowIds: [Int64] = PagedData.pagedRowIdsForRelatedRowIds(
db,
tableName: next.key,
pagedTableName: pagedTableName,
relatedRowIds: Array(next.value.map { $0.rowId }.asSet()),
joinToPagedType: joinToPagedType
)
result.append(contentsOf: pagedRowIds)
}
.asSet()
}()
guard !changesToQuery.isEmpty || !pagedRowIdsForRelatedChanges.isEmpty || !relatedDeletions.isEmpty else {
let associatedData: AssociatedDataInfo = getAssociatedDataInfo(db, updatedPageInfo)
return (updatedDataCache, updatedPageInfo, !deletionChanges.isEmpty, associatedData)
} }
.asSet()
}() // Fetch the indexes of the rowIds so we can determine whether they should be added to the screen
let directRowIds: Set<Int64> = changesToQuery.map { $0.rowId }.asSet()
guard !changesToQuery.isEmpty || !pagedRowIdsForRelatedChanges.isEmpty || !relatedDeletions.isEmpty else { let pagedRowIdsForRelatedDeletions: Set<Int64> = relatedDeletions
updateDataAndCallbackIfNeeded(updatedDataCache, updatedPageInfo, !deletionChanges.isEmpty) .compactMap { $0.pagedRowIdsForRelatedDeletion }
return .flatMap { $0 }
} .asSet()
let itemIndexes: [PagedData.RowIndexInfo] = PagedData.indexes(
// Fetch the indexes of the rowIds so we can determine whether they should be added to the screen db,
let directRowIds: Set<Int64> = changesToQuery.map { $0.rowId }.asSet() rowIds: Array(directRowIds),
let pagedRowIdsForRelatedDeletions: Set<Int64> = relatedDeletions tableName: pagedTableName,
.compactMap { $0.pagedRowIdsForRelatedDeletion } requiredJoinSQL: joinSQL,
.flatMap { $0 } orderSQL: orderSQL,
.asSet() filterSQL: filterSQL
let itemIndexes: [PagedData.RowIndexInfo] = PagedData.indexes(
db,
rowIds: Array(directRowIds),
tableName: pagedTableName,
requiredJoinSQL: joinSQL,
orderSQL: orderSQL,
filterSQL: filterSQL
)
let relatedChangeIndexes: [PagedData.RowIndexInfo] = PagedData.indexes(
db,
rowIds: Array(pagedRowIdsForRelatedChanges),
tableName: pagedTableName,
requiredJoinSQL: joinSQL,
orderSQL: orderSQL,
filterSQL: filterSQL
)
let relatedDeletionIndexes: [PagedData.RowIndexInfo] = PagedData.indexes(
db,
rowIds: Array(pagedRowIdsForRelatedDeletions),
tableName: pagedTableName,
requiredJoinSQL: joinSQL,
orderSQL: orderSQL,
filterSQL: filterSQL
)
// Determine if the indexes for the row ids should be displayed on the screen and remove any
// which shouldn't - values less than 'currentCount' or if there is at least one value less than
// 'currentCount' and the indexes are sequential (ie. more than the current loaded content was
// added at once)
func determineValidChanges(for indexInfo: [PagedData.RowIndexInfo]) -> [Int64] {
let indexes: [Int64] = Array(indexInfo
.map { $0.rowIndex }
.sorted()
.asSet())
let indexesAreSequential: Bool = (indexes.map { $0 - 1 }.dropFirst() == indexes.dropLast())
let hasOneValidIndex: Bool = indexInfo.contains(where: { info -> Bool in
info.rowIndex >= updatedPageInfo.pageOffset && (
info.rowIndex < updatedPageInfo.currentCount || (
updatedPageInfo.currentCount < updatedPageInfo.pageSize &&
info.rowIndex <= (updatedPageInfo.pageOffset + updatedPageInfo.pageSize)
)
) )
}) let relatedChangeIndexes: [PagedData.RowIndexInfo] = PagedData.indexes(
db,
return (indexesAreSequential && hasOneValidIndex ? rowIds: Array(pagedRowIdsForRelatedChanges),
indexInfo.map { $0.rowId } : tableName: pagedTableName,
indexInfo requiredJoinSQL: joinSQL,
.filter { info -> Bool in orderSQL: orderSQL,
filterSQL: filterSQL
)
let relatedDeletionIndexes: [PagedData.RowIndexInfo] = PagedData.indexes(
db,
rowIds: Array(pagedRowIdsForRelatedDeletions),
tableName: pagedTableName,
requiredJoinSQL: joinSQL,
orderSQL: orderSQL,
filterSQL: filterSQL
)
// Determine if the indexes for the row ids should be displayed on the screen and remove any
// which shouldn't - values less than 'currentCount' or if there is at least one value less than
// 'currentCount' and the indexes are sequential (ie. more than the current loaded content was
// added at once)
func determineValidChanges(for indexInfo: [PagedData.RowIndexInfo]) -> [Int64] {
let indexes: [Int64] = Array(indexInfo
.map { $0.rowIndex }
.sorted()
.asSet())
let indexesAreSequential: Bool = (indexes.map { $0 - 1 }.dropFirst() == indexes.dropLast())
let hasOneValidIndex: Bool = indexInfo.contains(where: { info -> Bool in
info.rowIndex >= updatedPageInfo.pageOffset && ( info.rowIndex >= updatedPageInfo.pageOffset && (
info.rowIndex < updatedPageInfo.currentCount || ( info.rowIndex < updatedPageInfo.currentCount || (
updatedPageInfo.currentCount < updatedPageInfo.pageSize && updatedPageInfo.currentCount < updatedPageInfo.pageSize &&
info.rowIndex <= (updatedPageInfo.pageOffset + updatedPageInfo.pageSize) info.rowIndex <= (updatedPageInfo.pageOffset + updatedPageInfo.pageSize)
) )
) )
})
return (indexesAreSequential && hasOneValidIndex ?
indexInfo.map { $0.rowId } :
indexInfo
.filter { info -> Bool in
info.rowIndex >= updatedPageInfo.pageOffset && (
info.rowIndex < updatedPageInfo.currentCount || (
updatedPageInfo.currentCount < updatedPageInfo.pageSize &&
info.rowIndex <= (updatedPageInfo.pageOffset + updatedPageInfo.pageSize)
)
)
}
.map { info -> Int64 in info.rowId }
)
}
let validChangeRowIds: [Int64] = determineValidChanges(for: itemIndexes)
let validRelatedChangeRowIds: [Int64] = determineValidChanges(for: relatedChangeIndexes)
let validRelatedDeletionRowIds: [Int64] = determineValidChanges(for: relatedDeletionIndexes)
let countBefore: Int = itemIndexes.filter { $0.rowIndex < updatedPageInfo.pageOffset }.count
// If the number of indexes doesn't match the number of rowIds then it means something changed
// resulting in an item being filtered out
func performRemovalsIfNeeded(for rowIds: Set<Int64>, indexes: [PagedData.RowIndexInfo]) {
let uniqueIndexes: Set<Int64> = indexes.map { $0.rowId }.asSet()
// If they have the same count then nothin was filtered out so do nothing
guard rowIds.count != uniqueIndexes.count else { return }
// Otherwise something was probably removed so try to remove it from the cache
let rowIdsRemoved: Set<Int64> = rowIds.subtracting(uniqueIndexes)
let preDeletionCount: Int = updatedDataCache.count
updatedDataCache = updatedDataCache.deleting(rowIds: Array(rowIdsRemoved))
// Lastly make sure there were actually changes before updating the page info
guard updatedDataCache.count != preDeletionCount else { return }
let dataSizeDiff: Int = (updatedDataCache.count - preDeletionCount)
updatedPageInfo = PagedData.PageInfo(
pageSize: updatedPageInfo.pageSize,
pageOffset: updatedPageInfo.pageOffset,
currentCount: (updatedPageInfo.currentCount + dataSizeDiff),
totalCount: (updatedPageInfo.totalCount + dataSizeDiff)
)
}
// Actually perform any required removals
performRemovalsIfNeeded(for: directRowIds, indexes: itemIndexes)
performRemovalsIfNeeded(for: pagedRowIdsForRelatedChanges, indexes: relatedChangeIndexes)
performRemovalsIfNeeded(for: pagedRowIdsForRelatedDeletions, indexes: relatedDeletionIndexes)
// Update the offset and totalCount even if the rows are outside of the current page (need to
// in order to ensure the 'load more' sections are accurate)
updatedPageInfo = PagedData.PageInfo(
pageSize: updatedPageInfo.pageSize,
pageOffset: (updatedPageInfo.pageOffset + countBefore),
currentCount: updatedPageInfo.currentCount,
totalCount: (
updatedPageInfo.totalCount +
changesToQuery
.filter { $0.kind == .insert }
.filter { validChangeRowIds.contains($0.rowId) }
.count
)
)
// If there are no valid row ids then early-out (at this point the pageInfo would have changed
// so we want to flat 'hasChanges' as true)
guard !validChangeRowIds.isEmpty || !validRelatedChangeRowIds.isEmpty || !validRelatedDeletionRowIds.isEmpty else {
let associatedData: AssociatedDataInfo = getAssociatedDataInfo(db, updatedPageInfo)
return (updatedDataCache, updatedPageInfo, true, associatedData)
}
// Fetch the inserted/updated rows
let targetRowIds: [Int64] = Array((validChangeRowIds + validRelatedChangeRowIds + validRelatedDeletionRowIds).asSet())
let updatedItems: [T] = {
do { return try dataQuery(targetRowIds).fetchAll(db) }
catch {
SNLog("[PagedDatabaseObserver] Error fetching data during change: \(error)")
return []
} }
.map { info -> Int64 in info.rowId } }()
)
updatedDataCache = updatedDataCache.upserting(items: updatedItems)
// Update the currentCount for the upserted data
let dataSizeDiff: Int = (updatedDataCache.count - oldDataCount)
updatedPageInfo = PagedData.PageInfo(
pageSize: updatedPageInfo.pageSize,
pageOffset: updatedPageInfo.pageOffset,
currentCount: (updatedPageInfo.currentCount + dataSizeDiff),
totalCount: updatedPageInfo.totalCount
)
// Return the final updated data
let associatedData: AssociatedDataInfo = getAssociatedDataInfo(db, updatedPageInfo)
return (updatedDataCache, updatedPageInfo, true, associatedData)
}
.defaulting(to: (cache: dataCache, pageInfo: pageInfo, hasChanges: false, associatedData: []))
// Now that we have all of the changes, check if there were actually any changes
guard updatedData.hasChanges || updatedData.associatedData.contains(where: { hasChanges, _ in hasChanges }) else {
return
} }
let validChangeRowIds: [Int64] = determineValidChanges(for: itemIndexes)
let validRelatedChangeRowIds: [Int64] = determineValidChanges(for: relatedChangeIndexes)
let validRelatedDeletionRowIds: [Int64] = determineValidChanges(for: relatedDeletionIndexes)
let countBefore: Int = itemIndexes.filter { $0.rowIndex < updatedPageInfo.pageOffset }.count
// If the number of indexes doesn't match the number of rowIds then it means something changed // If the associated data changed then update the updatedCachedData with the updated associated data
// resulting in an item being filtered out var finalUpdatedDataCache: DataCache<T> = updatedData.cache
func performRemovalsIfNeeded(for rowIds: Set<Int64>, indexes: [PagedData.RowIndexInfo]) {
let uniqueIndexes: Set<Int64> = indexes.map { $0.rowId }.asSet()
// If they have the same count then nothin was filtered out so do nothing
guard rowIds.count != uniqueIndexes.count else { return }
// Otherwise something was probably removed so try to remove it from the cache
let rowIdsRemoved: Set<Int64> = rowIds.subtracting(uniqueIndexes)
let preDeletionCount: Int = updatedDataCache.count
updatedDataCache = updatedDataCache.deleting(rowIds: Array(rowIdsRemoved))
// Lastly make sure there were actually changes before updating the page info updatedData.associatedData.forEach { hasChanges, associatedData in
guard updatedDataCache.count != preDeletionCount else { return } guard updatedData.hasChanges || hasChanges else { return }
let dataSizeDiff: Int = (updatedDataCache.count - preDeletionCount)
updatedPageInfo = PagedData.PageInfo( finalUpdatedDataCache = associatedData.updateAssociatedData(to: finalUpdatedDataCache)
pageSize: updatedPageInfo.pageSize,
pageOffset: updatedPageInfo.pageOffset,
currentCount: (updatedPageInfo.currentCount + dataSizeDiff),
totalCount: (updatedPageInfo.totalCount + dataSizeDiff)
)
} }
// Actually perform any required removals
performRemovalsIfNeeded(for: directRowIds, indexes: itemIndexes)
performRemovalsIfNeeded(for: pagedRowIdsForRelatedChanges, indexes: relatedChangeIndexes)
performRemovalsIfNeeded(for: pagedRowIdsForRelatedDeletions, indexes: relatedDeletionIndexes)
// Update the offset and totalCount even if the rows are outside of the current page (need to
// in order to ensure the 'load more' sections are accurate)
updatedPageInfo = PagedData.PageInfo(
pageSize: updatedPageInfo.pageSize,
pageOffset: (updatedPageInfo.pageOffset + countBefore),
currentCount: updatedPageInfo.currentCount,
totalCount: (
updatedPageInfo.totalCount +
changesToQuery
.filter { $0.kind == .insert }
.filter { validChangeRowIds.contains($0.rowId) }
.count
)
)
// If there are no valid row ids then stop here (trigger updates though since the page info // Update the cache, pageInfo and the change callback
// has changes) self.dataCache.mutate { $0 = finalUpdatedDataCache }
guard !validChangeRowIds.isEmpty || !validRelatedChangeRowIds.isEmpty || !validRelatedDeletionRowIds.isEmpty else { self.pageInfo.mutate { $0 = updatedData.pageInfo }
updateDataAndCallbackIfNeeded(updatedDataCache, updatedPageInfo, true)
return
}
// Fetch the inserted/updated rows
let targetRowIds: [Int64] = Array((validChangeRowIds + validRelatedChangeRowIds + validRelatedDeletionRowIds).asSet())
let updatedItems: [T] = (try? dataQuery(targetRowIds)
.fetchAll(db))
.defaulting(to: [])
// Process the upserted data // Trigger the unsorted change callback (the actual UI update triggering should eventually be run on
updatedDataCache = updatedDataCache.upserting(items: updatedItems) // the main thread via the `PagedData.processAndTriggerUpdates` function)
self.onChangeUnsorted(finalUpdatedDataCache.values, updatedData.pageInfo)
// Update the currentCount for the upserted data
let dataSizeDiff: Int = (updatedDataCache.count - oldDataCount)
updatedPageInfo = PagedData.PageInfo(
pageSize: updatedPageInfo.pageSize,
pageOffset: updatedPageInfo.pageOffset,
currentCount: (updatedPageInfo.currentCount + dataSizeDiff),
totalCount: updatedPageInfo.totalCount
)
updateDataAndCallbackIfNeeded(updatedDataCache, updatedPageInfo, true)
} }
public func databaseDidRollback(_ db: Database) {} public func databaseDidRollback(_ db: Database) {}

@ -19,9 +19,12 @@ class SynchronousStorage: Storage {
} }
override func writePublisher<T>( override func writePublisher<T>(
fileName: String = #file,
functionName: String = #function,
lineNumber: Int = #line,
updates: @escaping (Database) throws -> T updates: @escaping (Database) throws -> T
) -> AnyPublisher<T, Error> { ) -> AnyPublisher<T, Error> {
guard let result: T = super.write(updates: updates) else { guard let result: T = super.write(fileName: fileName, functionName: functionName, lineNumber: lineNumber, updates: updates) else {
return Fail(error: StorageError.generic) return Fail(error: StorageError.generic)
.eraseToAnyPublisher() .eraseToAnyPublisher()
} }

Loading…
Cancel
Save