Fixed a few issues with albums and attachments

Fixed an issue where the JobRunner could schedule dependent jobs which were already running (eg. uploading attachments multiple times)
Fixed an issue where the image used for quotes might not be the first in an album message
Fixed an issue where sending an album message wouldn't send attachments in the correct order
Fixed an issue where album attachments wouldn't be downloaded in the correct order
pull/823/head
Morgan Pretty 3 years ago
parent 70ce967df6
commit cac2831208

@ -493,6 +493,7 @@ extension Attachment {
public let interactionId: Int64 public let interactionId: Int64
public let state: Attachment.State public let state: Attachment.State
public let downloadUrl: String? public let downloadUrl: String?
public let albumIndex: Int
} }
public static func stateInfo(authorId: String, state: State? = nil) -> SQLRequest<Attachment.StateInfo> { public static func stateInfo(authorId: String, state: State? = nil) -> SQLRequest<Attachment.StateInfo> {
@ -510,7 +511,8 @@ extension Attachment {
\(attachment[.id]) AS attachmentId, \(attachment[.id]) AS attachmentId,
\(interaction[.id]) AS interactionId, \(interaction[.id]) AS interactionId,
\(attachment[.state]) AS state, \(attachment[.state]) AS state,
\(attachment[.downloadUrl]) AS downloadUrl \(attachment[.downloadUrl]) AS downloadUrl,
IFNULL(\(interactionAttachment[.albumIndex]), 0) AS albumIndex
FROM \(Attachment.self) FROM \(Attachment.self)
@ -555,7 +557,8 @@ extension Attachment {
\(attachment[.id]) AS attachmentId, \(attachment[.id]) AS attachmentId,
\(interaction[.id]) AS interactionId, \(interaction[.id]) AS interactionId,
\(attachment[.state]) AS state, \(attachment[.state]) AS state,
\(attachment[.downloadUrl]) AS downloadUrl \(attachment[.downloadUrl]) AS downloadUrl,
IFNULL(\(interactionAttachment[.albumIndex]), 0) AS albumIndex
FROM \(Attachment.self) FROM \(Attachment.self)

@ -57,6 +57,7 @@ public enum MessageSendJob: JobExecutor {
.stateInfo(interactionId: interactionId) .stateInfo(interactionId: interactionId)
.fetchAll(db) .fetchAll(db)
let maybeFileIds: [String?] = allAttachmentStateInfo let maybeFileIds: [String?] = allAttachmentStateInfo
.sorted { lhs, rhs in lhs.albumIndex < rhs.albumIndex }
.map { Attachment.fileId(for: $0.downloadUrl) } .map { Attachment.fileId(for: $0.downloadUrl) }
let fileIds: [String] = maybeFileIds.compactMap { $0 } let fileIds: [String] = maybeFileIds.compactMap { $0 }

@ -160,14 +160,21 @@ public final class VisibleMessage: Message {
// Attachments // Attachments
let attachments: [Attachment]? = try? Attachment.fetchAll(db, ids: self.attachmentIds) let attachmentIdIndexes: [String: Int] = (try? InteractionAttachment
.filter(self.attachmentIds.contains(InteractionAttachment.Columns.attachmentId))
if !(attachments ?? []).allSatisfy({ $0.state == .uploaded }) { .fetchAll(db))
.defaulting(to: [])
.reduce(into: [:]) { result, next in result[next.attachmentId] = next.albumIndex }
let attachments: [Attachment] = (try? Attachment.fetchAll(db, ids: self.attachmentIds))
.defaulting(to: [])
.sorted { lhs, rhs in (attachmentIdIndexes[lhs.id] ?? 0) < (attachmentIdIndexes[rhs.id] ?? 0) }
if !attachments.allSatisfy({ $0.state == .uploaded }) {
#if DEBUG #if DEBUG
preconditionFailure("Sending a message before all associated attachments have been uploaded.") preconditionFailure("Sending a message before all associated attachments have been uploaded.")
#endif #endif
} }
let attachmentProtos = (attachments ?? []).compactMap { $0.buildProto() } let attachmentProtos = attachments.compactMap { $0.buildProto() }
dataMessage.setAttachments(attachmentProtos) dataMessage.setAttachments(attachmentProtos)
// Open group invitation // Open group invitation

@ -663,6 +663,7 @@ public extension MessageViewModel {
let attachmentIdColumn: SQL = SQL(stringLiteral: Attachment.Columns.id.name) let attachmentIdColumn: SQL = SQL(stringLiteral: Attachment.Columns.id.name)
let interactionAttachmentInteractionIdColumn: SQL = SQL(stringLiteral: InteractionAttachment.Columns.interactionId.name) let interactionAttachmentInteractionIdColumn: SQL = SQL(stringLiteral: InteractionAttachment.Columns.interactionId.name)
let interactionAttachmentAttachmentIdColumn: SQL = SQL(stringLiteral: InteractionAttachment.Columns.attachmentId.name) let interactionAttachmentAttachmentIdColumn: SQL = SQL(stringLiteral: InteractionAttachment.Columns.attachmentId.name)
let interactionAttachmentAlbumIndexColumn: SQL = SQL(stringLiteral: InteractionAttachment.Columns.albumIndex.name)
let numColumnsBeforeLinkedRecords: Int = 20 let numColumnsBeforeLinkedRecords: Int = 20
let finalGroupSQL: SQL = (groupSQL ?? "") let finalGroupSQL: SQL = (groupSQL ?? "")
@ -743,7 +744,10 @@ public extension MessageViewModel {
) )
) )
) )
LEFT JOIN \(InteractionAttachment.self) AS \(quoteInteractionAttachment) ON \(quoteInteractionAttachment).\(interactionAttachmentInteractionIdColumn) = \(quoteInteraction).\(idColumn) LEFT JOIN \(InteractionAttachment.self) AS \(quoteInteractionAttachment) ON (
\(quoteInteractionAttachment).\(interactionAttachmentInteractionIdColumn) = \(quoteInteraction).\(idColumn) AND
\(quoteInteractionAttachment).\(interactionAttachmentAlbumIndexColumn) = 0
)
LEFT JOIN \(Attachment.self) AS \(ViewModel.quoteAttachmentKey) ON \(ViewModel.quoteAttachmentKey).\(attachmentIdColumn) = \(quoteInteractionAttachment).\(interactionAttachmentAttachmentIdColumn) LEFT JOIN \(Attachment.self) AS \(ViewModel.quoteAttachmentKey) ON \(ViewModel.quoteAttachmentKey).\(attachmentIdColumn) = \(quoteInteractionAttachment).\(interactionAttachmentAttachmentIdColumn)
LEFT JOIN \(LinkPreview.self) ON ( LEFT JOIN \(LinkPreview.self) ON (

@ -591,12 +591,17 @@ private final class JobQueue {
} }
fileprivate func appDidBecomeActive(with jobs: [Job], canStart: Bool) { fileprivate func appDidBecomeActive(with jobs: [Job], canStart: Bool) {
let currentlyRunningJobIds: Set<Int64> = jobsCurrentlyRunning.wrappedValue
queue.mutate { queue in queue.mutate { queue in
// Avoid re-adding jobs to the queue that are already in it (this can // Avoid re-adding jobs to the queue that are already in it (this can
// happen if the user sends the app to the background before the 'onActive' // happen if the user sends the app to the background before the 'onActive'
// jobs and then brings it back to the foreground) // jobs and then brings it back to the foreground)
let jobsNotAlreadyInQueue: [Job] = jobs let jobsNotAlreadyInQueue: [Job] = jobs
.filter { job in !queue.contains(where: { $0.id == job.id }) } .filter { job in
!currentlyRunningJobIds.contains(job.id ?? -1) &&
!queue.contains(where: { $0.id == job.id })
}
queue.append(contentsOf: jobsNotAlreadyInQueue) queue.append(contentsOf: jobsNotAlreadyInQueue)
} }
@ -784,14 +789,20 @@ private final class JobQueue {
guard dependencyInfo.jobs.isEmpty else { guard dependencyInfo.jobs.isEmpty else {
SNLog("[JobRunner] \(queueContext) found job with \(dependencyInfo.jobs.count) dependencies, running those first") SNLog("[JobRunner] \(queueContext) found job with \(dependencyInfo.jobs.count) dependencies, running those first")
/// Remove all jobs this one is dependant on from the queue and re-insert them at the start of the queue /// Remove all jobs this one is dependant on that aren't currently running from the queue and re-insert them at the start
/// of the queue
/// ///
/// **Note:** We don't add the current job back the the queue because it should only be re-added if it's dependencies /// **Note:** We don't add the current job back the the queue because it should only be re-added if it's dependencies
/// are successfully completed /// are successfully completed
let currentlyRunningJobIds: [Int64] = Array(detailsForCurrentlyRunningJobs.wrappedValue.keys)
let dependencyJobsNotCurrentlyRunning: [Job] = dependencyInfo.jobs
.filter { job in !currentlyRunningJobIds.contains(job.id ?? -1) }
.sorted { lhs, rhs in (lhs.id ?? -1) < (rhs.id ?? -1) }
queue.mutate { queue in queue.mutate { queue in
queue = queue queue = queue
.filter { !dependencyInfo.jobs.contains($0) } .filter { !dependencyJobsNotCurrentlyRunning.contains($0) }
.inserting(contentsOf: Array(dependencyInfo.jobs), at: 0) .inserting(contentsOf: dependencyJobsNotCurrentlyRunning, at: 0)
} }
handleJobDeferred(nextJob) handleJobDeferred(nextJob)
return return
@ -960,17 +971,22 @@ private final class JobQueue {
default: break default: break
} }
/// Now that the job has been completed we want to insert any jobs that were dependant on it to the start of the queue (the /// Now that the job has been completed we want to insert any jobs that were dependant on it, that aren't already running
/// most likely case is that we want an entire job chain to be completed at the same time rather than being blocked by other /// to the start of the queue (the most likely case is that we want an entire job chain to be completed at the same time rather
/// unrelated jobs) /// than being blocked by other unrelated jobs)
/// ///
/// **Note:** If any of these `dependantJobs` have other dependencies then when they attempt to start they will be /// **Note:** If any of these `dependantJobs` have other dependencies then when they attempt to start they will be
/// removed from the queue, replaced by their dependencies /// removed from the queue, replaced by their dependencies
if !dependantJobs.isEmpty { if !dependantJobs.isEmpty {
let currentlyRunningJobIds: [Int64] = Array(detailsForCurrentlyRunningJobs.wrappedValue.keys)
let dependantJobsNotCurrentlyRunning: [Job] = dependantJobs
.filter { job in !currentlyRunningJobIds.contains(job.id ?? -1) }
.sorted { lhs, rhs in (lhs.id ?? -1) < (rhs.id ?? -1) }
queue.mutate { queue in queue.mutate { queue in
queue = queue queue = queue
.filter { !dependantJobs.contains($0) } .filter { !dependantJobsNotCurrentlyRunning.contains($0) }
.inserting(contentsOf: dependantJobs, at: 0) .inserting(contentsOf: dependantJobsNotCurrentlyRunning, at: 0)
} }
} }

Loading…
Cancel
Save