From 0abb09c0cfebad4699585ab155e4303627e6e587 Mon Sep 17 00:00:00 2001 From: Morgan Pretty Date: Wed, 18 Jan 2023 10:56:18 +1100 Subject: [PATCH] Fixed a few small issues found when testing Fixed a couple of issues with the ConfigurationSyncJob logic Moved the proto parsing out of the MessageReceiveJob write block (to reduce time blocking writes) Removed difficulty from the SendMessageResponse (deprecated and removed) --- .../Conversations/ConversationViewModel.swift | 3 +- .../Jobs/Types/ConfigurationSyncJob.swift | 44 ++++++++++++++----- .../Jobs/Types/MessageReceiveJob.swift | 31 +++++++++---- .../Models/SendMessageResponse.swift | 3 -- SessionSnodeKit/Networking/SnodeAPI.swift | 4 +- .../Networking/BatchResponse.swift | 8 +++- 6 files changed, 66 insertions(+), 27 deletions(-) diff --git a/Session/Conversations/ConversationViewModel.swift b/Session/Conversations/ConversationViewModel.swift index c5a974041..113a6b148 100644 --- a/Session/Conversations/ConversationViewModel.swift +++ b/Session/Conversations/ConversationViewModel.swift @@ -104,10 +104,11 @@ public class ConversationViewModel: OWSAudioPlayerDelegate { public private(set) lazy var threadData: SessionThreadViewModel = SessionThreadViewModel( threadId: self.threadId, threadVariant: self.initialThreadVariant, + threadIsNoteToSelf: (self.threadId == getUserHexEncodedPublicKey()), currentUserIsClosedGroupMember: (self.initialThreadVariant != .closedGroup ? nil : Storage.shared.read { db in - try GroupMember + GroupMember .filter(GroupMember.Columns.groupId == self.threadId) .filter(GroupMember.Columns.profileId == getUserHexEncodedPublicKey(db)) .filter(GroupMember.Columns.role == GroupMember.Role.standard) diff --git a/SessionMessagingKit/Jobs/Types/ConfigurationSyncJob.swift b/SessionMessagingKit/Jobs/Types/ConfigurationSyncJob.swift index 563c34db2..0ff78309d 100644 --- a/SessionMessagingKit/Jobs/Types/ConfigurationSyncJob.swift +++ b/SessionMessagingKit/Jobs/Types/ConfigurationSyncJob.swift @@ -110,10 +110,17 @@ public enum ConfigurationSyncJob: JobExecutor { .collect() .eraseToAnyPublisher() } - .map { (responses: [HTTP.BatchResponse]) -> [SuccessfulChange] in + .flatMap { (responses: [HTTP.BatchResponse]) -> AnyPublisher<[SuccessfulChange], Error> in + // We make a sequence call for this so it's possible to get fewer responses than + // expected so if that happens fail and re-run later + guard responses.count == pendingSwarmConfigChanges.count else { + return Fail(error: HTTPError.invalidResponse) + .eraseToAnyPublisher() + } + // Process the response data into an easy to understand for (this isn't strictly // needed but the code gets convoluted without this) - zip(responses, pendingSwarmConfigChanges) + let successfulChanges: [SuccessfulChange] = zip(responses, pendingSwarmConfigChanges) .compactMap { (batchResponse: HTTP.BatchResponse, pendingSwarmChange: SingleDestinationChanges) -> [SuccessfulChange]? in let maybePublicKey: String? = { switch pendingSwarmChange.destination { @@ -145,6 +152,7 @@ public enum ConfigurationSyncJob: JobExecutor { guard let subResponse: HTTP.BatchSubResponse = (next.response as? HTTP.BatchSubResponse), 200...299 ~= subResponse.code, + !subResponse.failedToParseBody, let sendMessageResponse: SendMessagesResponse = subResponse.body else { return } @@ -162,6 +170,10 @@ public enum ConfigurationSyncJob: JobExecutor { } } .flatMap { $0 } + + return Just(successfulChanges) + .setFailureType(to: Error.self) + .eraseToAnyPublisher() } .map { (successfulChanges: [SuccessfulChange]) -> [ConfigDump] in // Now that we have the successful changes, we need to mark them as pushed and @@ -189,6 +201,13 @@ public enum ConfigurationSyncJob: JobExecutor { } } .sinkUntilComplete( + receiveCompletion: { result in + switch result { + case .finished: break + case .failure(let error): + failure(job, error, false) + } + }, receiveValue: { (configDumps: [ConfigDump]) in // Flag to indicate whether the job should be finished or will run again var shouldFinishCurrentJob: Bool = false @@ -209,12 +228,17 @@ public enum ConfigurationSyncJob: JobExecutor { let existingJob: Job = try? Job .filter(Job.Columns.id != job.id) .filter(Job.Columns.variant == Job.Variant.configurationSync) - .fetchOne(db), - !JobRunner.isCurrentlyRunning(existingJob) + .fetchOne(db) { - _ = try existingJob - .with(nextRunTimestamp: nextRunTimestamp) - .saved(db) + // If the next job isn't currently running then delay it's start time + // until the 'nextRunTimestamp' + if !JobRunner.isCurrentlyRunning(existingJob) { + _ = try existingJob + .with(nextRunTimestamp: nextRunTimestamp) + .saved(db) + } + + // If there is another job then we should finish this one shouldFinishCurrentJob = true return job } @@ -302,10 +326,10 @@ public extension ConfigurationSyncJob { @discardableResult static func createOrUpdateIfNeeded(_ db: Database) -> Job { // Try to get an existing job (if there is one that's not running) if - let existingJob: Job = try? Job + let existingJobs: [Job] = try? Job .filter(Job.Columns.variant == Job.Variant.configurationSync) - .fetchOne(db), - !JobRunner.isCurrentlyRunning(existingJob) + .fetchAll(db), + let existingJob: Job = existingJobs.first(where: { !JobRunner.isCurrentlyRunning($0) }) { return existingJob } diff --git a/SessionMessagingKit/Jobs/Types/MessageReceiveJob.swift b/SessionMessagingKit/Jobs/Types/MessageReceiveJob.swift index e3ac101c8..b928b9e85 100644 --- a/SessionMessagingKit/Jobs/Types/MessageReceiveJob.swift +++ b/SessionMessagingKit/Jobs/Types/MessageReceiveJob.swift @@ -25,9 +25,24 @@ public enum MessageReceiveJob: JobExecutor { } var updatedJob: Job = job - var leastSevereError: Error? - let nonConfigMessages: [Details.MessageInfo] = details.messages + var lastError: Error? + var remainingMessagesToProcess: [Details.MessageInfo] = [] + let nonConfigMessages: [(info: Details.MessageInfo, proto: SNProtoContent)] = details.messages .filter { $0.variant != .sharedConfigMessage } + .compactMap { messageInfo -> (info: Details.MessageInfo, proto: SNProtoContent)? in + do { + return (messageInfo, try SNProtoContent.parseData(messageInfo.serializedProtoData)) + } + catch { + SNLog("Couldn't receive message due to error: \(error)") + lastError = error + + // We failed to process this message but it is a retryable error + // so add it to the list to re-process + remainingMessagesToProcess.append(messageInfo) + return nil + } + } let sharedConfigMessages: [SharedConfigMessage] = details.messages .compactMap { $0.message as? SharedConfigMessage } @@ -40,14 +55,12 @@ public enum MessageReceiveJob: JobExecutor { ) // Handle the remaining messages - var remainingMessagesToProcess: [Details.MessageInfo] = [] - - for messageInfo in nonConfigMessages { + for (messageInfo, protoContent) in nonConfigMessages { do { try MessageReceiver.handle( db, message: messageInfo.message, - associatedWithProto: try SNProtoContent.parseData(messageInfo.serializedProtoData), + associatedWithProto: protoContent, openGroupId: nil ) } @@ -71,7 +84,7 @@ public enum MessageReceiveJob: JobExecutor { default: SNLog("Couldn't receive message due to error: \(error)") - leastSevereError = error + lastError = error // We failed to process this message but it is a retryable error // so add it to the list to re-process @@ -94,12 +107,12 @@ public enum MessageReceiveJob: JobExecutor { } // Handle the result - switch leastSevereError { + switch lastError { case let error as MessageReceiverError where !error.isRetryable: failure(updatedJob, error, true) case .some(let error): - failure(updatedJob, error, false) // TODO: Confirm the 'noKeyPair' errors here aren't an issue + failure(updatedJob, error, false) case .none: success(updatedJob, false) diff --git a/SessionSnodeKit/Models/SendMessageResponse.swift b/SessionSnodeKit/Models/SendMessageResponse.swift index d49dd9b55..052188a0e 100644 --- a/SessionSnodeKit/Models/SendMessageResponse.swift +++ b/SessionSnodeKit/Models/SendMessageResponse.swift @@ -4,12 +4,10 @@ import Foundation public class SendMessagesResponse: SnodeRecursiveResponse { private enum CodingKeys: String, CodingKey { - case difficulty case hash case swarm } - public let difficulty: Int64 public let hash: String // MARK: - Initialization @@ -17,7 +15,6 @@ public class SendMessagesResponse: SnodeRecursiveResponse = try decoder.container(keyedBy: CodingKeys.self) - difficulty = try container.decode(Int64.self, forKey: .difficulty) hash = try container.decode(String.self, forKey: .hash) try super.init(from: decoder) diff --git a/SessionSnodeKit/Networking/SnodeAPI.swift b/SessionSnodeKit/Networking/SnodeAPI.swift index 694213d93..19531ca46 100644 --- a/SessionSnodeKit/Networking/SnodeAPI.swift +++ b/SessionSnodeKit/Networking/SnodeAPI.swift @@ -549,7 +549,7 @@ public final class SnodeAPI { var requests: [SnodeAPI.BatchRequest.Info] = targetedMessages .map { message, namespace in // Check if this namespace requires authentication - guard namespace.requiresReadAuthentication else { + guard namespace.requiresWriteAuthentication else { return BatchRequest.Info( request: SnodeRequest( endpoint: .sendMessage, @@ -618,7 +618,7 @@ public final class SnodeAPI { using: dependencies ) .eraseToAnyPublisher() - .decoded(as: responseTypes, using: dependencies) + .decoded(as: responseTypes, requireAllResults: false, using: dependencies) .eraseToAnyPublisher() } .retry(maxRetryCount) diff --git a/SessionUtilitiesKit/Networking/BatchResponse.swift b/SessionUtilitiesKit/Networking/BatchResponse.swift index 8b83fc6c2..05c8a77fe 100644 --- a/SessionUtilitiesKit/Networking/BatchResponse.swift +++ b/SessionUtilitiesKit/Networking/BatchResponse.swift @@ -81,6 +81,7 @@ public extension Decodable { public extension AnyPublisher where Output == (ResponseInfoType, Data?), Failure == Error { func decoded( as types: HTTP.BatchResponseTypes, + requireAllResults: Bool = true, using dependencies: Dependencies = Dependencies() ) -> AnyPublisher { self @@ -101,7 +102,7 @@ public extension AnyPublisher where Output == (ResponseInfoType, Data?), Failure case let anyArray as [Any]: dataArray = anyArray.compactMap { try? JSONSerialization.data(withJSONObject: $0) } - guard dataArray.count == types.count else { + guard !requireAllResults || dataArray.count == types.count else { return Fail(error: HTTPError.parsingFailed) .eraseToAnyPublisher() } @@ -110,7 +111,10 @@ public extension AnyPublisher where Output == (ResponseInfoType, Data?), Failure guard let resultsArray: [Data] = (anyDict["results"] as? [Any])? .compactMap({ try? JSONSerialization.data(withJSONObject: $0) }), - resultsArray.count == types.count + ( + !requireAllResults || + resultsArray.count == types.count + ) else { return Fail(error: HTTPError.parsingFailed) .eraseToAnyPublisher()