From c73bb43c56981482db7bc1461ed0486e10675794 Mon Sep 17 00:00:00 2001 From: Morgan Pretty Date: Thu, 25 Aug 2022 12:55:41 +1000 Subject: [PATCH] Fixed a few bugs Fixed a bug where open group message deletion wasn't thread-specific Fixed a bug where the user couldn't delete pending/failed messages being sent to an open group Fixed a bug where deleting content from "related" tables wouldn't correctly trigger an update within the PagedDatabaseObserver Fixed a bug where a user that was an admin/mod of one open group would incorrectly appear to be an admin/mod of all open groups --- .../Context Menu/ContextMenuVC+Action.swift | 3 +- .../ConversationVC+Interaction.swift | 53 ++++++++- .../Open Groups/OpenGroupManager.swift | 1 + .../Shared Models/MessageViewModel.swift | 3 + .../Types/PagedDatabaseObserver.swift | 59 ++++++++-- SessionUtilitiesKit/JobRunner/JobRunner.swift | 102 +++++++++++++++--- 6 files changed, 199 insertions(+), 22 deletions(-) diff --git a/Session/Conversations/Context Menu/ContextMenuVC+Action.swift b/Session/Conversations/Context Menu/ContextMenuVC+Action.swift index fa6a64f2f..cbe87d1b0 100644 --- a/Session/Conversations/Context Menu/ContextMenuVC+Action.swift +++ b/Session/Conversations/Context Menu/ContextMenuVC+Action.swift @@ -150,7 +150,8 @@ extension ContextMenuVC { ) let canDelete: Bool = ( cellViewModel.threadVariant != .openGroup || - currentUserIsOpenGroupModerator + currentUserIsOpenGroupModerator || + cellViewModel.state == .failed ) let canBan: Bool = ( cellViewModel.threadVariant == .openGroup && diff --git a/Session/Conversations/ConversationVC+Interaction.swift b/Session/Conversations/ConversationVC+Interaction.swift index ff2ecc991..5a9a05bca 100644 --- a/Session/Conversations/ConversationVC+Interaction.swift +++ b/Session/Conversations/ConversationVC+Interaction.swift @@ -1134,7 +1134,58 @@ extension ConversationVC: .filter(id: cellViewModel.id) .asRequest(of: Int64.self) .fetchOne(db) - else { return } + else { + // If the message hasn't been sent yet then just delete locally + guard cellViewModel.state == .sending || cellViewModel.state == .failed else { return } + + // Retrieve any message send jobs for this interaction + let jobs: [Job] = Storage.shared + .read { db in + try? Job + .filter(Job.Columns.variant == Job.Variant.messageSend) + .filter(Job.Columns.interactionId == cellViewModel.id) + .fetchAll(db) + } + .defaulting(to: []) + + // If the job is currently running then wait until it's done before triggering + // the deletion + let targetJob: Job? = jobs.first(where: { JobRunner.isCurrentlyRunning($0) }) + + guard targetJob == nil else { + JobRunner.afterCurrentlyRunningJob(targetJob) { [weak self] result in + switch result { + // If it succeeded then we'll need to delete from the server so re-run + // this function (if we still don't have the server id for some reason + // then this would result in a local-only deletion which should be fine + case .succeeded: self?.delete(cellViewModel) + + // Otherwise we just need to cancel the pending job (in case it retries) + // and delete the interaction + default: + JobRunner.removePendingJob(targetJob) + + Storage.shared.writeAsync { db in + _ = try Interaction + .filter(id: cellViewModel.id) + .deleteAll(db) + } + } + } + return + } + + // If it's not currently running then remove any pending jobs (just to be safe) and + // delete the interaction locally + jobs.forEach { JobRunner.removePendingJob($0) } + + Storage.shared.writeAsync { db in + _ = try Interaction + .filter(id: cellViewModel.id) + .deleteAll(db) + } + return + } if remove { OpenGroupAPI diff --git a/SessionMessagingKit/Open Groups/OpenGroupManager.swift b/SessionMessagingKit/Open Groups/OpenGroupManager.swift index 0f532bb9b..553fe0786 100644 --- a/SessionMessagingKit/Open Groups/OpenGroupManager.swift +++ b/SessionMessagingKit/Open Groups/OpenGroupManager.swift @@ -608,6 +608,7 @@ public final class OpenGroupManager: NSObject { guard !messageServerIdsToRemove.isEmpty else { return } _ = try? Interaction + .filter(Interaction.Columns.threadId == openGroup.threadId) .filter(messageServerIdsToRemove.contains(Interaction.Columns.openGroupServerMessageId)) .deleteAll(db) } diff --git a/SessionMessagingKit/Shared Models/MessageViewModel.swift b/SessionMessagingKit/Shared Models/MessageViewModel.swift index 37e4751ab..861c82628 100644 --- a/SessionMessagingKit/Shared Models/MessageViewModel.swift +++ b/SessionMessagingKit/Shared Models/MessageViewModel.swift @@ -637,6 +637,7 @@ public extension MessageViewModel { let attachmentIdColumnLiteral: SQL = SQL(stringLiteral: Attachment.Columns.id.name) let groupMemberModeratorTableLiteral: SQL = SQL(stringLiteral: "groupMemberModerator") let groupMemberAdminTableLiteral: SQL = SQL(stringLiteral: "groupMemberAdmin") + let groupMemberGroupIdColumnLiteral: SQL = SQL(stringLiteral: GroupMember.Columns.groupId.name) let groupMemberProfileIdColumnLiteral: SQL = SQL(stringLiteral: GroupMember.Columns.profileId.name) let groupMemberRoleColumnLiteral: SQL = SQL(stringLiteral: GroupMember.Columns.role.name) @@ -715,11 +716,13 @@ public extension MessageViewModel { ) LEFT JOIN \(GroupMember.self) AS \(groupMemberModeratorTableLiteral) ON ( \(SQL("\(thread[.variant]) = \(SessionThread.Variant.openGroup)")) AND + \(groupMemberModeratorTableLiteral).\(groupMemberGroupIdColumnLiteral) = \(interaction[.threadId]) AND \(groupMemberModeratorTableLiteral).\(groupMemberProfileIdColumnLiteral) = \(interaction[.authorId]) AND \(SQL("\(groupMemberModeratorTableLiteral).\(groupMemberRoleColumnLiteral) = \(GroupMember.Role.moderator)")) ) LEFT JOIN \(GroupMember.self) AS \(groupMemberAdminTableLiteral) ON ( \(SQL("\(thread[.variant]) = \(SessionThread.Variant.openGroup)")) AND + \(groupMemberAdminTableLiteral).\(groupMemberGroupIdColumnLiteral) = \(interaction[.threadId]) AND \(groupMemberAdminTableLiteral).\(groupMemberProfileIdColumnLiteral) = \(interaction[.authorId]) AND \(SQL("\(groupMemberAdminTableLiteral).\(groupMemberRoleColumnLiteral) = \(GroupMember.Role.admin)")) ) diff --git a/SessionUtilitiesKit/Database/Types/PagedDatabaseObserver.swift b/SessionUtilitiesKit/Database/Types/PagedDatabaseObserver.swift index b9af5b36b..c99da5e3f 100644 --- a/SessionUtilitiesKit/Database/Types/PagedDatabaseObserver.swift +++ b/SessionUtilitiesKit/Database/Types/PagedDatabaseObserver.swift @@ -110,9 +110,39 @@ public class PagedDatabaseObserver: TransactionObserver where // changes only include table and column info at this stage guard allObservedTableNames.contains(event.tableName) else { return } + // When generating the tracked change we need to check if the change was + // a deletion to a related table (if so then once the change is performed + // there won't be a way to associated the deleted related record to the + // original so we need to retrieve the association in here) + let trackedChange: PagedData.TrackedChange = { + guard + event.tableName != pagedTableName, + event.kind == .delete, + let observedChange: PagedData.ObservedChanges = observedTableChangeTypes[event.tableName], + let joinToPagedType: SQL = observedChange.joinToPagedType + else { return PagedData.TrackedChange(event: event) } + + // Retrieve the pagedRowId for the related value that is + // getting deleted + let pagedRowIds: [Int64] = Storage.shared + .read { db in + PagedData.pagedRowIdsForRelatedRowIds( + db, + tableName: event.tableName, + pagedTableName: pagedTableName, + relatedRowIds: [event.rowID], + joinToPagedType: joinToPagedType + ) + } + .defaulting(to: []) + + return PagedData.TrackedChange(event: event, pagedRowIdsForRelatedDeletion: pagedRowIds) + }() + // The 'event' object only exists during this method so we need to copy the info // from it, otherwise it will cease to exist after this metod call finishes changesInCommit.mutate { $0.insert(PagedData.TrackedChange(event: event)) } + changesInCommit.mutate { $0.insert(trackedChange) } } // Note: We will process all updates which come through this method even if @@ -180,13 +210,17 @@ public class PagedDatabaseObserver: TransactionObserver where .filter { $0.tableName == pagedTableName } let relatedChanges: [String: [PagedData.TrackedChange]] = committedChanges .filter { $0.tableName != pagedTableName } + .filter { $0.kind != .delete } .reduce(into: [:]) { result, next in guard observedTableChangeTypes[next.tableName] != nil else { return } result[next.tableName] = (result[next.tableName] ?? []).appending(next) } + let relatedDeletions: [PagedData.TrackedChange] = committedChanges + .filter { $0.tableName != pagedTableName } + .filter { $0.kind == .delete } - guard !directChanges.isEmpty || !relatedChanges.isEmpty else { + guard !directChanges.isEmpty || !relatedChanges.isEmpty || !relatedDeletions.isEmpty else { updateDataAndCallbackIfNeeded(self.dataCache.wrappedValue, self.pageInfo.wrappedValue, false) return } @@ -219,7 +253,7 @@ public class PagedDatabaseObserver: TransactionObserver where let changesToQuery: [PagedData.TrackedChange] = directChanges .filter { $0.kind != .delete } - guard !changesToQuery.isEmpty || !relatedChanges.isEmpty else { + guard !changesToQuery.isEmpty || !relatedChanges.isEmpty || !relatedDeletions.isEmpty else { updateDataAndCallbackIfNeeded(updatedDataCache, updatedPageInfo, !deletionChanges.isEmpty) return } @@ -248,7 +282,7 @@ public class PagedDatabaseObserver: TransactionObserver where .asSet() }() - guard !changesToQuery.isEmpty || !pagedRowIdsForRelatedChanges.isEmpty else { + guard !changesToQuery.isEmpty || !pagedRowIdsForRelatedChanges.isEmpty || !relatedDeletions.isEmpty else { updateDataAndCallbackIfNeeded(updatedDataCache, updatedPageInfo, !deletionChanges.isEmpty) return } @@ -270,6 +304,16 @@ public class PagedDatabaseObserver: TransactionObserver where orderSQL: orderSQL, filterSQL: filterSQL ) + let relatedDeletionIndexes: [PagedData.RowIndexInfo] = PagedData.indexes( + db, + rowIds: relatedDeletions + .compactMap { $0.pagedRowIdsForRelatedDeletion } + .flatMap { $0 }, + tableName: pagedTableName, + requiredJoinSQL: joinSQL, + orderSQL: orderSQL, + filterSQL: filterSQL + ) // Determine if the indexes for the row ids should be displayed on the screen and remove any // which shouldn't - values less than 'currentCount' or if there is at least one value less than @@ -306,6 +350,7 @@ public class PagedDatabaseObserver: TransactionObserver where } let validChangeRowIds: [Int64] = determineValidChanges(for: itemIndexes) let validRelatedChangeRowIds: [Int64] = determineValidChanges(for: relatedChangeIndexes) + let validRelatedDeletionRowIds: [Int64] = determineValidChanges(for: relatedDeletionIndexes) let countBefore: Int = itemIndexes.filter { $0.rowIndex < updatedPageInfo.pageOffset }.count // Update the offset and totalCount even if the rows are outside of the current page (need to @@ -325,13 +370,13 @@ public class PagedDatabaseObserver: TransactionObserver where // If there are no valid row ids then stop here (trigger updates though since the page info // has changes) - guard !validChangeRowIds.isEmpty || !validRelatedChangeRowIds.isEmpty else { + guard !validChangeRowIds.isEmpty || !validRelatedChangeRowIds.isEmpty || !validRelatedDeletionRowIds.isEmpty else { updateDataAndCallbackIfNeeded(updatedDataCache, updatedPageInfo, true) return } // Fetch the inserted/updated rows - let targetRowIds: [Int64] = Array((validChangeRowIds + validRelatedChangeRowIds).asSet()) + let targetRowIds: [Int64] = Array((validChangeRowIds + validRelatedChangeRowIds + validRelatedDeletionRowIds).asSet()) let updatedItems: [T] = (try? dataQuery(targetRowIds) .fetchAll(db)) .defaulting(to: []) @@ -904,11 +949,13 @@ public enum PagedData { let tableName: String let kind: DatabaseEvent.Kind let rowId: Int64 + let pagedRowIdsForRelatedDeletion: [Int64]? - init(event: DatabaseEvent) { + init(event: DatabaseEvent, pagedRowIdsForRelatedDeletion: [Int64]? = nil) { self.tableName = event.tableName self.kind = event.kind self.rowId = event.rowID + self.pagedRowIdsForRelatedDeletion = pagedRowIdsForRelatedDeletion } } diff --git a/SessionUtilitiesKit/JobRunner/JobRunner.swift b/SessionUtilitiesKit/JobRunner/JobRunner.swift index b49666600..d336f624b 100644 --- a/SessionUtilitiesKit/JobRunner/JobRunner.swift +++ b/SessionUtilitiesKit/JobRunner/JobRunner.swift @@ -36,6 +36,13 @@ public protocol JobExecutor { } public final class JobRunner { + public enum JobResult { + case succeeded + case failed + case deferred + case notFound + } + private static let blockingQueue: Atomic = Atomic( JobQueue( type: .blocking, @@ -332,6 +339,15 @@ public final class JobRunner { .defaulting(to: [:]) } + public static func afterCurrentlyRunningJob(_ job: Job?, callback: @escaping (JobResult) -> ()) { + guard let job: Job = job, let jobId: Int64 = job.id, let queue: JobQueue = queues.wrappedValue[job.variant] else { + callback(.notFound) + return + } + + queue.afterCurrentlyRunningJob(jobId, callback: callback) + } + public static func hasPendingOrRunningJob(with variant: Job.Variant, details: T) -> Bool { guard let targetQueue: JobQueue = queues.wrappedValue[variant] else { return false } guard let detailsData: Data = try? JSONEncoder().encode(details) else { return false } @@ -339,6 +355,12 @@ public final class JobRunner { return targetQueue.hasPendingOrRunningJob(with: detailsData) } + public static func removePendingJob(_ job: Job?) { + guard let job: Job = job, let jobId: Int64 = job.id else { return } + + queues.wrappedValue[job.variant]?.removePendingJob(jobId) + } + // MARK: - Convenience fileprivate static func getRetryInterval(for job: Job) -> TimeInterval { @@ -445,6 +467,7 @@ private final class JobQueue { fileprivate var isRunning: Atomic = Atomic(false) private var queue: Atomic<[Job]> = Atomic([]) private var jobsCurrentlyRunning: Atomic> = Atomic([]) + private var jobCallbacks: Atomic<[Int64: [(JobRunner.JobResult) -> ()]]> = Atomic([:]) private var detailsForCurrentlyRunningJobs: Atomic<[Int64: Data?]> = Atomic([:]) private var deferLoopTracker: Atomic<[Int64: (count: Int, times: [TimeInterval])]> = Atomic([:]) @@ -560,12 +583,29 @@ private final class JobQueue { return detailsForCurrentlyRunningJobs.wrappedValue } + fileprivate func afterCurrentlyRunningJob(_ jobId: Int64, callback: @escaping (JobRunner.JobResult) -> ()) { + guard isCurrentlyRunning(jobId) else { + callback(.notFound) + return + } + + jobCallbacks.mutate { jobCallbacks in + jobCallbacks[jobId] = (jobCallbacks[jobId] ?? []).appending(callback) + } + } + fileprivate func hasPendingOrRunningJob(with detailsData: Data?) -> Bool { let pendingJobs: [Job] = queue.wrappedValue return pendingJobs.contains { job in job.details == detailsData } } + fileprivate func removePendingJob(_ jobId: Int64) { + queue.mutate { queue in + queue = queue.filter { $0.id != jobId } + } + } + // MARK: - Job Running fileprivate func start(force: Bool = false) { @@ -900,10 +940,8 @@ private final class JobQueue { } } - // The job is removed from the queue before it runs so all we need to to is remove it - // from the 'currentlyRunning' set and start the next one - jobsCurrentlyRunning.mutate { $0 = $0.removing(job.id) } - detailsForCurrentlyRunningJobs.mutate { $0 = $0.removingValue(forKey: job.id) } + // Perform job cleanup and start the next job + performCleanUp(for: job, result: .succeeded) internalQueue.async { [weak self] in self?.runNextJob() } @@ -914,8 +952,7 @@ private final class JobQueue { private func handleJobFailed(_ job: Job, error: Error?, permanentFailure: Bool) { guard Storage.shared.read({ db in try Job.exists(db, id: job.id ?? -1) }) == true else { SNLog("[JobRunner] \(queueContext) \(job.variant) job canceled") - jobsCurrentlyRunning.mutate { $0 = $0.removing(job.id) } - detailsForCurrentlyRunningJobs.mutate { $0 = $0.removingValue(forKey: job.id) } + performCleanUp(for: job, result: .failed) internalQueue.async { [weak self] in self?.runNextJob() @@ -923,12 +960,30 @@ private final class JobQueue { return } - // If this is the blocking queue and a "blocking" job failed then rerun it immediately + // If this is the blocking queue and a "blocking" job failed then rerun it + // immediately (in this case we don't trigger any job callbacks because the + // job isn't actually done, it's going to try again immediately) if self.type == .blocking && job.shouldBlock { SNLog("[JobRunner] \(queueContext) \(job.variant) job failed; retrying immediately") - jobsCurrentlyRunning.mutate { $0 = $0.removing(job.id) } - detailsForCurrentlyRunningJobs.mutate { $0 = $0.removingValue(forKey: job.id) } - queue.mutate { $0.insert(job, at: 0) } + + // If it was a possible deferral loop then we don't actually want to + // retry the job (even if it's a blocking one, this gives a small chance + // that the app could continue to function) + let wasPossibleDeferralLoop: Bool = { + if let error = error, case JobRunnerError.possibleDeferralLoop = error { return true } + + return false + }() + performCleanUp( + for: job, + result: .failed, + shouldTriggerCallbacks: wasPossibleDeferralLoop + ) + + // Only add it back to the queue if it wasn't a deferral loop + if !wasPossibleDeferralLoop { + queue.mutate { $0.insert(job, at: 0) } + } internalQueue.async { [weak self] in self?.runNextJob() @@ -1003,8 +1058,7 @@ private final class JobQueue { } } - jobsCurrentlyRunning.mutate { $0 = $0.removing(job.id) } - detailsForCurrentlyRunningJobs.mutate { $0 = $0.removingValue(forKey: job.id) } + performCleanUp(for: job, result: .failed) internalQueue.async { [weak self] in self?.runNextJob() } @@ -1014,8 +1068,7 @@ private final class JobQueue { /// on other jobs, and it should automatically manage those dependencies) private func handleJobDeferred(_ job: Job) { var stuckInDeferLoop: Bool = false - jobsCurrentlyRunning.mutate { $0 = $0.removing(job.id) } - detailsForCurrentlyRunningJobs.mutate { $0 = $0.removingValue(forKey: job.id) } + deferLoopTracker.mutate { guard let lastRecord: (count: Int, times: [TimeInterval]) = $0[job.id] else { $0 = $0.setting( @@ -1055,8 +1108,29 @@ private final class JobQueue { return } + performCleanUp(for: job, result: .deferred) internalQueue.async { [weak self] in self?.runNextJob() } } + + private func performCleanUp(for job: Job, result: JobRunner.JobResult, shouldTriggerCallbacks: Bool = true) { + // The job is removed from the queue before it runs so all we need to to is remove it + // from the 'currentlyRunning' set + jobsCurrentlyRunning.mutate { $0 = $0.removing(job.id) } + detailsForCurrentlyRunningJobs.mutate { $0 = $0.removingValue(forKey: job.id) } + + guard shouldTriggerCallbacks else { return } + + // Run any job callbacks now that it's done + var jobCallbacksToRun: [(JobRunner.JobResult) -> ()] = [] + jobCallbacks.mutate { jobCallbacks in + jobCallbacksToRun = (jobCallbacks[job.id] ?? []) + jobCallbacks = jobCallbacks.removingValue(forKey: job.id) + } + + DispatchQueue.global(qos: .default).async { + jobCallbacksToRun.forEach { $0(result) } + } + } }