Merge remote-tracking branch 'origin/fix/sync-message-issues' into feature/lib-quic-integration

# Conflicts:
#	.drone.jsonnet
#	SessionSnodeKit/Networking/SnodeAPI.swift
#	SessionSnodeKit/Types/OnionRequestAPIError.swift
#	SessionSnodeKit/Types/SnodeAPIError.swift
pull/960/head
Morgan Pretty 1 year ago
commit 8c467dc511

@ -2861,11 +2861,11 @@
C300A5BB2554AFFB00555489 /* Messages */ = { C300A5BB2554AFFB00555489 /* Messages */ = {
isa = PBXGroup; isa = PBXGroup;
children = ( children = (
C300A5C62554B02D00555489 /* Visible Messages */,
C300A5C72554B03900555489 /* Control Messages */,
C3C2A74325539EB700C340D1 /* Message.swift */, C3C2A74325539EB700C340D1 /* Message.swift */,
C352A30825574D8400338F3E /* Message+Destination.swift */, C352A30825574D8400338F3E /* Message+Destination.swift */,
943C6D812B75E061004ACE64 /* Message+DisappearingMessages.swift */, 943C6D812B75E061004ACE64 /* Message+DisappearingMessages.swift */,
C300A5C62554B02D00555489 /* Visible Messages */,
C300A5C72554B03900555489 /* Control Messages */,
); );
path = Messages; path = Messages;
sourceTree = "<group>"; sourceTree = "<group>";
@ -7364,6 +7364,7 @@
PRODUCT_BUNDLE_IDENTIFIER = "com.loki-project.SignalUtilitiesKit"; PRODUCT_BUNDLE_IDENTIFIER = "com.loki-project.SignalUtilitiesKit";
PRODUCT_NAME = "$(TARGET_NAME:c99extidentifier)"; PRODUCT_NAME = "$(TARGET_NAME:c99extidentifier)";
SKIP_INSTALL = YES; SKIP_INSTALL = YES;
STRIP_INSTALLED_PRODUCT = NO;
SUPPORTS_MACCATALYST = NO; SUPPORTS_MACCATALYST = NO;
SWIFT_ACTIVE_COMPILATION_CONDITIONS = DEBUG; SWIFT_ACTIVE_COMPILATION_CONDITIONS = DEBUG;
SWIFT_OPTIMIZATION_LEVEL = "-Onone"; SWIFT_OPTIMIZATION_LEVEL = "-Onone";
@ -7533,6 +7534,7 @@
PRODUCT_BUNDLE_IDENTIFIER = "com.loki-project.SessionSnodeKit"; PRODUCT_BUNDLE_IDENTIFIER = "com.loki-project.SessionSnodeKit";
PRODUCT_NAME = "$(TARGET_NAME:c99extidentifier)"; PRODUCT_NAME = "$(TARGET_NAME:c99extidentifier)";
SKIP_INSTALL = YES; SKIP_INSTALL = YES;
STRIP_INSTALLED_PRODUCT = NO;
SUPPORTS_MACCATALYST = NO; SUPPORTS_MACCATALYST = NO;
SWIFT_ACTIVE_COMPILATION_CONDITIONS = DEBUG; SWIFT_ACTIVE_COMPILATION_CONDITIONS = DEBUG;
SWIFT_INCLUDE_PATHS = "$(inherited) \"${PODS_XCFRAMEWORKS_BUILD_DIR}/Clibsodium\" \"$(TARGET_BUILD_DIR)/libSessionUtil\""; SWIFT_INCLUDE_PATHS = "$(inherited) \"${PODS_XCFRAMEWORKS_BUILD_DIR}/Clibsodium\" \"$(TARGET_BUILD_DIR)/libSessionUtil\"";
@ -7917,6 +7919,7 @@
PRODUCT_BUNDLE_IDENTIFIER = "com.loki-project.SessionMessagingKit"; PRODUCT_BUNDLE_IDENTIFIER = "com.loki-project.SessionMessagingKit";
PRODUCT_NAME = "$(TARGET_NAME:c99extidentifier)"; PRODUCT_NAME = "$(TARGET_NAME:c99extidentifier)";
SKIP_INSTALL = YES; SKIP_INSTALL = YES;
STRIP_INSTALLED_PRODUCT = NO;
SUPPORTS_MACCATALYST = NO; SUPPORTS_MACCATALYST = NO;
SWIFT_ACTIVE_COMPILATION_CONDITIONS = DEBUG; SWIFT_ACTIVE_COMPILATION_CONDITIONS = DEBUG;
SWIFT_INCLUDE_PATHS = "$(inherited) \"${PODS_XCFRAMEWORKS_BUILD_DIR}/Clibsodium\" \"$(TARGET_BUILD_DIR)/libSessionUtil\""; SWIFT_INCLUDE_PATHS = "$(inherited) \"${PODS_XCFRAMEWORKS_BUILD_DIR}/Clibsodium\" \"$(TARGET_BUILD_DIR)/libSessionUtil\"";
@ -8025,6 +8028,7 @@
PRODUCT_NAME = "$(TARGET_NAME:c99extidentifier)"; PRODUCT_NAME = "$(TARGET_NAME:c99extidentifier)";
SDKROOT = iphoneos; SDKROOT = iphoneos;
SKIP_INSTALL = YES; SKIP_INSTALL = YES;
STRIP_INSTALLED_PRODUCT = YES;
SUPPORTS_MACCATALYST = NO; SUPPORTS_MACCATALYST = NO;
SWIFT_COMPILATION_MODE = wholemodule; SWIFT_COMPILATION_MODE = wholemodule;
SWIFT_INCLUDE_PATHS = "$(inherited) \"${PODS_XCFRAMEWORKS_BUILD_DIR}/Clibsodium\" \"$(TARGET_BUILD_DIR)/libSessionUtil\""; SWIFT_INCLUDE_PATHS = "$(inherited) \"${PODS_XCFRAMEWORKS_BUILD_DIR}/Clibsodium\" \"$(TARGET_BUILD_DIR)/libSessionUtil\"";
@ -8249,6 +8253,7 @@
PROVISIONING_PROFILE_SPECIFIER = ""; PROVISIONING_PROFILE_SPECIFIER = "";
RUN_CLANG_STATIC_ANALYZER = YES; RUN_CLANG_STATIC_ANALYZER = YES;
SDKROOT = iphoneos; SDKROOT = iphoneos;
STRIP_INSTALLED_PRODUCT = NO;
SWIFT_OBJC_BRIDGING_HEADER = "Session/Meta/Signal-Bridging-Header.h"; SWIFT_OBJC_BRIDGING_HEADER = "Session/Meta/Signal-Bridging-Header.h";
SWIFT_OBJC_INTERFACE_HEADER_NAME = "Session-Swift.h"; SWIFT_OBJC_INTERFACE_HEADER_NAME = "Session-Swift.h";
SWIFT_OPTIMIZATION_LEVEL = "-Onone"; SWIFT_OPTIMIZATION_LEVEL = "-Onone";
@ -8665,6 +8670,7 @@
CLANG_WARN_NON_LITERAL_NULL_CONVERSION = YES; CLANG_WARN_NON_LITERAL_NULL_CONVERSION = YES;
CLANG_WARN_OBJC_LITERAL_CONVERSION = YES; CLANG_WARN_OBJC_LITERAL_CONVERSION = YES;
CLANG_WARN_OBJC_ROOT_CLASS = YES_ERROR; CLANG_WARN_OBJC_ROOT_CLASS = YES_ERROR;
CLANG_WARN_QUOTED_INCLUDE_IN_FRAMEWORK_HEADER = "$(inherited)";
CLANG_WARN_UNGUARDED_AVAILABILITY = YES_AGGRESSIVE; CLANG_WARN_UNGUARDED_AVAILABILITY = YES_AGGRESSIVE;
CODE_SIGN_STYLE = Automatic; CODE_SIGN_STYLE = Automatic;
COPY_PHASE_STRIP = NO; COPY_PHASE_STRIP = NO;
@ -8717,7 +8723,7 @@
CLANG_WARN_OBJC_IMPLICIT_RETAIN_SELF = YES; CLANG_WARN_OBJC_IMPLICIT_RETAIN_SELF = YES;
CLANG_WARN_OBJC_LITERAL_CONVERSION = YES; CLANG_WARN_OBJC_LITERAL_CONVERSION = YES;
CLANG_WARN_OBJC_ROOT_CLASS = YES_ERROR; CLANG_WARN_OBJC_ROOT_CLASS = YES_ERROR;
CLANG_WARN_QUOTED_INCLUDE_IN_FRAMEWORK_HEADER = YES; CLANG_WARN_QUOTED_INCLUDE_IN_FRAMEWORK_HEADER = "$(inherited)";
CLANG_WARN_RANGE_LOOP_ANALYSIS = YES; CLANG_WARN_RANGE_LOOP_ANALYSIS = YES;
CLANG_WARN_STRICT_PROTOTYPES = YES; CLANG_WARN_STRICT_PROTOTYPES = YES;
CLANG_WARN_SUSPICIOUS_MOVE = YES; CLANG_WARN_SUSPICIOUS_MOVE = YES;

@ -224,7 +224,7 @@ final class NewDMVC: BaseVC, UIPageViewControllerDataSource, UIPageViewControlle
let message: String = { let message: String = {
if let error = error as? SnodeAPIError { if let error = error as? SnodeAPIError {
switch error { switch error {
case .decryptionFailed, .hashingFailed, .validationFailed: case .onsDecryptionFailed, .onsHashingFailed, .onsValidationFailed:
return "\(error)" return "\(error)"
default: break default: break

@ -63,8 +63,8 @@ public enum AttachmentUploadJob: JobExecutor {
MessageSender.handleMessageWillSend( MessageSender.handleMessageWillSend(
db, db,
message: details.message, message: details.message,
interactionId: interactionId, destination: details.destination,
isSyncMessage: details.isSyncMessage interactionId: interactionId
) )
} }
@ -93,9 +93,9 @@ public enum AttachmentUploadJob: JobExecutor {
MessageSender.handleFailedMessageSend( MessageSender.handleFailedMessageSend(
db, db,
message: details.message, message: details.message,
destination: nil,
with: .other(error), with: .other(error),
interactionId: interactionId, interactionId: interactionId,
isSyncMessage: details.isSyncMessage,
using: dependencies using: dependencies
) )
} }

@ -45,10 +45,8 @@ public enum GetExpirationJob: JobExecutor {
let userPublicKey: String = getUserHexEncodedPublicKey(using: dependencies) let userPublicKey: String = getUserHexEncodedPublicKey(using: dependencies)
SnodeAPI SnodeAPI
.getSwarm(for: userPublicKey, using: dependencies) .getSwarm(for: userPublicKey, using: dependencies)
.tryFlatMap { swarm -> AnyPublisher<(ResponseInfoType, GetExpiriesResponse), Error> in .tryFlatMapWithRandomSnode(using: dependencies) { snode -> AnyPublisher<(ResponseInfoType, GetExpiriesResponse), Error> in
guard let snode = swarm.randomElement() else { throw SnodeAPIError.ranOutOfRandomSnodes } SnodeAPI.getExpiries(
return SnodeAPI.getExpiries(
from: snode, from: snode,
swarmPublicKey: userPublicKey, swarmPublicKey: userPublicKey,
of: expirationInfo.map { $0.key }, of: expirationInfo.map { $0.key },

@ -51,7 +51,6 @@ public enum GroupLeavingJob: JobExecutor {
to: destination, to: destination,
namespace: destination.defaultNamespace, namespace: destination.defaultNamespace,
interactionId: job.interactionId, interactionId: job.interactionId,
isSyncMessage: false,
using: dependencies using: dependencies
) )
} }

@ -176,7 +176,6 @@ public enum MessageSendJob: JobExecutor {
to: details.destination, to: details.destination,
namespace: details.destination.defaultNamespace, namespace: details.destination.defaultNamespace,
interactionId: job.interactionId, interactionId: job.interactionId,
isSyncMessage: details.isSyncMessage,
using: dependencies using: dependencies
) )
} }
@ -197,7 +196,7 @@ public enum MessageSendJob: JobExecutor {
SNLog("[MessageSendJob] Couldn't send message due to error: \(error) (paths: \(OnionRequestAPI.paths.prettifiedDescription)).") SNLog("[MessageSendJob] Couldn't send message due to error: \(error) (paths: \(OnionRequestAPI.paths.prettifiedDescription)).")
default: default:
SNLog("[MessageSendJob] Couldn't send message due to error: \(error).") SNLog("[MessageSendJob] Couldn't send message due to error: \(error)")
} }
// Actual error handling // Actual error handling
@ -240,25 +239,22 @@ extension MessageSendJob {
private enum CodingKeys: String, CodingKey { private enum CodingKeys: String, CodingKey {
case destination case destination
case message case message
case isSyncMessage @available(*, deprecated, message: "replaced by 'Message.Destination.syncMessage'") case isSyncMessage
case variant case variant
} }
public let destination: Message.Destination public let destination: Message.Destination
public let message: Message public let message: Message
public let isSyncMessage: Bool
public let variant: Message.Variant? public let variant: Message.Variant?
// MARK: - Initialization // MARK: - Initialization
public init( public init(
destination: Message.Destination, destination: Message.Destination,
message: Message, message: Message
isSyncMessage: Bool = false
) { ) {
self.destination = destination self.destination = destination
self.message = message self.message = message
self.isSyncMessage = isSyncMessage
self.variant = Message.Variant(from: message) self.variant = Message.Variant(from: message)
} }
@ -272,10 +268,42 @@ extension MessageSendJob {
throw StorageError.decodingFailed throw StorageError.decodingFailed
} }
let message: Message = try variant.decode(from: container, forKey: .message)
var destination: Message.Destination = try container.decode(Message.Destination.self, forKey: .destination)
/// Handle the legacy 'isSyncMessage' flag - this flag was deprecated in `2.5.2` (April 2024) and can be removed in a
/// subsequent release after May 2024
if ((try? container.decode(Bool.self, forKey: .isSyncMessage)) ?? false) {
switch (destination, message) {
case (.contact, let message as VisibleMessage):
guard let targetPublicKey: String = message.syncTarget else {
SNLog("Unable to decode messageSend job due to missing syncTarget")
throw StorageError.decodingFailed
}
destination = .syncMessage(originalRecipientPublicKey: targetPublicKey)
case (.contact, let message as ExpirationTimerUpdate):
guard let targetPublicKey: String = message.syncTarget else {
SNLog("Unable to decode messageSend job due to missing syncTarget")
throw StorageError.decodingFailed
}
destination = .syncMessage(originalRecipientPublicKey: targetPublicKey)
case (.contact(let publicKey), _):
SNLog("Sync message in messageSend job was missing explicit syncTarget (falling back to specified value)")
destination = .syncMessage(originalRecipientPublicKey: publicKey)
default:
SNLog("Unable to decode messageSend job due to invalid sync message state")
throw StorageError.decodingFailed
}
}
self = Details( self = Details(
destination: try container.decode(Message.Destination.self, forKey: .destination), destination: destination,
message: try variant.decode(from: container, forKey: .message), message: message
isSyncMessage: ((try? container.decode(Bool.self, forKey: .isSyncMessage)) ?? false)
) )
} }
@ -289,7 +317,6 @@ extension MessageSendJob {
try container.encode(destination, forKey: .destination) try container.encode(destination, forKey: .destination)
try container.encode(message, forKey: .message) try container.encode(message, forKey: .message)
try container.encode(isSyncMessage, forKey: .isSyncMessage)
try container.encode(variant, forKey: .variant) try container.encode(variant, forKey: .variant)
} }
} }

@ -43,8 +43,7 @@ public enum SendReadReceiptsJob: JobExecutor {
), ),
to: details.destination, to: details.destination,
namespace: details.destination.defaultNamespace, namespace: details.destination.defaultNamespace,
interactionId: nil, interactionId: nil
isSyncMessage: false
) )
} }
.flatMap { MessageSender.sendImmediate(data: $0, using: dependencies) } .flatMap { MessageSender.sendImmediate(data: $0, using: dependencies) }

@ -7,8 +7,16 @@ import SessionUtilitiesKit
public extension Message { public extension Message {
enum Destination: Codable, Hashable { enum Destination: Codable, Hashable {
/// A message directed to another user
case contact(publicKey: String) case contact(publicKey: String)
/// A message that was originally sent to another user but needs to be replicated to the current users swarm
case syncMessage(originalRecipientPublicKey: String)
/// A message directed to group conversation
case closedGroup(groupPublicKey: String) case closedGroup(groupPublicKey: String)
/// A message directed to an open group
case openGroup( case openGroup(
roomToken: String, roomToken: String,
server: String, server: String,
@ -16,13 +24,15 @@ public extension Message {
whisperMods: Bool = false, whisperMods: Bool = false,
fileIds: [String]? = nil fileIds: [String]? = nil
) )
/// A message directed to an open group inbox
case openGroupInbox(server: String, openGroupPublicKey: String, blindedPublicKey: String) case openGroupInbox(server: String, openGroupPublicKey: String, blindedPublicKey: String)
public var defaultNamespace: SnodeAPI.Namespace? { public var defaultNamespace: SnodeAPI.Namespace? {
switch self { switch self {
case .contact: return .`default` case .contact, .syncMessage: return .`default`
case .closedGroup: return .legacyClosedGroup case .closedGroup: return .legacyClosedGroup
default: return nil case .openGroup, .openGroupInbox: return nil
} }
} }

@ -246,7 +246,7 @@ public extension Message {
static func threadId(forMessage message: Message, destination: Message.Destination) -> String { static func threadId(forMessage message: Message, destination: Message.Destination) -> String {
switch destination { switch destination {
case .contact(let publicKey): case .contact(let publicKey), .syncMessage(let publicKey):
// Extract the 'syncTarget' value if there is one // Extract the 'syncTarget' value if there is one
let maybeSyncTarget: String? let maybeSyncTarget: String?
@ -661,29 +661,24 @@ public extension Message {
internal static func getSpecifiedTTL( internal static func getSpecifiedTTL(
message: Message, message: Message,
isGroupMessage: Bool, destination: Message.Destination
isSyncMessage: Bool
) -> UInt64 { ) -> UInt64 {
guard Features.useNewDisappearingMessagesConfig else { return message.ttl } guard Features.useNewDisappearingMessagesConfig else { return message.ttl }
// Not disappearing messages
guard let expiresInSeconds = message.expiresInSeconds else { return message.ttl }
// Sync message should be read already, it is the same for disappear after read and disappear after sent switch (destination, message) {
guard !isSyncMessage else { return UInt64(expiresInSeconds * 1000) } // Disappear after sent messages with exceptions
case (_, is UnsendRequest): return message.ttl
// Disappear after read messages that have not be read case (.closedGroup, is ClosedGroupControlMessage), (.closedGroup, is ExpirationTimerUpdate):
guard let expiresStartedAtMs = message.expiresStartedAtMs else { return message.ttl }
// Disappear after read messages that have already be read
guard message.sentTimestamp == UInt64(expiresStartedAtMs) else { return message.ttl }
// Disappear after sent messages with exceptions
switch message {
case is ClosedGroupControlMessage, is UnsendRequest:
return message.ttl return message.ttl
case is ExpirationTimerUpdate:
return isGroupMessage ? message.ttl : UInt64(expiresInSeconds * 1000)
default: default:
guard
let expiresInSeconds = message.expiresInSeconds, // Not disappearing messages
expiresInSeconds > 0, // Not disappearing messages (0 == disabled)
let expiresStartedAtMs = message.expiresStartedAtMs, // Unread disappear after read message
message.sentTimestamp == UInt64(expiresStartedAtMs) // Already read disappearing messages
else { return message.ttl }
return UInt64(expiresInSeconds * 1000) return UInt64(expiresInSeconds * 1000)
} }
} }

@ -4,7 +4,7 @@
import Foundation import Foundation
public enum MessageSenderError: LocalizedError, Equatable { public enum MessageSenderError: Error, CustomStringConvertible, Equatable {
case invalidMessage case invalidMessage
case protoConversionFailed case protoConversionFailed
case noUserX25519KeyPair case noUserX25519KeyPair
@ -33,24 +33,28 @@ public enum MessageSenderError: LocalizedError, Equatable {
} }
} }
public var errorDescription: String? { public var description: String {
switch self { switch self {
case .invalidMessage: return "Invalid message." case .invalidMessage: return "Invalid message (MessageSenderError.invalidMessage)."
case .protoConversionFailed: return "Couldn't convert message to proto." case .protoConversionFailed: return "Couldn't convert message to proto (MessageSenderError.protoConversionFailed)."
case .noUserX25519KeyPair: return "Couldn't find user X25519 key pair." case .noUserX25519KeyPair: return "Couldn't find user X25519 key pair (MessageSenderError.noUserX25519KeyPair)."
case .noUserED25519KeyPair: return "Couldn't find user ED25519 key pair." case .noUserED25519KeyPair: return "Couldn't find user ED25519 key pair (MessageSenderError.noUserED25519KeyPair)."
case .signingFailed: return "Couldn't sign message." case .signingFailed: return "Couldn't sign message (MessageSenderError.signingFailed)."
case .encryptionFailed: return "Couldn't encrypt message." case .encryptionFailed: return "Couldn't encrypt message (MessageSenderError.encryptionFailed)."
case .noUsername: return "Missing username." case .noUsername: return "Missing username (MessageSenderError.noUsername)."
case .attachmentsNotUploaded: return "Attachments for this message have not been uploaded." case .attachmentsNotUploaded: return "Attachments for this message have not been uploaded (MessageSenderError.attachmentsNotUploaded)."
case .blindingFailed: return "Couldn't blind the sender" case .blindingFailed: return "Couldn't blind the sender (MessageSenderError.blindingFailed)."
case .sendJobTimeout: return "Send job timeout (likely due to path building taking too long)." case .sendJobTimeout: return "Send job timeout (likely due to path building taking too long - MessageSenderError.sendJobTimeout)."
// Closed groups // Closed groups
case .noThread: return "Couldn't find a thread associated with the given group public key." case .noThread: return "Couldn't find a thread associated with the given group public key (MessageSenderError.noThread)."
case .noKeyPair: return "Couldn't find a private key associated with the given group public key." case .noKeyPair: return "Couldn't find a private key associated with the given group public key (MessageSenderError.noKeyPair)."
case .invalidClosedGroupUpdate: return "Invalid group update." case .invalidClosedGroupUpdate: return "Invalid group update (MessageSenderError.invalidClosedGroupUpdate)."
case .other(let error): return error.localizedDescription case .other(let error):
switch error {
case is CustomStringConvertible: return "\(error)"
default: return error.localizedDescription
}
} }
} }

@ -70,7 +70,6 @@ extension MessageSender {
destination: destination, destination: destination,
threadId: threadId, threadId: threadId,
interactionId: interactionId, interactionId: interactionId,
isAlreadySyncMessage: false,
using: dependencies using: dependencies
) )
return return
@ -84,8 +83,7 @@ extension MessageSender {
interactionId: interactionId, interactionId: interactionId,
details: MessageSendJob.Details( details: MessageSendJob.Details(
destination: destination, destination: destination,
message: message, message: message
isSyncMessage: isSyncMessage
) )
), ),
canStartJob: true, canStartJob: true,
@ -132,6 +130,7 @@ extension MessageSender {
let threadId: String = { let threadId: String = {
switch preparedSendData.destination { switch preparedSendData.destination {
case .contact(let publicKey): return publicKey case .contact(let publicKey): return publicKey
case .syncMessage(let originalRecipientPublicKey): return originalRecipientPublicKey
case .closedGroup(let groupPublicKey): return groupPublicKey case .closedGroup(let groupPublicKey): return groupPublicKey
case .openGroup(let roomToken, let server, _, _, _): case .openGroup(let roomToken, let server, _, _, _):
return OpenGroup.idFor(roomToken: roomToken, server: server) return OpenGroup.idFor(roomToken: roomToken, server: server)

@ -17,7 +17,6 @@ public final class MessageSender {
let message: Message? let message: Message?
let interactionId: Int64? let interactionId: Int64?
let isSyncMessage: Bool?
let totalAttachmentsUploaded: Int let totalAttachmentsUploaded: Int
let snodeMessage: SnodeMessage? let snodeMessage: SnodeMessage?
@ -30,7 +29,6 @@ public final class MessageSender {
destination: Message.Destination, destination: Message.Destination,
namespace: SnodeAPI.Namespace?, namespace: SnodeAPI.Namespace?,
interactionId: Int64?, interactionId: Int64?,
isSyncMessage: Bool?,
totalAttachmentsUploaded: Int = 0, totalAttachmentsUploaded: Int = 0,
snodeMessage: SnodeMessage?, snodeMessage: SnodeMessage?,
plaintext: Data?, plaintext: Data?,
@ -42,7 +40,6 @@ public final class MessageSender {
self.destination = destination self.destination = destination
self.namespace = namespace self.namespace = namespace
self.interactionId = interactionId self.interactionId = interactionId
self.isSyncMessage = isSyncMessage
self.totalAttachmentsUploaded = totalAttachmentsUploaded self.totalAttachmentsUploaded = totalAttachmentsUploaded
self.snodeMessage = snodeMessage self.snodeMessage = snodeMessage
@ -56,7 +53,6 @@ public final class MessageSender {
destination: Message.Destination, destination: Message.Destination,
namespace: SnodeAPI.Namespace, namespace: SnodeAPI.Namespace,
interactionId: Int64?, interactionId: Int64?,
isSyncMessage: Bool?,
snodeMessage: SnodeMessage snodeMessage: SnodeMessage
) { ) {
self.shouldSend = true self.shouldSend = true
@ -65,7 +61,6 @@ public final class MessageSender {
self.destination = destination self.destination = destination
self.namespace = namespace self.namespace = namespace
self.interactionId = interactionId self.interactionId = interactionId
self.isSyncMessage = isSyncMessage
self.totalAttachmentsUploaded = 0 self.totalAttachmentsUploaded = 0
self.snodeMessage = snodeMessage self.snodeMessage = snodeMessage
@ -86,7 +81,6 @@ public final class MessageSender {
self.destination = destination self.destination = destination
self.namespace = nil self.namespace = nil
self.interactionId = interactionId self.interactionId = interactionId
self.isSyncMessage = false
self.totalAttachmentsUploaded = 0 self.totalAttachmentsUploaded = 0
self.snodeMessage = nil self.snodeMessage = nil
@ -107,7 +101,6 @@ public final class MessageSender {
self.destination = destination self.destination = destination
self.namespace = nil self.namespace = nil
self.interactionId = interactionId self.interactionId = interactionId
self.isSyncMessage = false
self.totalAttachmentsUploaded = 0 self.totalAttachmentsUploaded = 0
self.snodeMessage = nil self.snodeMessage = nil
@ -124,7 +117,6 @@ public final class MessageSender {
destination: destination.with(fileIds: fileIds), destination: destination.with(fileIds: fileIds),
namespace: namespace, namespace: namespace,
interactionId: interactionId, interactionId: interactionId,
isSyncMessage: isSyncMessage,
totalAttachmentsUploaded: fileIds.count, totalAttachmentsUploaded: fileIds.count,
snodeMessage: snodeMessage, snodeMessage: snodeMessage,
plaintext: plaintext, plaintext: plaintext,
@ -139,7 +131,6 @@ public final class MessageSender {
to destination: Message.Destination, to destination: Message.Destination,
namespace: SnodeAPI.Namespace?, namespace: SnodeAPI.Namespace?,
interactionId: Int64?, interactionId: Int64?,
isSyncMessage: Bool = false,
using dependencies: Dependencies = Dependencies() using dependencies: Dependencies = Dependencies()
) throws -> PreparedSendData { ) throws -> PreparedSendData {
// Common logic for all destinations // Common logic for all destinations
@ -154,7 +145,7 @@ public final class MessageSender {
) )
switch destination { switch destination {
case .contact, .closedGroup: case .contact, .syncMessage, .closedGroup:
return try prepareSendToSnodeDestination( return try prepareSendToSnodeDestination(
db, db,
message: updatedMessage, message: updatedMessage,
@ -163,7 +154,6 @@ public final class MessageSender {
interactionId: interactionId, interactionId: interactionId,
userPublicKey: currentUserPublicKey, userPublicKey: currentUserPublicKey,
messageSendTimestamp: messageSendTimestamp, messageSendTimestamp: messageSendTimestamp,
isSyncMessage: isSyncMessage,
using: dependencies using: dependencies
) )
@ -198,13 +188,13 @@ public final class MessageSender {
interactionId: Int64?, interactionId: Int64?,
userPublicKey: String, userPublicKey: String,
messageSendTimestamp: Int64, messageSendTimestamp: Int64,
isSyncMessage: Bool = false,
using dependencies: Dependencies using dependencies: Dependencies
) throws -> PreparedSendData { ) throws -> PreparedSendData {
message.sender = userPublicKey message.sender = userPublicKey
message.recipient = { message.recipient = {
switch destination { switch destination {
case .contact(let publicKey): return publicKey case .contact(let publicKey): return publicKey
case .syncMessage: return userPublicKey
case .closedGroup(let groupPublicKey): return groupPublicKey case .closedGroup(let groupPublicKey): return groupPublicKey
case .openGroup, .openGroupInbox: preconditionFailure() case .openGroup, .openGroupInbox: preconditionFailure()
} }
@ -215,6 +205,7 @@ public final class MessageSender {
throw MessageSender.handleFailedMessageSend( throw MessageSender.handleFailedMessageSend(
db, db,
message: message, message: message,
destination: destination,
with: .invalidMessage, with: .invalidMessage,
interactionId: interactionId, interactionId: interactionId,
using: dependencies using: dependencies
@ -223,25 +214,25 @@ public final class MessageSender {
// Attach the user's profile if needed (no need to do so for 'Note to Self' or sync // Attach the user's profile if needed (no need to do so for 'Note to Self' or sync
// messages as they will be managed by the user config handling // messages as they will be managed by the user config handling
let isSelfSend: Bool = (message.recipient == userPublicKey) switch (destination, (message.recipient == userPublicKey), message as? MessageWithProfile) {
case (.syncMessage, _, _), (_, true, _), (_, _, .none): break
if !isSelfSend, !isSyncMessage, var messageWithProfile: MessageWithProfile = message as? MessageWithProfile { case (_, _, .some(var messageWithProfile)):
let profile: Profile = Profile.fetchOrCreateCurrentUser(db) let profile: Profile = Profile.fetchOrCreateCurrentUser(db)
if let profileKey: Data = profile.profileEncryptionKey, let profilePictureUrl: String = profile.profilePictureUrl { if let profileKey: Data = profile.profileEncryptionKey, let profilePictureUrl: String = profile.profilePictureUrl {
messageWithProfile.profile = VisibleMessage.VMProfile( messageWithProfile.profile = VisibleMessage.VMProfile(
displayName: profile.name, displayName: profile.name,
profileKey: profileKey, profileKey: profileKey,
profilePictureUrl: profilePictureUrl profilePictureUrl: profilePictureUrl
) )
} }
else { else {
messageWithProfile.profile = VisibleMessage.VMProfile(displayName: profile.name) messageWithProfile.profile = VisibleMessage.VMProfile(displayName: profile.name)
} }
} }
// Perform any pre-send actions // Perform any pre-send actions
handleMessageWillSend(db, message: message, interactionId: interactionId, isSyncMessage: isSyncMessage) handleMessageWillSend(db, message: message, destination: destination, interactionId: interactionId)
// Convert it to protobuf // Convert it to protobuf
let threadId: String = Message.threadId(forMessage: message, destination: destination) let threadId: String = Message.threadId(forMessage: message, destination: destination)
@ -250,6 +241,7 @@ public final class MessageSender {
throw MessageSender.handleFailedMessageSend( throw MessageSender.handleFailedMessageSend(
db, db,
message: message, message: message,
destination: destination,
with: .protoConversionFailed, with: .protoConversionFailed,
interactionId: interactionId, interactionId: interactionId,
using: dependencies using: dependencies
@ -267,6 +259,7 @@ public final class MessageSender {
throw MessageSender.handleFailedMessageSend( throw MessageSender.handleFailedMessageSend(
db, db,
message: message, message: message,
destination: destination,
with: .other(error), with: .other(error),
interactionId: interactionId, interactionId: interactionId,
using: dependencies using: dependencies
@ -280,6 +273,9 @@ public final class MessageSender {
case .contact(let publicKey): case .contact(let publicKey):
ciphertext = try encryptWithSessionProtocol(db, plaintext: plaintext, for: publicKey, using: dependencies) ciphertext = try encryptWithSessionProtocol(db, plaintext: plaintext, for: publicKey, using: dependencies)
case .syncMessage:
ciphertext = try encryptWithSessionProtocol(db, plaintext: plaintext, for: userPublicKey, using: dependencies)
case .closedGroup(let groupPublicKey): case .closedGroup(let groupPublicKey):
guard let encryptionKeyPair: ClosedGroupKeyPair = try? ClosedGroupKeyPair.fetchLatestKeyPair(db, threadId: groupPublicKey) else { guard let encryptionKeyPair: ClosedGroupKeyPair = try? ClosedGroupKeyPair.fetchLatestKeyPair(db, threadId: groupPublicKey) else {
throw MessageSenderError.noKeyPair throw MessageSenderError.noKeyPair
@ -300,6 +296,7 @@ public final class MessageSender {
throw MessageSender.handleFailedMessageSend( throw MessageSender.handleFailedMessageSend(
db, db,
message: message, message: message,
destination: destination,
with: .other(error), with: .other(error),
interactionId: interactionId, interactionId: interactionId,
using: dependencies using: dependencies
@ -311,7 +308,7 @@ public final class MessageSender {
let senderPublicKey: String let senderPublicKey: String
switch destination { switch destination {
case .contact: case .contact, .syncMessage:
kind = .sessionMessage kind = .sessionMessage
senderPublicKey = "" senderPublicKey = ""
@ -336,6 +333,7 @@ public final class MessageSender {
throw MessageSender.handleFailedMessageSend( throw MessageSender.handleFailedMessageSend(
db, db,
message: message, message: message,
destination: destination,
with: .other(error), with: .other(error),
interactionId: interactionId, interactionId: interactionId,
using: dependencies using: dependencies
@ -350,13 +348,7 @@ public final class MessageSender {
data: base64EncodedData, data: base64EncodedData,
ttl: Message.getSpecifiedTTL( ttl: Message.getSpecifiedTTL(
message: message, message: message,
isGroupMessage: { destination: destination
switch destination {
case .closedGroup: return true
default: return false
}
}(),
isSyncMessage: isSyncMessage
), ),
timestampMs: UInt64(messageSendTimestamp) timestampMs: UInt64(messageSendTimestamp)
) )
@ -366,7 +358,6 @@ public final class MessageSender {
destination: destination, destination: destination,
namespace: namespace, namespace: namespace,
interactionId: interactionId, interactionId: interactionId,
isSyncMessage: isSyncMessage,
snodeMessage: snodeMessage snodeMessage: snodeMessage
) )
} }
@ -382,7 +373,7 @@ public final class MessageSender {
let threadId: String let threadId: String
switch destination { switch destination {
case .contact, .closedGroup, .openGroupInbox: preconditionFailure() case .contact, .syncMessage, .closedGroup, .openGroupInbox: preconditionFailure()
case .openGroup(let roomToken, let server, let whisperTo, let whisperMods, _): case .openGroup(let roomToken, let server, let whisperTo, let whisperMods, _):
threadId = OpenGroup.idFor(roomToken: roomToken, server: server) threadId = OpenGroup.idFor(roomToken: roomToken, server: server)
message.recipient = [ message.recipient = [
@ -439,6 +430,7 @@ public final class MessageSender {
throw MessageSender.handleFailedMessageSend( throw MessageSender.handleFailedMessageSend(
db, db,
message: message, message: message,
destination: destination,
with: .invalidMessage, with: .invalidMessage,
interactionId: interactionId, interactionId: interactionId,
using: dependencies using: dependencies
@ -455,6 +447,7 @@ public final class MessageSender {
throw MessageSender.handleFailedMessageSend( throw MessageSender.handleFailedMessageSend(
db, db,
message: message, message: message,
destination: destination,
with: .noUsername, with: .noUsername,
interactionId: interactionId, interactionId: interactionId,
using: dependencies using: dependencies
@ -462,13 +455,14 @@ public final class MessageSender {
} }
// Perform any pre-send actions // Perform any pre-send actions
handleMessageWillSend(db, message: message, interactionId: interactionId) handleMessageWillSend(db, message: message, destination: destination, interactionId: interactionId)
// Convert it to protobuf // Convert it to protobuf
guard let proto = message.toProto(db, threadId: threadId) else { guard let proto = message.toProto(db, threadId: threadId) else {
throw MessageSender.handleFailedMessageSend( throw MessageSender.handleFailedMessageSend(
db, db,
message: message, message: message,
destination: destination,
with: .protoConversionFailed, with: .protoConversionFailed,
interactionId: interactionId, interactionId: interactionId,
using: dependencies using: dependencies
@ -486,6 +480,7 @@ public final class MessageSender {
throw MessageSender.handleFailedMessageSend( throw MessageSender.handleFailedMessageSend(
db, db,
message: message, message: message,
destination: destination,
with: .other(error), with: .other(error),
interactionId: interactionId, interactionId: interactionId,
using: dependencies using: dependencies
@ -533,13 +528,14 @@ public final class MessageSender {
} }
// Perform any pre-send actions // Perform any pre-send actions
handleMessageWillSend(db, message: message, interactionId: interactionId) handleMessageWillSend(db, message: message, destination: destination, interactionId: interactionId)
// Convert it to protobuf // Convert it to protobuf
guard let proto = message.toProto(db, threadId: recipientBlindedPublicKey) else { guard let proto = message.toProto(db, threadId: recipientBlindedPublicKey) else {
throw MessageSender.handleFailedMessageSend( throw MessageSender.handleFailedMessageSend(
db, db,
message: message, message: message,
destination: destination,
with: .protoConversionFailed, with: .protoConversionFailed,
interactionId: interactionId, interactionId: interactionId,
using: dependencies using: dependencies
@ -557,6 +553,7 @@ public final class MessageSender {
throw MessageSender.handleFailedMessageSend( throw MessageSender.handleFailedMessageSend(
db, db,
message: message, message: message,
destination: destination,
with: .other(error), with: .other(error),
interactionId: interactionId, interactionId: interactionId,
using: dependencies using: dependencies
@ -580,6 +577,7 @@ public final class MessageSender {
throw MessageSender.handleFailedMessageSend( throw MessageSender.handleFailedMessageSend(
db, db,
message: message, message: message,
destination: destination,
with: .other(error), with: .other(error),
interactionId: interactionId, interactionId: interactionId,
using: dependencies using: dependencies
@ -628,9 +626,9 @@ public final class MessageSender {
MessageSender.handleFailedMessageSend( MessageSender.handleFailedMessageSend(
db, db,
message: message, message: message,
destination: data.destination,
with: .attachmentsNotUploaded, with: .attachmentsNotUploaded,
interactionId: data.interactionId, interactionId: data.interactionId,
isSyncMessage: (data.isSyncMessage == true),
using: dependencies using: dependencies
) )
} }
@ -646,7 +644,7 @@ public final class MessageSender {
} }
switch data.destination { switch data.destination {
case .contact, .closedGroup: return sendToSnodeDestination(data: data, using: dependencies) case .contact, .syncMessage, .closedGroup: return sendToSnodeDestination(data: data, using: dependencies)
case .openGroup: return sendToOpenGroupDestination(data: data, using: dependencies) case .openGroup: return sendToOpenGroupDestination(data: data, using: dependencies)
case .openGroupInbox: return sendToOpenGroupInbox(data: data, using: dependencies) case .openGroupInbox: return sendToOpenGroupInbox(data: data, using: dependencies)
} }
@ -661,7 +659,6 @@ public final class MessageSender {
guard guard
let message: Message = data.message, let message: Message = data.message,
let namespace: SnodeAPI.Namespace = data.namespace, let namespace: SnodeAPI.Namespace = data.namespace,
let isSyncMessage: Bool = data.isSyncMessage,
let snodeMessage: SnodeMessage = data.snodeMessage let snodeMessage: SnodeMessage = data.snodeMessage
else { else {
return Fail(error: MessageSenderError.invalidMessage) return Fail(error: MessageSenderError.invalidMessage)
@ -680,9 +677,10 @@ public final class MessageSender {
details: NotifyPushServerJob.Details(message: snodeMessage) details: NotifyPushServerJob.Details(message: snodeMessage)
) )
let shouldNotify: Bool = { let shouldNotify: Bool = {
switch updatedMessage { switch (updatedMessage, data.destination) {
case is VisibleMessage, is UnsendRequest: return !isSyncMessage case (is VisibleMessage, .syncMessage), (is UnsendRequest, .syncMessage): return false
case let callMessage as CallMessage: case (is VisibleMessage, _), (is UnsendRequest, _): return true
case (let callMessage as CallMessage, _):
// Note: Other 'CallMessage' types are too big to send as push notifications // Note: Other 'CallMessage' types are too big to send as push notifications
// so only send the 'preOffer' message as a notification // so only send the 'preOffer' message as a notification
switch callMessage.kind { switch callMessage.kind {
@ -701,7 +699,6 @@ public final class MessageSender {
message: updatedMessage, message: updatedMessage,
to: data.destination, to: data.destination,
interactionId: data.interactionId, interactionId: data.interactionId,
isSyncMessage: isSyncMessage,
using: dependencies using: dependencies
) )
@ -758,6 +755,7 @@ public final class MessageSender {
MessageSender.handleFailedMessageSend( MessageSender.handleFailedMessageSend(
db, db,
message: message, message: message,
destination: data.destination,
with: .other(error), with: .other(error),
interactionId: data.interactionId, interactionId: data.interactionId,
using: dependencies using: dependencies
@ -829,6 +827,7 @@ public final class MessageSender {
MessageSender.handleFailedMessageSend( MessageSender.handleFailedMessageSend(
db, db,
message: message, message: message,
destination: data.destination,
with: .other(error), with: .other(error),
interactionId: data.interactionId, interactionId: data.interactionId,
using: dependencies using: dependencies
@ -893,6 +892,7 @@ public final class MessageSender {
MessageSender.handleFailedMessageSend( MessageSender.handleFailedMessageSend(
db, db,
message: message, message: message,
destination: data.destination,
with: .other(error), with: .other(error),
interactionId: data.interactionId, interactionId: data.interactionId,
using: dependencies using: dependencies
@ -909,27 +909,33 @@ public final class MessageSender {
public static func handleMessageWillSend( public static func handleMessageWillSend(
_ db: Database, _ db: Database,
message: Message, message: Message,
interactionId: Int64?, destination: Message.Destination,
isSyncMessage: Bool = false interactionId: Int64?
) { ) {
// If the message was a reaction then we don't want to do anything to the original // If the message was a reaction then we don't want to do anything to the original
// interaction (which the 'interactionId' is pointing to // interaction (which the 'interactionId' is pointing to
guard (message as? VisibleMessage)?.reaction == nil else { return } guard (message as? VisibleMessage)?.reaction == nil else { return }
// Mark messages as "sending"/"syncing" if needed (this is for retries) // Mark messages as "sending"/"syncing" if needed (this is for retries)
_ = try? RecipientState switch destination {
.filter(RecipientState.Columns.interactionId == interactionId) case .syncMessage:
.filter(isSyncMessage ? _ = try? RecipientState
RecipientState.Columns.state == RecipientState.State.failedToSync : .filter(RecipientState.Columns.interactionId == interactionId)
RecipientState.Columns.state == RecipientState.State.failed .filter(RecipientState.Columns.state == RecipientState.State.failedToSync)
) .updateAll(
.updateAll( db,
db, RecipientState.Columns.state.set(to: RecipientState.State.syncing)
RecipientState.Columns.state.set(to: isSyncMessage ? )
RecipientState.State.syncing :
RecipientState.State.sending default:
) _ = try? RecipientState
) .filter(RecipientState.Columns.interactionId == interactionId)
.filter(RecipientState.Columns.state == RecipientState.State.failed)
.updateAll(
db,
RecipientState.Columns.state.set(to: RecipientState.State.sending)
)
}
} }
private static func handleSuccessfulMessageSend( private static func handleSuccessfulMessageSend(
@ -938,7 +944,6 @@ public final class MessageSender {
to destination: Message.Destination, to destination: Message.Destination,
interactionId: Int64?, interactionId: Int64?,
serverTimestampMs: UInt64? = nil, serverTimestampMs: UInt64? = nil,
isSyncMessage: Bool = false,
using dependencies: Dependencies using dependencies: Dependencies
) throws { ) throws {
// If the message was a reaction then we want to update the reaction instead of the original // If the message was a reaction then we want to update the reaction instead of the original
@ -957,59 +962,61 @@ public final class MessageSender {
// Get the visible message if possible // Get the visible message if possible
if let interaction: Interaction = interaction { if let interaction: Interaction = interaction {
// Only store the server hash of a sync message if the message is self send valid // Only store the server hash of a sync message if the message is self send valid
if (message.isSelfSendValid && isSyncMessage || !isSyncMessage) { switch (message.isSelfSendValid, destination) {
try interaction.with( case (false, .syncMessage): break
serverHash: message.serverHash, case (true, .syncMessage), (_, .contact), (_, .closedGroup), (_, .openGroup), (_, .openGroupInbox):
// Track the open group server message ID and update server timestamp (use server try interaction.with(
// timestamp for open group messages otherwise the quote messages may not be able serverHash: message.serverHash,
// to be found by the timestamp on other devices // Track the open group server message ID and update server timestamp (use server
timestampMs: (message.openGroupServerMessageId == nil ? // timestamp for open group messages otherwise the quote messages may not be able
nil : // to be found by the timestamp on other devices
serverTimestampMs.map { Int64($0) } timestampMs: (message.openGroupServerMessageId == nil ?
), nil :
openGroupServerMessageId: message.openGroupServerMessageId.map { Int64($0) } serverTimestampMs.map { Int64($0) }
).update(db)
if interaction.isExpiringMessage {
// Start disappearing messages job after a message is successfully sent.
// For DAR and DAS outgoing messages, the expiration start time are the
// same as message sentTimestamp. So do this once, DAR and DAS messages
// should all be covered.
dependencies.jobRunner.upsert(
db,
job: DisappearingMessagesJob.updateNextRunIfNeeded(
db,
interaction: interaction,
startedAtMs: Double(interaction.timestampMs)
), ),
canStartJob: true, openGroupServerMessageId: message.openGroupServerMessageId.map { Int64($0) }
using: dependencies ).update(db)
)
if if interaction.isExpiringMessage {
isSyncMessage, // Start disappearing messages job after a message is successfully sent.
let startedAtMs: Double = interaction.expiresStartedAtMs, // For DAR and DAS outgoing messages, the expiration start time are the
let expiresInSeconds: TimeInterval = interaction.expiresInSeconds, // same as message sentTimestamp. So do this once, DAR and DAS messages
let serverHash: String = message.serverHash // should all be covered.
{ dependencies.jobRunner.upsert(
let expirationTimestampMs: Int64 = Int64(startedAtMs + expiresInSeconds * 1000)
dependencies.jobRunner.add(
db, db,
job: Job( job: DisappearingMessagesJob.updateNextRunIfNeeded(
variant: .expirationUpdate, db,
behaviour: .runOnce, interaction: interaction,
threadId: interaction.threadId, startedAtMs: Double(interaction.timestampMs)
details: ExpirationUpdateJob.Details(
serverHashes: [serverHash],
expirationTimestampMs: expirationTimestampMs
)
), ),
canStartJob: true, canStartJob: true,
using: dependencies using: dependencies
) )
if
case .syncMessage = destination,
let startedAtMs: Double = interaction.expiresStartedAtMs,
let expiresInSeconds: TimeInterval = interaction.expiresInSeconds,
let serverHash: String = message.serverHash
{
let expirationTimestampMs: Int64 = Int64(startedAtMs + expiresInSeconds * 1000)
dependencies.jobRunner.add(
db,
job: Job(
variant: .expirationUpdate,
behaviour: .runOnce,
threadId: interaction.threadId,
details: ExpirationUpdateJob.Details(
serverHashes: [serverHash],
expirationTimestampMs: expirationTimestampMs
)
),
canStartJob: true,
using: dependencies
)
}
} }
} }
}
// Mark the message as sent // Mark the message as sent
try interaction.recipientStates try interaction.recipientStates
@ -1038,7 +1045,6 @@ public final class MessageSender {
destination: destination, destination: destination,
threadId: threadId, threadId: threadId,
interactionId: interactionId, interactionId: interactionId,
isAlreadySyncMessage: isSyncMessage,
using: dependencies using: dependencies
) )
} }
@ -1046,9 +1052,9 @@ public final class MessageSender {
@discardableResult internal static func handleFailedMessageSend( @discardableResult internal static func handleFailedMessageSend(
_ db: Database, _ db: Database,
message: Message, message: Message,
destination: Message.Destination?,
with error: MessageSenderError, with error: MessageSenderError,
interactionId: Int64?, interactionId: Int64?,
isSyncMessage: Bool = false,
using dependencies: Dependencies using dependencies: Dependencies
) -> Error { ) -> Error {
// If the message was a reaction then we don't want to do anything to the original // If the message was a reaction then we don't want to do anything to the original
@ -1060,18 +1066,27 @@ public final class MessageSender {
// Note: The 'db' could be either read-only or writeable so we determine // Note: The 'db' could be either read-only or writeable so we determine
// if a change is required, and if so dispatch to a separate queue for the // if a change is required, and if so dispatch to a separate queue for the
// actual write // actual write
let rowIds: [Int64] = (try? RecipientState let rowIds: [Int64] = (try? {
.select(Column.rowID) switch destination {
.filter(RecipientState.Columns.interactionId == interactionId) case .syncMessage:
.filter(!isSyncMessage ? return RecipientState
RecipientState.Columns.state == RecipientState.State.sending : ( .select(Column.rowID)
RecipientState.Columns.state == RecipientState.State.syncing || .filter(RecipientState.Columns.interactionId == interactionId)
RecipientState.Columns.state == RecipientState.State.sent .filter(
) RecipientState.Columns.state == RecipientState.State.syncing ||
) RecipientState.Columns.state == RecipientState.State.sent
.asRequest(of: Int64.self) )
.fetchAll(db))
.defaulting(to: []) default:
return RecipientState
.select(Column.rowID)
.filter(RecipientState.Columns.interactionId == interactionId)
.filter(RecipientState.Columns.state == RecipientState.State.sending)
}
}()
.asRequest(of: Int64.self)
.fetchAll(db))
.defaulting(to: [])
guard !rowIds.isEmpty else { return error } guard !rowIds.isEmpty else { return error }
@ -1079,15 +1094,25 @@ public final class MessageSender {
// issue from occuring in some cases // issue from occuring in some cases
DispatchQueue.global(qos: .background).async { DispatchQueue.global(qos: .background).async {
dependencies.storage.write { db in dependencies.storage.write { db in
try RecipientState switch destination {
.filter(rowIds.contains(Column.rowID)) case .syncMessage:
.updateAll( try RecipientState
db, .filter(rowIds.contains(Column.rowID))
RecipientState.Columns.state.set( .updateAll(
to: (isSyncMessage ? RecipientState.State.failedToSync : RecipientState.State.failed) db,
), RecipientState.Columns.state.set(to: RecipientState.State.failedToSync),
RecipientState.Columns.mostRecentFailureText.set(to: error.localizedDescription) RecipientState.Columns.mostRecentFailureText.set(to: "\(error)")
) )
default:
try RecipientState
.filter(rowIds.contains(Column.rowID))
.updateAll(
db,
RecipientState.Columns.state.set(to: RecipientState.State.failed),
RecipientState.Columns.mostRecentFailureText.set(to: "\(error)")
)
}
} }
} }
@ -1116,7 +1141,6 @@ public final class MessageSender {
destination: Message.Destination, destination: Message.Destination,
threadId: String?, threadId: String?,
interactionId: Int64?, interactionId: Int64?,
isAlreadySyncMessage: Bool,
using dependencies: Dependencies using dependencies: Dependencies
) { ) {
// Sync the message if it's not a sync message, wasn't already sent to the current user and // Sync the message if it's not a sync message, wasn't already sent to the current user and
@ -1125,7 +1149,6 @@ public final class MessageSender {
if if
case .contact(let publicKey) = destination, case .contact(let publicKey) = destination,
!isAlreadySyncMessage,
publicKey != currentUserPublicKey, publicKey != currentUserPublicKey,
Message.shouldSync(message: message) Message.shouldSync(message: message)
{ {
@ -1139,9 +1162,8 @@ public final class MessageSender {
threadId: threadId, threadId: threadId,
interactionId: interactionId, interactionId: interactionId,
details: MessageSendJob.Details( details: MessageSendJob.Details(
destination: .contact(publicKey: currentUserPublicKey), destination: .syncMessage(originalRecipientPublicKey: publicKey),
message: message, message: message
isSyncMessage: true
) )
), ),
canStartJob: true, canStartJob: true,

@ -253,7 +253,9 @@ final class ThreadPickerVC: UIViewController, UITableViewDataSource, UITableView
}(), }(),
using: dependencies using: dependencies
) )
.tryFlatMapWithRandomSnode { SnodeAPI.getNetworkTime(from: $0, using: dependencies) } .tryFlatMapWithRandomSnode(using: dependencies) {
SnodeAPI.getNetworkTime(from: $0, using: dependencies)
}
.map { _ in () } .map { _ in () }
.eraseToAnyPublisher() .eraseToAnyPublisher()
} }

@ -0,0 +1,3 @@
// Copyright © 2024 Rangeproof Pty Ltd. All rights reserved.
import Foundation

@ -50,12 +50,12 @@ extension SnodeAPI {
memLimit: sodium.pwHash.MemLimitModerate, memLimit: sodium.pwHash.MemLimitModerate,
alg: .Argon2ID13 alg: .Argon2ID13
) )
else { throw SnodeAPIError.hashingFailed } else { throw SnodeAPIError.onsHashingFailed }
let nonce: [UInt8] = Data(repeating: 0, count: sodium.secretBox.NonceBytes).bytes let nonce: [UInt8] = Data(repeating: 0, count: sodium.secretBox.NonceBytes).bytes
guard let sessionIdAsData: [UInt8] = sodium.secretBox.open(authenticatedCipherText: ciphertext, secretKey: key, nonce: nonce) else { guard let sessionIdAsData: [UInt8] = sodium.secretBox.open(authenticatedCipherText: ciphertext, secretKey: key, nonce: nonce) else {
throw SnodeAPIError.decryptionFailed throw SnodeAPIError.onsDecryptionFailed
} }
return sessionIdAsData.toHexString() return sessionIdAsData.toHexString()
@ -66,7 +66,7 @@ extension SnodeAPI {
// xchacha-based encryption // xchacha-based encryption
// key = H(name, key=H(name)) // key = H(name, key=H(name))
guard let key: [UInt8] = sodium.genericHash.hash(message: nameBytes, key: nameHashBytes) else { guard let key: [UInt8] = sodium.genericHash.hash(message: nameBytes, key: nameHashBytes) else {
throw SnodeAPIError.hashingFailed throw SnodeAPIError.onsHashingFailed
} }
guard guard
// Should always be equal in practice // Should always be equal in practice
@ -76,7 +76,7 @@ extension SnodeAPI {
secretKey: key, secretKey: key,
nonce: nonceBytes nonce: nonceBytes
) )
else { throw SnodeAPIError.decryptionFailed } else { throw SnodeAPIError.onsDecryptionFailed }
return sessionIdAsData.toHexString() return sessionIdAsData.toHexString()
} }

@ -600,7 +600,7 @@ public final class SnodeAPI {
let nameAsData = [UInt8](onsName.data(using: String.Encoding.utf8)!) let nameAsData = [UInt8](onsName.data(using: String.Encoding.utf8)!)
guard let nameHash = sodium.wrappedValue.genericHash.hash(message: nameAsData) else { guard let nameHash = sodium.wrappedValue.genericHash.hash(message: nameAsData) else {
return Fail(error: SnodeAPIError.hashingFailed) return Fail(error: SnodeAPIError.onsHashingFailed)
.eraseToAnyPublisher() .eraseToAnyPublisher()
} }
@ -647,7 +647,7 @@ public final class SnodeAPI {
.collect() .collect()
.tryMap { results -> String in .tryMap { results -> String in
guard results.count == validationCount, Set(results).count == 1 else { guard results.count == validationCount, Set(results).count == 1 else {
throw SnodeAPIError.validationFailed throw SnodeAPIError.onsValidationFailed
} }
return results[0] return results[0]
@ -1362,6 +1362,7 @@ public extension Publisher where Output == Set<Snode> {
return swarm.subtracting(usedSnodes) return swarm.subtracting(usedSnodes)
} }
}() }()
var lastError: Error?
return Just(()) return Just(())
.setFailureType(to: Error.self) .setFailureType(to: Error.self)
@ -1374,7 +1375,7 @@ public extension Publisher where Output == Set<Snode> {
// Select the next snode // Select the next snode
return try dependencies.popRandomElement(&remainingSnodes) ?? { return try dependencies.popRandomElement(&remainingSnodes) ?? {
throw SnodeAPIError.ranOutOfRandomSnodes throw SnodeAPIError.ranOutOfRandomSnodes(lastError)
}() }()
}() }()
drainBehaviour.mutate { $0 = $0.use(snode: snode, from: swarm) } drainBehaviour.mutate { $0 = $0.use(snode: snode, from: swarm) }
@ -1382,33 +1383,14 @@ public extension Publisher where Output == Set<Snode> {
return try transform(snode) return try transform(snode)
.eraseToAnyPublisher() .eraseToAnyPublisher()
} }
.retry(retries) .mapError { error in
.eraseToAnyPublisher() // Prevent nesting the 'ranOutOfRandomSnodes' errors
} switch error {
.eraseToAnyPublisher() case SnodeAPIError.ranOutOfRandomSnodes: break
} default: lastError = error
} }
// MARK: - Convenience
public extension Publisher where Output == Set<Snode> {
func tryFlatMapWithRandomSnode<T, P>(
maxPublishers: Subscribers.Demand = .unlimited,
retry retries: Int = 0,
_ transform: @escaping (Snode) throws -> P
) -> AnyPublisher<T, Error> where T == P.Output, P: Publisher, P.Failure == Error {
return self
.mapError { $0 }
.flatMap(maxPublishers: maxPublishers) { swarm -> AnyPublisher<T, Error> in
var remainingSnodes: Set<Snode> = swarm
return Just(())
.setFailureType(to: Error.self)
.tryFlatMap(maxPublishers: maxPublishers) { _ -> AnyPublisher<T, Error> in
let snode: Snode = try remainingSnodes.popRandomElement() ?? { throw SnodeAPIError.ranOutOfRandomSnodes }()
return try transform(snode) return error
.eraseToAnyPublisher()
} }
.retry(retries) .retry(retries)
.eraseToAnyPublisher() .eraseToAnyPublisher()

@ -21,12 +21,12 @@ public enum SnodeAPIError: Error, CustomStringConvertible {
// Onion Request Errors // Onion Request Errors
case emptySnodePool case emptySnodePool
case insufficientSnodes case insufficientSnodes
case ranOutOfRandomSnodes case ranOutOfRandomSnodes(Error?)
// ONS // ONS
case decryptionFailed case onsDecryptionFailed
case hashingFailed case onsHashingFailed
case validationFailed case onsValidationFailed
// Quic // Quic
case invalidPayload case invalidPayload
@ -51,12 +51,18 @@ public enum SnodeAPIError: Error, CustomStringConvertible {
// Onion Request Errors // Onion Request Errors
case .emptySnodePool: return "Service Node pool is empty (SnodeAPIError.emptySnodePool)." case .emptySnodePool: return "Service Node pool is empty (SnodeAPIError.emptySnodePool)."
case .insufficientSnodes: return "Couldn't find enough Service Nodes to build a path (SnodeAPIError.insufficientSnodes)." case .insufficientSnodes: return "Couldn't find enough Service Nodes to build a path (SnodeAPIError.insufficientSnodes)."
case .ranOutOfRandomSnodes: return "Ran out of random snodes to send the request through (SnodeAPIError.ranOutOfRandomSnodes)." case .ranOutOfRandomSnodes(let maybeError):
switch maybeError {
case .none: return "Ran out of random snodes (SnodeAPIError.ranOutOfRandomSnodes(nil))."
case .some(let error):
let errorDesc = "\(error)".trimmingCharacters(in: CharacterSet(["."]))
return "Ran out of random snodes (SnodeAPIError.ranOutOfRandomSnodes(\(errorDesc))."
}
// ONS // ONS
case .decryptionFailed: return "Couldn't decrypt ONS name (SnodeAPIError.decryptionFailed)." case .onsDecryptionFailed: return "Couldn't decrypt ONS name (SnodeAPIError.onsDecryptionFailed)."
case .hashingFailed: return "Couldn't compute ONS name hash (SnodeAPIError.hashingFailed)." case .onsHashingFailed: return "Couldn't compute ONS name hash (SnodeAPIError.onsHashingFailed)."
case .validationFailed: return "ONS name validation failed (SnodeAPIError.validationFailed)." case .onsValidationFailed: return "ONS name validation failed (SnodeAPIError.onsValidationFailed)."
// Quic // Quic
case .invalidPayload: return "Invalid payload (SnodeAPIError.invalidPayload)." case .invalidPayload: return "Invalid payload (SnodeAPIError.invalidPayload)."

@ -0,0 +1,3 @@
// Copyright © 2024 Rangeproof Pty Ltd. All rights reserved.
import Foundation

@ -1,35 +0,0 @@
// Copyright (c) 2019 Open Whisper Systems. All rights reserved.
import Foundation
import SignalCoreKit
extension UIAlertController {
@objc
public func applyAccessibilityIdentifiers() {
for action in actions {
guard let view = action.value(forKey: "__representer") as? UIView else {
owsFailDebug("Missing representer.")
continue
}
view.accessibilityIdentifier = action.accessibilityIdentifier
}
}
}
// MARK: -
extension UIAlertAction {
private struct AssociatedKeys {
static var AccessibilityIdentifier = "ows_accessibilityIdentifier"
}
@objc
public var accessibilityIdentifier: String? {
get {
return objc_getAssociatedObject(self, &AssociatedKeys.AccessibilityIdentifier) as? String
}
set {
objc_setAssociatedObject(self, &AssociatedKeys.AccessibilityIdentifier, newValue, objc_AssociationPolicy.OBJC_ASSOCIATION_RETAIN)
}
}
}
Loading…
Cancel
Save