diff --git a/SessionMessagingKit/Open Groups/OpenGroupAPI.swift b/SessionMessagingKit/Open Groups/OpenGroupAPI.swift index ecd7d9de3..0b1512247 100644 --- a/SessionMessagingKit/Open Groups/OpenGroupAPI.swift +++ b/SessionMessagingKit/Open Groups/OpenGroupAPI.swift @@ -15,19 +15,19 @@ public final class OpenGroupAPI: NSObject { // MARK: - Polling State - private static var hasPerformedInitialPoll: [String: Bool] = [:] - private static var timeSinceLastPoll: [String: TimeInterval] = [:] - private static var lastPollTime: TimeInterval = .greatestFiniteMagnitude + private static var hasPerformedInitialPoll: AtomicDict = AtomicDict() + private static var timeSinceLastPoll: AtomicDict = AtomicDict() + private static var lastPollTime: Atomic = Atomic(.greatestFiniteMagnitude) - private static let timeSinceLastOpen: TimeInterval = { - guard let lastOpen = UserDefaults.standard[.lastOpen] else { return .greatestFiniteMagnitude } + private static let timeSinceLastOpen: Atomic = { + guard let lastOpen = UserDefaults.standard[.lastOpen] else { return Atomic(.greatestFiniteMagnitude) } - return Date().timeIntervalSince(lastOpen) + return Atomic(Date().timeIntervalSince(lastOpen)) }() // TODO: Remove these - private static var legacyAuthTokenPromises: Atomic<[String: Promise]> = Atomic([:]) + private static var legacyAuthTokenPromises: AtomicDict> = AtomicDict() private static var legacyHasUpdatedLastOpenDate = false private static var legacyGroupImagePromises: [String: Promise] = [:] @@ -44,13 +44,13 @@ public final class OpenGroupAPI: NSObject { public static func poll(_ server: String, using dependencies: Dependencies = Dependencies()) -> Promise<[Endpoint: (OnionRequestResponseInfoType, Codable?)]> { // Store a local copy of the cached state for this server let hadPerformedInitialPoll: Bool = (hasPerformedInitialPoll[server] == true) - let originalTimeSinceLastPoll: TimeInterval = (timeSinceLastPoll[server] ?? min(lastPollTime, timeSinceLastOpen)) + let originalTimeSinceLastPoll: TimeInterval = (timeSinceLastPoll[server] ?? min(lastPollTime.wrappedValue, timeSinceLastOpen.wrappedValue)) let maybeLastInboxMessageId: Int64? = dependencies.storage.getOpenGroupInboxLatestMessageId(for: server) let lastInboxMessageId: Int64 = (maybeLastInboxMessageId ?? 0) // Update the cached state for this server - hasPerformedInitialPoll[server] = true - lastPollTime = min(lastPollTime, timeSinceLastOpen) + hasPerformedInitialPoll.wrappedValue[server] = true + lastPollTime.wrappedValue = min(lastPollTime.wrappedValue, timeSinceLastOpen.wrappedValue) UserDefaults.standard[.lastOpen] = Date() // Generate the requests diff --git a/SessionMessagingKit/Open Groups/OpenGroupManager.swift b/SessionMessagingKit/Open Groups/OpenGroupManager.swift index 5a0bdeb1f..dd3e71863 100644 --- a/SessionMessagingKit/Open Groups/OpenGroupManager.swift +++ b/SessionMessagingKit/Open Groups/OpenGroupManager.swift @@ -45,7 +45,6 @@ public final class OpenGroupManager: NSObject { // Clear any existing data if needed storage.removeOpenGroupSequenceNumber(for: roomToken, on: server, using: transaction) - storage.removeAuthToken(for: roomToken, on: server, using: transaction) // Store the public key storage.setOpenGroupPublicKey(for: server, to: publicKey, using: transaction) diff --git a/SessionMessagingKit/Sending & Receiving/MessageReceiver+Handling.swift b/SessionMessagingKit/Sending & Receiving/MessageReceiver+Handling.swift index 1915dd063..4093628be 100644 --- a/SessionMessagingKit/Sending & Receiving/MessageReceiver+Handling.swift +++ b/SessionMessagingKit/Sending & Receiving/MessageReceiver+Handling.swift @@ -843,7 +843,7 @@ extension MessageReceiver { public static func handleMessageRequestResponse(_ message: MessageRequestResponse, using transaction: Any) { let userPublicKey = getUserHexEncodedPublicKey() - var blindedContactIds: [String] = [] + var hadBlindedContact: Bool = false var blindedThreadIds: [String] = [] // Ignore messages which were sent from the current user @@ -882,11 +882,16 @@ extension MessageReceiver { let mapping: BlindedIdMapping = BlindedIdMapping(blindedId: blindedId, sessionId: senderId, serverPublicKey: serverPublicKey) Storage.shared.cacheBlindedIdMapping(mapping, using: transaction) - // Add the `blindedId` to an array so we can remove them at the end of processing - blindedContactIds.append(blindedId) + // Flag that we had a blinded contact and add the `blindedThreadId` to an array so we can remove + // them at the end of processing + hadBlindedContact = true blindedThreadIds.append(blindedThreadId) // Loop through all of the interactions and add them to a list to be moved to the new thread + // Note: Pending `MessageSendJobs` _shouldn't_ be an issue as even if they are sent after the + // un-blinding of a thread, the logic when handling the sent messages should automatically + // assign them to the correct thread + // TODO: Validate the above note once `/outbox` has been implemented view.enumerateRows(inGroup: blindedThreadId) { _, _, object, _, _, _ in guard let interaction: TSInteraction = object as? TSInteraction else { return @@ -896,9 +901,6 @@ extension MessageReceiver { } threadsToDelete.append(blindedThread) - - // TODO: Pending jobs??? -// Storage.shared.getAllPendingJobs(of: <#T##Job.Type#>) } // Sort the interactions by their `sortId` (which looks to be a global sort id for all interactions) just in case @@ -916,7 +918,6 @@ extension MessageReceiver { // Delete the old threads for thread in threadsToDelete { - // TODO: This isn't updating the HomeVC... Race condition??? (Seems to not happen when stepping through with breakpoints) thread.removeAllThreadInteractions(with: transaction) thread.remove(with: transaction) } @@ -926,19 +927,13 @@ extension MessageReceiver { updateContactApprovalStatusIfNeeded( senderSessionId: senderId, threadId: nil, - forceConfigSync: blindedContactIds.isEmpty, // Sync here if there are no blinded contacts + forceConfigSync: !hadBlindedContact, // Sync here if there were no blinded contacts using: transaction ) - // If there were blinded contacts then we should remove them - if !blindedContactIds.isEmpty { - // Delete all of the processed blinded contacts (shouldn't need them anymore and don't want them taking up - // space in the config message) - for blindedId in blindedContactIds { - // TODO: OWSBlockingManager...??? - } - - // We should assume the 'sender' is a newly created contact and hence need to update it's `isApproved` state + // If there were blinded contacts then we need to assume that the 'sender' is a newly create contact and hence + // need to update it's `isApproved` state + if hadBlindedContact { updateContactApprovalStatusIfNeeded( senderSessionId: userPublicKey, threadId: unblindedThreadId, diff --git a/SessionMessagingKit/Storage.swift b/SessionMessagingKit/Storage.swift index c73373d1e..976b7d606 100644 --- a/SessionMessagingKit/Storage.swift +++ b/SessionMessagingKit/Storage.swift @@ -38,12 +38,6 @@ public protocol SessionMessagingKitStorageProtocol { func resumeMessageSendJobIfNeeded(_ messageSendJobID: String) func isJobCanceled(_ job: Job) -> Bool - // MARK: - Authorization - - func getAuthToken(for room: String, on server: String) -> String? - func setAuthToken(for room: String, on server: String, to newValue: String, using transaction: Any) - func removeAuthToken(for room: String, on server: String, using transaction: Any) - // MARK: - Open Groups func getAllOpenGroups() -> [String: OpenGroup] diff --git a/SessionMessagingKit/Utilities/Atomic.swift b/SessionMessagingKit/Utilities/Atomic.swift index 8ba7ca568..69d5ecf7c 100644 --- a/SessionMessagingKit/Utilities/Atomic.swift +++ b/SessionMessagingKit/Utilities/Atomic.swift @@ -2,26 +2,95 @@ import Foundation -/// The `Atomic` wrapper is a generic wrapper providing a thread-safe way to get and set a value +/// See https://www.donnywals.com/why-your-atomic-property-wrapper-doesnt-work-for-collection-types/ +/// for more information about the below types + +protocol UnsupportedType {} + +extension Array: UnsupportedType {} +extension Set: UnsupportedType {} +extension Dictionary: UnsupportedType {} + +// MARK: - Atomic + +/// The `Atomic` wrapper is a generic wrapper providing a thread-safe way to get and set a value @propertyWrapper struct Atomic { - private let lock = DispatchSemaphore(value: 1) + private let queue: DispatchQueue = DispatchQueue(label: "io.oxen.\(UUID().uuidString)", qos: .utility, attributes: .concurrent, autoreleaseFrequency: .inherit, target: .global()) private var value: Value init(_ initialValue: Value) { + if initialValue is UnsupportedType { preconditionFailure("Use the appropriate Aromic... type for collections") } + self.value = initialValue } var wrappedValue: Value { - get { - lock.wait() - defer { lock.signal() } - return value - } - set { - lock.wait() - value = newValue - lock.signal() - } + get { return queue.sync { return value } } + set { return queue.sync { value = newValue } } + } +} + +extension Atomic where Value: CustomDebugStringConvertible { + var debugDescription: String { + return value.debugDescription + } +} + +// MARK: - AtomicArray + +/// The `AtomicArray` wrapper is a generic wrapper providing a thread-safe way to get and set an array or one of it's values +/// +/// Note: This is a class rather than a struct as you need to modify a reference rather than a copy for the concurrency to work +@propertyWrapper +class AtomicArray: CustomDebugStringConvertible { + private let queue: DispatchQueue = DispatchQueue(label: "io.oxen.\(UUID().uuidString)", qos: .utility, attributes: .concurrent, autoreleaseFrequency: .inherit, target: .global()) + private var value: [Value] + + init(_ initialValue: [Value] = []) { + self.value = initialValue + } + + var wrappedValue: [Value] { + get { return queue.sync { return value } } + set { return queue.sync { value = newValue } } + } + + subscript(index: Int) -> Value { + get { queue.sync { value[index] }} + set { queue.async(flags: .barrier) { self.value[index] = newValue } } + } + + public var debugDescription: String { + return value.debugDescription + } +} + +// MARK: - AtomicDict + +/// The `AtomicDict` wrapper is a generic wrapper providing a thread-safe way to get and set a dictionaries or one of it's values +/// +/// Note: This is a class rather than a struct as you need to modify a reference rather than a copy for the concurrency to work +@propertyWrapper +class AtomicDict: CustomDebugStringConvertible { + private let queue: DispatchQueue = DispatchQueue(label: "io.oxen.\(UUID().uuidString)", qos: .utility, attributes: .concurrent, autoreleaseFrequency: .inherit, target: .global()) + private var value: [Key: Value] + + init(_ initialValue: [Key: Value] = [:]) { + self.value = initialValue + } + + var wrappedValue: [Key: Value] { + get { return queue.sync { return value } } + set { return queue.sync { value = newValue } } + } + + subscript(key: Key) -> Value? { + get { queue.sync { value[key] }} + set { queue.async(flags: .barrier) { self.value[key] = newValue } } + } + + var debugDescription: String { + return value.debugDescription } }