From 4774123ad43c61e27b0aad7dcff6c9925fc86e03 Mon Sep 17 00:00:00 2001 From: Niels Andriesse Date: Mon, 19 Apr 2021 14:44:27 +1000 Subject: [PATCH] Update V2 open group poller for compact polling --- .../Open Groups/V2/OpenGroupAPIV2.swift | 37 +++++--- .../Open Groups/V2/OpenGroupManagerV2.swift | 36 +++---- .../Pollers/OpenGroupPollerV2.swift | 95 +++++++++---------- 3 files changed, 86 insertions(+), 82 deletions(-) diff --git a/SessionMessagingKit/Open Groups/V2/OpenGroupAPIV2.swift b/SessionMessagingKit/Open Groups/V2/OpenGroupAPIV2.swift index 92175f7de..c7be97bd6 100644 --- a/SessionMessagingKit/Open Groups/V2/OpenGroupAPIV2.swift +++ b/SessionMessagingKit/Open Groups/V2/OpenGroupAPIV2.swift @@ -3,9 +3,9 @@ import SessionSnodeKit @objc(SNOpenGroupAPIV2) public final class OpenGroupAPIV2 : NSObject { - private static var moderators: [String:[String:Set]] = [:] // Server URL to room ID to set of moderator IDs private static var authTokenPromises: [String:Promise] = [:] + public static var moderators: [String:[String:Set]] = [:] // Server URL to room ID to set of moderator IDs public static let defaultServer = "https://sessionopengroup.com" public static let defaultServerPublicKey = "658d29b91892a2389505596b135e76a53db6e11d613a51dbd3d0816adffb231b" public static var defaultRoomsPromise: Promise<[Info]>? @@ -72,6 +72,14 @@ public final class OpenGroupAPIV2 : NSObject { self.imageID = imageID } } + + // MARK: Compact Poll Response Body + public struct CompactPollResponseBody { + let room: String + let messages: [OpenGroupMessageV2] + let deletions: [Int64] + let moderators: [String] + } // MARK: Convenience private static func send(_ request: Request) -> Promise { @@ -119,12 +127,17 @@ public final class OpenGroupAPIV2 : NSObject { } } - public static func compactPoll(_ server: String) -> Promise<[(room: String, messages: [OpenGroupMessageV2], deletions: [Int64], moderators: [String])]> { + public static func compactPoll(_ server: String) -> Promise<[CompactPollResponseBody]> { + #if DEBUG + dispatchPrecondition(condition: .onQueue(.main)) + #endif let storage = SNMessagingKitConfiguration.shared.storage let rooms = storage.getAllV2OpenGroups().values.filter { $0.server == server }.map { $0.room } var body: [JSON] = [] for room in rooms { - let authToken = try! getAuthToken(for: room, on: server).wait() // TODO: This should be async + // It's okay for this to be blocking. This call happens on a background thread + // plus we'll almost always have the auth token in storage anyway. + guard let authToken = try? getAuthToken(for: room, on: server).wait() else { continue } var json: JSON = [ "room_id" : room, "auth_token" : authToken ] if let lastMessageServerID = storage.getLastMessageServerID(for: room, on: server) { json["from_message_server_id"] = String(lastMessageServerID) @@ -135,17 +148,17 @@ public final class OpenGroupAPIV2 : NSObject { body.append(json) } let request = Request(verb: .post, room: nil, server: server, endpoint: "compact_poll", parameters: [ "requests" : body ], isAuthRequired: false) - return send(request).map(on: DispatchQueue.global(qos: .userInitiated)) { json in + return send(request).then(on: DispatchQueue.global(qos: .userInitiated)) { json -> Promise<[CompactPollResponseBody]> in guard let results = json["results"] as? [JSON] else { throw Error.parsingFailed } - var x: [(room: String, messages: [OpenGroupMessageV2], deletions: [Int64], moderators: [String])] = [] - for result in results { - guard let room = result["room_id"] as? String else { continue } - let messages = try! parseMessages(from: result, for: room, on: server).wait() // TODO: This should be async - let deletions = result["deletions"] as? [Int64] ?? [] - let moderators = result["moderators"] as? [String] ?? [] - x.append((room: room, messages: messages, deletions: deletions, moderators: moderators)) + let promises = results.compactMap { json -> Promise? in + guard let room = json["room_id"] as? String else { return nil } + let deletions = json["deletions"] as? [Int64] ?? [] + let moderators = json["moderators"] as? [String] ?? [] + return try? parseMessages(from: json, for: room, on: server).map { messages in + return CompactPollResponseBody(room: room, messages: messages, deletions: deletions, moderators: moderators) + } } - return x + return when(fulfilled: promises) } } diff --git a/SessionMessagingKit/Open Groups/V2/OpenGroupManagerV2.swift b/SessionMessagingKit/Open Groups/V2/OpenGroupManagerV2.swift index 69e7a2c66..af7bb8b91 100644 --- a/SessionMessagingKit/Open Groups/V2/OpenGroupManagerV2.swift +++ b/SessionMessagingKit/Open Groups/V2/OpenGroupManagerV2.swift @@ -2,7 +2,7 @@ import PromiseKit @objc(SNOpenGroupManagerV2) public final class OpenGroupManagerV2 : NSObject { - private var pollers: [String:OpenGroupPollerV2] = [:] + private var pollers: [String:OpenGroupPollerV2] = [:] // One for each server private var isPolling = false @objc public static var useV2OpenGroups = false @@ -16,12 +16,12 @@ public final class OpenGroupManagerV2 : NSObject { @objc public func startPolling() { guard !isPolling else { return } isPolling = true - let openGroups = Storage.shared.getAllV2OpenGroups() - for (_, openGroup) in openGroups { - if let poller = pollers[openGroup.id] { poller.stop() } // Should never occur - let poller = OpenGroupPollerV2(for: openGroup) + let servers = Set(Storage.shared.getAllV2OpenGroups().values.map { $0.server }) + servers.forEach { server in + if let poller = pollers[server] { poller.stop() } // Should never occur + let poller = OpenGroupPollerV2(for: server) poller.startIfNeeded() - pollers[openGroup.id] = poller + pollers[server] = poller } } @@ -56,15 +56,12 @@ public final class OpenGroupManagerV2 : NSObject { thread.save(with: transaction) storage.setV2OpenGroup(openGroup, for: thread.uniqueId!, using: transaction) }, completion: { - // Stop any existing poller if needed - if let poller = OpenGroupManagerV2.shared.pollers[openGroup.id] { - poller.stop() - OpenGroupManagerV2.shared.pollers[openGroup.id] = nil + // Start the poller if needed + if OpenGroupManagerV2.shared.pollers[server] == nil { + let poller = OpenGroupPollerV2(for: server) + poller.startIfNeeded() + OpenGroupManagerV2.shared.pollers[server] = poller } - // Start the poller - let poller = OpenGroupPollerV2(for: openGroup) - poller.startIfNeeded() - OpenGroupManagerV2.shared.pollers[openGroup.id] = poller // Fetch the group image OpenGroupAPIV2.getGroupImage(for: room, on: server).done(on: DispatchQueue.global(qos: .userInitiated)) { data in storage.write { transaction in @@ -86,10 +83,13 @@ public final class OpenGroupManagerV2 : NSObject { } public func delete(_ openGroup: OpenGroupV2, associatedWith thread: TSThread, using transaction: YapDatabaseReadWriteTransaction) { + let storage = SNMessagingKitConfiguration.shared.storage // Stop the poller if needed - if let poller = pollers[openGroup.id] { - poller.stop() - pollers[openGroup.id] = nil + let openGroups = storage.getAllV2OpenGroups().values.filter { $0.server == openGroup.server } + if openGroups.count == 1 && openGroups.last == openGroup { + let poller = pollers[openGroup.server] + poller?.stop() + pollers[openGroup.server] = nil } // Remove all data var messageIDs: Set = [] @@ -98,7 +98,7 @@ public final class OpenGroupManagerV2 : NSObject { messageIDs.insert(interaction.uniqueId!) messageTimestamps.insert(interaction.timestamp) } - SNMessagingKitConfiguration.shared.storage.updateMessageIDCollectionByPruningMessagesWithIDs(messageIDs, using: transaction) + storage.updateMessageIDCollectionByPruningMessagesWithIDs(messageIDs, using: transaction) Storage.shared.removeReceivedMessageTimestamps(messageTimestamps, using: transaction) Storage.shared.removeLastMessageServerID(for: openGroup.room, on: openGroup.server, using: transaction) Storage.shared.removeLastDeletionServerID(for: openGroup.room, on: openGroup.server, using: transaction) diff --git a/SessionMessagingKit/Sending & Receiving/Pollers/OpenGroupPollerV2.swift b/SessionMessagingKit/Sending & Receiving/Pollers/OpenGroupPollerV2.swift index d186a7306..45504f55a 100644 --- a/SessionMessagingKit/Sending & Receiving/Pollers/OpenGroupPollerV2.swift +++ b/SessionMessagingKit/Sending & Receiving/Pollers/OpenGroupPollerV2.swift @@ -2,10 +2,8 @@ import PromiseKit @objc(SNOpenGroupPollerV2) public final class OpenGroupPollerV2 : NSObject { - private let openGroup: OpenGroupV2 - private var pollForNewMessagesTimer: Timer? = nil - private var pollForDeletedMessagesTimer: Timer? = nil - private var pollForModeratorsTimer: Timer? = nil + private let server: String + private var timer: Timer? = nil private var hasStarted = false private var isPolling = false @@ -18,13 +16,11 @@ public final class OpenGroupPollerV2 : NSObject { } // MARK: Settings - private let pollForNewMessagesInterval: TimeInterval = 4 - private let pollForDeletedMessagesInterval: TimeInterval = 30 - private let pollForModeratorsInterval: TimeInterval = 10 * 60 + private let pollInterval: TimeInterval = 4 // MARK: Lifecycle - public init(for openGroup: OpenGroupV2) { - self.openGroup = openGroup + public init(for server: String) { + self.server = server super.init() } @@ -34,55 +30,33 @@ public final class OpenGroupPollerV2 : NSObject { DispatchQueue.main.async { [weak self] in // Timers don't do well on background queues guard let strongSelf = self else { return } strongSelf.hasStarted = true - // Create timers - strongSelf.pollForNewMessagesTimer = Timer.scheduledTimer(withTimeInterval: strongSelf.pollForNewMessagesInterval, repeats: true) { _ in self?.pollForNewMessages() } - strongSelf.pollForDeletedMessagesTimer = Timer.scheduledTimer(withTimeInterval: strongSelf.pollForDeletedMessagesInterval, repeats: true) { _ in self?.pollForDeletedMessages() } - strongSelf.pollForModeratorsTimer = Timer.scheduledTimer(withTimeInterval: strongSelf.pollForModeratorsInterval, repeats: true) { _ in self?.pollForModerators() } - // Perform initial updates - strongSelf.pollForNewMessages() - strongSelf.pollForDeletedMessages() - strongSelf.pollForModerators() + strongSelf.timer = Timer.scheduledTimer(withTimeInterval: strongSelf.pollInterval, repeats: true) { _ in self?.poll() } + strongSelf.poll() } } @objc public func stop() { - pollForNewMessagesTimer?.invalidate() - pollForDeletedMessagesTimer?.invalidate() - pollForModeratorsTimer?.invalidate() + timer?.invalidate() hasStarted = false } // MARK: Polling @discardableResult - public func pollForNewMessages() -> Promise { + public func poll() -> Promise { guard isMainAppAndActive else { stop(); return Promise.value(()) } - return pollForNewMessages(isBackgroundPoll: false) + return poll(isBackgroundPoll: false) } @discardableResult - public func pollForNewMessages(isBackgroundPoll: Bool) -> Promise { + public func poll(isBackgroundPoll: Bool) -> Promise { guard !self.isPolling else { return Promise.value(()) } self.isPolling = true - let openGroup = self.openGroup let (promise, seal) = Promise.pending() promise.retainUntilComplete() - OpenGroupAPIV2.getMessages(for: openGroup.room, on: openGroup.server).done(on: DispatchQueue.global(qos: .default)) { [weak self] messages in + OpenGroupAPIV2.compactPoll(server).done(on: DispatchQueue.global(qos: .default)) { [weak self] bodies in guard let self = self else { return } self.isPolling = false - // Sorting the messages by server ID before importing them fixes an issue where messages that quote older messages can't find those older messages - let messages = messages.sorted { $0.serverID! < $1.serverID! } // Safe because messages with a nil serverID are filtered out - messages.forEach { message in - guard let data = Data(base64Encoded: message.base64EncodedData) else { - return SNLog("Ignoring open group message with invalid encoding.") - } - let envelope = SNProtoEnvelope.builder(type: .sessionMessage, timestamp: message.sentTimestamp) - envelope.setContent(data) - envelope.setSource(message.sender!) // Safe because messages with a nil sender are filtered out - let job = MessageReceiveJob(data: try! envelope.buildSerializedData(), openGroupMessageServerID: UInt64(message.serverID!), openGroupID: self.openGroup.id, isBackgroundPoll: isBackgroundPoll) - SNMessagingKitConfiguration.shared.storage.write { transaction in - SessionMessagingKit.JobQueue.shared.add(job, using: transaction) - } - } + bodies.forEach { self.handleCompactPollBody($0, isBackgroundPoll: isBackgroundPoll) } seal.fulfill(()) }.catch(on: DispatchQueue.global(qos: .userInitiated)) { _ in seal.fulfill(()) // The promise is just used to keep track of when we're done @@ -90,20 +64,37 @@ public final class OpenGroupPollerV2 : NSObject { return promise } - private func pollForDeletedMessages() { - let openGroup = self.openGroup - OpenGroupAPIV2.getDeletedMessages(for: openGroup.room, on: openGroup.server).done(on: DispatchQueue.global(qos: .default)) { serverIDs in - let messageIDs = serverIDs.compactMap { Storage.shared.getIDForMessage(withServerID: UInt64($0)) } + private func handleCompactPollBody(_ body: OpenGroupAPIV2.CompactPollResponseBody, isBackgroundPoll: Bool) { + // - Messages + // Sorting the messages by server ID before importing them fixes an issue where messages that quote older messages can't find those older messages + let openGroupID = "\(server).\(body.room)" + let messages = body.messages.sorted { $0.serverID! < $1.serverID! } // Safe because messages with a nil serverID are filtered out + messages.forEach { message in + guard let data = Data(base64Encoded: message.base64EncodedData) else { + return SNLog("Ignoring open group message with invalid encoding.") + } + let envelope = SNProtoEnvelope.builder(type: .sessionMessage, timestamp: message.sentTimestamp) + envelope.setContent(data) + envelope.setSource(message.sender!) // Safe because messages with a nil sender are filtered out + let job = MessageReceiveJob(data: try! envelope.buildSerializedData(), openGroupMessageServerID: UInt64(message.serverID!), openGroupID: openGroupID, isBackgroundPoll: isBackgroundPoll) SNMessagingKitConfiguration.shared.storage.write { transaction in - let transaction = transaction as! YapDatabaseReadWriteTransaction - messageIDs.forEach { messageID in - TSMessage.fetch(uniqueId: messageID, transaction: transaction)?.remove(with: transaction) - } + SessionMessagingKit.JobQueue.shared.add(job, using: transaction) } - }.retainUntilComplete() - } - - private func pollForModerators() { - OpenGroupAPIV2.getModerators(for: openGroup.room, on: openGroup.server).retainUntilComplete() + } + // - Deletions + let messageIDs = body.deletions.compactMap { Storage.shared.getIDForMessage(withServerID: UInt64($0)) } + SNMessagingKitConfiguration.shared.storage.write { transaction in + let transaction = transaction as! YapDatabaseReadWriteTransaction + messageIDs.forEach { messageID in + TSMessage.fetch(uniqueId: messageID, transaction: transaction)?.remove(with: transaction) + } + } + // - Moderators + if var x = OpenGroupAPIV2.moderators[server] { + x[body.room] = Set(body.moderators) + OpenGroupAPIV2.moderators[server] = x + } else { + OpenGroupAPIV2.moderators[server] = [body.room:Set(body.moderators)] + } } }