From cac2831208ea1acbfc02b29f8cee725996576047 Mon Sep 17 00:00:00 2001 From: Morgan Pretty Date: Thu, 13 Apr 2023 09:26:34 +1000 Subject: [PATCH] Fixed a few issues with albums and attachments Fixed an issue where the JobRunner could schedule dependent jobs which were already running (eg. uploading attachments multiple times) Fixed an issue where the image used for quotes might not be the first in an album message Fixed an issue where sending an album message wouldn't send attachments in the correct order Fixed an issue where album attachments wouldn't be downloaded in the correct order --- .../Database/Models/Attachment.swift | 7 ++-- .../Jobs/Types/MessageSendJob.swift | 1 + .../Visible Messages/VisibleMessage.swift | 13 +++++-- .../Shared Models/MessageViewModel.swift | 6 +++- SessionUtilitiesKit/JobRunner/JobRunner.swift | 34 ++++++++++++++----- 5 files changed, 46 insertions(+), 15 deletions(-) diff --git a/SessionMessagingKit/Database/Models/Attachment.swift b/SessionMessagingKit/Database/Models/Attachment.swift index 17cab5424..3f661fc20 100644 --- a/SessionMessagingKit/Database/Models/Attachment.swift +++ b/SessionMessagingKit/Database/Models/Attachment.swift @@ -493,6 +493,7 @@ extension Attachment { public let interactionId: Int64 public let state: Attachment.State public let downloadUrl: String? + public let albumIndex: Int } public static func stateInfo(authorId: String, state: State? = nil) -> SQLRequest { @@ -510,7 +511,8 @@ extension Attachment { \(attachment[.id]) AS attachmentId, \(interaction[.id]) AS interactionId, \(attachment[.state]) AS state, - \(attachment[.downloadUrl]) AS downloadUrl + \(attachment[.downloadUrl]) AS downloadUrl, + IFNULL(\(interactionAttachment[.albumIndex]), 0) AS albumIndex FROM \(Attachment.self) @@ -555,7 +557,8 @@ extension Attachment { \(attachment[.id]) AS attachmentId, \(interaction[.id]) AS interactionId, \(attachment[.state]) AS state, - \(attachment[.downloadUrl]) AS downloadUrl + \(attachment[.downloadUrl]) AS downloadUrl, + IFNULL(\(interactionAttachment[.albumIndex]), 0) AS albumIndex FROM \(Attachment.self) diff --git a/SessionMessagingKit/Jobs/Types/MessageSendJob.swift b/SessionMessagingKit/Jobs/Types/MessageSendJob.swift index b835cbca5..eac85ddfb 100644 --- a/SessionMessagingKit/Jobs/Types/MessageSendJob.swift +++ b/SessionMessagingKit/Jobs/Types/MessageSendJob.swift @@ -57,6 +57,7 @@ public enum MessageSendJob: JobExecutor { .stateInfo(interactionId: interactionId) .fetchAll(db) let maybeFileIds: [String?] = allAttachmentStateInfo + .sorted { lhs, rhs in lhs.albumIndex < rhs.albumIndex } .map { Attachment.fileId(for: $0.downloadUrl) } let fileIds: [String] = maybeFileIds.compactMap { $0 } diff --git a/SessionMessagingKit/Messages/Visible Messages/VisibleMessage.swift b/SessionMessagingKit/Messages/Visible Messages/VisibleMessage.swift index 2780ae0eb..7132723b9 100644 --- a/SessionMessagingKit/Messages/Visible Messages/VisibleMessage.swift +++ b/SessionMessagingKit/Messages/Visible Messages/VisibleMessage.swift @@ -160,14 +160,21 @@ public final class VisibleMessage: Message { // Attachments - let attachments: [Attachment]? = try? Attachment.fetchAll(db, ids: self.attachmentIds) + let attachmentIdIndexes: [String: Int] = (try? InteractionAttachment + .filter(self.attachmentIds.contains(InteractionAttachment.Columns.attachmentId)) + .fetchAll(db)) + .defaulting(to: []) + .reduce(into: [:]) { result, next in result[next.attachmentId] = next.albumIndex } + let attachments: [Attachment] = (try? Attachment.fetchAll(db, ids: self.attachmentIds)) + .defaulting(to: []) + .sorted { lhs, rhs in (attachmentIdIndexes[lhs.id] ?? 0) < (attachmentIdIndexes[rhs.id] ?? 0) } - if !(attachments ?? []).allSatisfy({ $0.state == .uploaded }) { + if !attachments.allSatisfy({ $0.state == .uploaded }) { #if DEBUG preconditionFailure("Sending a message before all associated attachments have been uploaded.") #endif } - let attachmentProtos = (attachments ?? []).compactMap { $0.buildProto() } + let attachmentProtos = attachments.compactMap { $0.buildProto() } dataMessage.setAttachments(attachmentProtos) // Open group invitation diff --git a/SessionMessagingKit/Shared Models/MessageViewModel.swift b/SessionMessagingKit/Shared Models/MessageViewModel.swift index f45b3a6a4..0523224ed 100644 --- a/SessionMessagingKit/Shared Models/MessageViewModel.swift +++ b/SessionMessagingKit/Shared Models/MessageViewModel.swift @@ -663,6 +663,7 @@ public extension MessageViewModel { let attachmentIdColumn: SQL = SQL(stringLiteral: Attachment.Columns.id.name) let interactionAttachmentInteractionIdColumn: SQL = SQL(stringLiteral: InteractionAttachment.Columns.interactionId.name) let interactionAttachmentAttachmentIdColumn: SQL = SQL(stringLiteral: InteractionAttachment.Columns.attachmentId.name) + let interactionAttachmentAlbumIndexColumn: SQL = SQL(stringLiteral: InteractionAttachment.Columns.albumIndex.name) let numColumnsBeforeLinkedRecords: Int = 20 let finalGroupSQL: SQL = (groupSQL ?? "") @@ -743,7 +744,10 @@ public extension MessageViewModel { ) ) ) - LEFT JOIN \(InteractionAttachment.self) AS \(quoteInteractionAttachment) ON \(quoteInteractionAttachment).\(interactionAttachmentInteractionIdColumn) = \(quoteInteraction).\(idColumn) + LEFT JOIN \(InteractionAttachment.self) AS \(quoteInteractionAttachment) ON ( + \(quoteInteractionAttachment).\(interactionAttachmentInteractionIdColumn) = \(quoteInteraction).\(idColumn) AND + \(quoteInteractionAttachment).\(interactionAttachmentAlbumIndexColumn) = 0 + ) LEFT JOIN \(Attachment.self) AS \(ViewModel.quoteAttachmentKey) ON \(ViewModel.quoteAttachmentKey).\(attachmentIdColumn) = \(quoteInteractionAttachment).\(interactionAttachmentAttachmentIdColumn) LEFT JOIN \(LinkPreview.self) ON ( diff --git a/SessionUtilitiesKit/JobRunner/JobRunner.swift b/SessionUtilitiesKit/JobRunner/JobRunner.swift index e1f1408a8..25d352a1d 100644 --- a/SessionUtilitiesKit/JobRunner/JobRunner.swift +++ b/SessionUtilitiesKit/JobRunner/JobRunner.swift @@ -591,12 +591,17 @@ private final class JobQueue { } fileprivate func appDidBecomeActive(with jobs: [Job], canStart: Bool) { + let currentlyRunningJobIds: Set = jobsCurrentlyRunning.wrappedValue + queue.mutate { queue in // Avoid re-adding jobs to the queue that are already in it (this can // happen if the user sends the app to the background before the 'onActive' // jobs and then brings it back to the foreground) let jobsNotAlreadyInQueue: [Job] = jobs - .filter { job in !queue.contains(where: { $0.id == job.id }) } + .filter { job in + !currentlyRunningJobIds.contains(job.id ?? -1) && + !queue.contains(where: { $0.id == job.id }) + } queue.append(contentsOf: jobsNotAlreadyInQueue) } @@ -784,14 +789,20 @@ private final class JobQueue { guard dependencyInfo.jobs.isEmpty else { SNLog("[JobRunner] \(queueContext) found job with \(dependencyInfo.jobs.count) dependencies, running those first") - /// Remove all jobs this one is dependant on from the queue and re-insert them at the start of the queue + /// Remove all jobs this one is dependant on that aren't currently running from the queue and re-insert them at the start + /// of the queue /// /// **Note:** We don't add the current job back the the queue because it should only be re-added if it's dependencies /// are successfully completed + let currentlyRunningJobIds: [Int64] = Array(detailsForCurrentlyRunningJobs.wrappedValue.keys) + let dependencyJobsNotCurrentlyRunning: [Job] = dependencyInfo.jobs + .filter { job in !currentlyRunningJobIds.contains(job.id ?? -1) } + .sorted { lhs, rhs in (lhs.id ?? -1) < (rhs.id ?? -1) } + queue.mutate { queue in queue = queue - .filter { !dependencyInfo.jobs.contains($0) } - .inserting(contentsOf: Array(dependencyInfo.jobs), at: 0) + .filter { !dependencyJobsNotCurrentlyRunning.contains($0) } + .inserting(contentsOf: dependencyJobsNotCurrentlyRunning, at: 0) } handleJobDeferred(nextJob) return @@ -960,17 +971,22 @@ private final class JobQueue { default: break } - /// Now that the job has been completed we want to insert any jobs that were dependant on it to the start of the queue (the - /// most likely case is that we want an entire job chain to be completed at the same time rather than being blocked by other - /// unrelated jobs) + /// Now that the job has been completed we want to insert any jobs that were dependant on it, that aren't already running + /// to the start of the queue (the most likely case is that we want an entire job chain to be completed at the same time rather + /// than being blocked by other unrelated jobs) /// /// **Note:** If any of these `dependantJobs` have other dependencies then when they attempt to start they will be /// removed from the queue, replaced by their dependencies if !dependantJobs.isEmpty { + let currentlyRunningJobIds: [Int64] = Array(detailsForCurrentlyRunningJobs.wrappedValue.keys) + let dependantJobsNotCurrentlyRunning: [Job] = dependantJobs + .filter { job in !currentlyRunningJobIds.contains(job.id ?? -1) } + .sorted { lhs, rhs in (lhs.id ?? -1) < (rhs.id ?? -1) } + queue.mutate { queue in queue = queue - .filter { !dependantJobs.contains($0) } - .inserting(contentsOf: dependantJobs, at: 0) + .filter { !dependantJobsNotCurrentlyRunning.contains($0) } + .inserting(contentsOf: dependantJobsNotCurrentlyRunning, at: 0) } }