Update V2 open group poller for compact polling

pull/388/head
Niels Andriesse 5 years ago
parent 3ab7733b77
commit 4774123ad4

@ -3,9 +3,9 @@ import SessionSnodeKit
@objc(SNOpenGroupAPIV2)
public final class OpenGroupAPIV2 : NSObject {
private static var moderators: [String:[String:Set<String>]] = [:] // Server URL to room ID to set of moderator IDs
private static var authTokenPromises: [String:Promise<String>] = [:]
public static var moderators: [String:[String:Set<String>]] = [:] // Server URL to room ID to set of moderator IDs
public static let defaultServer = "https://sessionopengroup.com"
public static let defaultServerPublicKey = "658d29b91892a2389505596b135e76a53db6e11d613a51dbd3d0816adffb231b"
public static var defaultRoomsPromise: Promise<[Info]>?
@ -72,6 +72,14 @@ public final class OpenGroupAPIV2 : NSObject {
self.imageID = imageID
}
}
// MARK: Compact Poll Response Body
public struct CompactPollResponseBody {
let room: String
let messages: [OpenGroupMessageV2]
let deletions: [Int64]
let moderators: [String]
}
// MARK: Convenience
private static func send(_ request: Request) -> Promise<JSON> {
@ -119,12 +127,17 @@ public final class OpenGroupAPIV2 : NSObject {
}
}
public static func compactPoll(_ server: String) -> Promise<[(room: String, messages: [OpenGroupMessageV2], deletions: [Int64], moderators: [String])]> {
public static func compactPoll(_ server: String) -> Promise<[CompactPollResponseBody]> {
#if DEBUG
dispatchPrecondition(condition: .onQueue(.main))
#endif
let storage = SNMessagingKitConfiguration.shared.storage
let rooms = storage.getAllV2OpenGroups().values.filter { $0.server == server }.map { $0.room }
var body: [JSON] = []
for room in rooms {
let authToken = try! getAuthToken(for: room, on: server).wait() // TODO: This should be async
// It's okay for this to be blocking. This call happens on a background thread
// plus we'll almost always have the auth token in storage anyway.
guard let authToken = try? getAuthToken(for: room, on: server).wait() else { continue }
var json: JSON = [ "room_id" : room, "auth_token" : authToken ]
if let lastMessageServerID = storage.getLastMessageServerID(for: room, on: server) {
json["from_message_server_id"] = String(lastMessageServerID)
@ -135,17 +148,17 @@ public final class OpenGroupAPIV2 : NSObject {
body.append(json)
}
let request = Request(verb: .post, room: nil, server: server, endpoint: "compact_poll", parameters: [ "requests" : body ], isAuthRequired: false)
return send(request).map(on: DispatchQueue.global(qos: .userInitiated)) { json in
return send(request).then(on: DispatchQueue.global(qos: .userInitiated)) { json -> Promise<[CompactPollResponseBody]> in
guard let results = json["results"] as? [JSON] else { throw Error.parsingFailed }
var x: [(room: String, messages: [OpenGroupMessageV2], deletions: [Int64], moderators: [String])] = []
for result in results {
guard let room = result["room_id"] as? String else { continue }
let messages = try! parseMessages(from: result, for: room, on: server).wait() // TODO: This should be async
let deletions = result["deletions"] as? [Int64] ?? []
let moderators = result["moderators"] as? [String] ?? []
x.append((room: room, messages: messages, deletions: deletions, moderators: moderators))
let promises = results.compactMap { json -> Promise<CompactPollResponseBody>? in
guard let room = json["room_id"] as? String else { return nil }
let deletions = json["deletions"] as? [Int64] ?? []
let moderators = json["moderators"] as? [String] ?? []
return try? parseMessages(from: json, for: room, on: server).map { messages in
return CompactPollResponseBody(room: room, messages: messages, deletions: deletions, moderators: moderators)
}
}
return x
return when(fulfilled: promises)
}
}

@ -2,7 +2,7 @@ import PromiseKit
@objc(SNOpenGroupManagerV2)
public final class OpenGroupManagerV2 : NSObject {
private var pollers: [String:OpenGroupPollerV2] = [:]
private var pollers: [String:OpenGroupPollerV2] = [:] // One for each server
private var isPolling = false
@objc public static var useV2OpenGroups = false
@ -16,12 +16,12 @@ public final class OpenGroupManagerV2 : NSObject {
@objc public func startPolling() {
guard !isPolling else { return }
isPolling = true
let openGroups = Storage.shared.getAllV2OpenGroups()
for (_, openGroup) in openGroups {
if let poller = pollers[openGroup.id] { poller.stop() } // Should never occur
let poller = OpenGroupPollerV2(for: openGroup)
let servers = Set(Storage.shared.getAllV2OpenGroups().values.map { $0.server })
servers.forEach { server in
if let poller = pollers[server] { poller.stop() } // Should never occur
let poller = OpenGroupPollerV2(for: server)
poller.startIfNeeded()
pollers[openGroup.id] = poller
pollers[server] = poller
}
}
@ -56,15 +56,12 @@ public final class OpenGroupManagerV2 : NSObject {
thread.save(with: transaction)
storage.setV2OpenGroup(openGroup, for: thread.uniqueId!, using: transaction)
}, completion: {
// Stop any existing poller if needed
if let poller = OpenGroupManagerV2.shared.pollers[openGroup.id] {
poller.stop()
OpenGroupManagerV2.shared.pollers[openGroup.id] = nil
// Start the poller if needed
if OpenGroupManagerV2.shared.pollers[server] == nil {
let poller = OpenGroupPollerV2(for: server)
poller.startIfNeeded()
OpenGroupManagerV2.shared.pollers[server] = poller
}
// Start the poller
let poller = OpenGroupPollerV2(for: openGroup)
poller.startIfNeeded()
OpenGroupManagerV2.shared.pollers[openGroup.id] = poller
// Fetch the group image
OpenGroupAPIV2.getGroupImage(for: room, on: server).done(on: DispatchQueue.global(qos: .userInitiated)) { data in
storage.write { transaction in
@ -86,10 +83,13 @@ public final class OpenGroupManagerV2 : NSObject {
}
public func delete(_ openGroup: OpenGroupV2, associatedWith thread: TSThread, using transaction: YapDatabaseReadWriteTransaction) {
let storage = SNMessagingKitConfiguration.shared.storage
// Stop the poller if needed
if let poller = pollers[openGroup.id] {
poller.stop()
pollers[openGroup.id] = nil
let openGroups = storage.getAllV2OpenGroups().values.filter { $0.server == openGroup.server }
if openGroups.count == 1 && openGroups.last == openGroup {
let poller = pollers[openGroup.server]
poller?.stop()
pollers[openGroup.server] = nil
}
// Remove all data
var messageIDs: Set<String> = []
@ -98,7 +98,7 @@ public final class OpenGroupManagerV2 : NSObject {
messageIDs.insert(interaction.uniqueId!)
messageTimestamps.insert(interaction.timestamp)
}
SNMessagingKitConfiguration.shared.storage.updateMessageIDCollectionByPruningMessagesWithIDs(messageIDs, using: transaction)
storage.updateMessageIDCollectionByPruningMessagesWithIDs(messageIDs, using: transaction)
Storage.shared.removeReceivedMessageTimestamps(messageTimestamps, using: transaction)
Storage.shared.removeLastMessageServerID(for: openGroup.room, on: openGroup.server, using: transaction)
Storage.shared.removeLastDeletionServerID(for: openGroup.room, on: openGroup.server, using: transaction)

@ -2,10 +2,8 @@ import PromiseKit
@objc(SNOpenGroupPollerV2)
public final class OpenGroupPollerV2 : NSObject {
private let openGroup: OpenGroupV2
private var pollForNewMessagesTimer: Timer? = nil
private var pollForDeletedMessagesTimer: Timer? = nil
private var pollForModeratorsTimer: Timer? = nil
private let server: String
private var timer: Timer? = nil
private var hasStarted = false
private var isPolling = false
@ -18,13 +16,11 @@ public final class OpenGroupPollerV2 : NSObject {
}
// MARK: Settings
private let pollForNewMessagesInterval: TimeInterval = 4
private let pollForDeletedMessagesInterval: TimeInterval = 30
private let pollForModeratorsInterval: TimeInterval = 10 * 60
private let pollInterval: TimeInterval = 4
// MARK: Lifecycle
public init(for openGroup: OpenGroupV2) {
self.openGroup = openGroup
public init(for server: String) {
self.server = server
super.init()
}
@ -34,55 +30,33 @@ public final class OpenGroupPollerV2 : NSObject {
DispatchQueue.main.async { [weak self] in // Timers don't do well on background queues
guard let strongSelf = self else { return }
strongSelf.hasStarted = true
// Create timers
strongSelf.pollForNewMessagesTimer = Timer.scheduledTimer(withTimeInterval: strongSelf.pollForNewMessagesInterval, repeats: true) { _ in self?.pollForNewMessages() }
strongSelf.pollForDeletedMessagesTimer = Timer.scheduledTimer(withTimeInterval: strongSelf.pollForDeletedMessagesInterval, repeats: true) { _ in self?.pollForDeletedMessages() }
strongSelf.pollForModeratorsTimer = Timer.scheduledTimer(withTimeInterval: strongSelf.pollForModeratorsInterval, repeats: true) { _ in self?.pollForModerators() }
// Perform initial updates
strongSelf.pollForNewMessages()
strongSelf.pollForDeletedMessages()
strongSelf.pollForModerators()
strongSelf.timer = Timer.scheduledTimer(withTimeInterval: strongSelf.pollInterval, repeats: true) { _ in self?.poll() }
strongSelf.poll()
}
}
@objc public func stop() {
pollForNewMessagesTimer?.invalidate()
pollForDeletedMessagesTimer?.invalidate()
pollForModeratorsTimer?.invalidate()
timer?.invalidate()
hasStarted = false
}
// MARK: Polling
@discardableResult
public func pollForNewMessages() -> Promise<Void> {
public func poll() -> Promise<Void> {
guard isMainAppAndActive else { stop(); return Promise.value(()) }
return pollForNewMessages(isBackgroundPoll: false)
return poll(isBackgroundPoll: false)
}
@discardableResult
public func pollForNewMessages(isBackgroundPoll: Bool) -> Promise<Void> {
public func poll(isBackgroundPoll: Bool) -> Promise<Void> {
guard !self.isPolling else { return Promise.value(()) }
self.isPolling = true
let openGroup = self.openGroup
let (promise, seal) = Promise<Void>.pending()
promise.retainUntilComplete()
OpenGroupAPIV2.getMessages(for: openGroup.room, on: openGroup.server).done(on: DispatchQueue.global(qos: .default)) { [weak self] messages in
OpenGroupAPIV2.compactPoll(server).done(on: DispatchQueue.global(qos: .default)) { [weak self] bodies in
guard let self = self else { return }
self.isPolling = false
// Sorting the messages by server ID before importing them fixes an issue where messages that quote older messages can't find those older messages
let messages = messages.sorted { $0.serverID! < $1.serverID! } // Safe because messages with a nil serverID are filtered out
messages.forEach { message in
guard let data = Data(base64Encoded: message.base64EncodedData) else {
return SNLog("Ignoring open group message with invalid encoding.")
}
let envelope = SNProtoEnvelope.builder(type: .sessionMessage, timestamp: message.sentTimestamp)
envelope.setContent(data)
envelope.setSource(message.sender!) // Safe because messages with a nil sender are filtered out
let job = MessageReceiveJob(data: try! envelope.buildSerializedData(), openGroupMessageServerID: UInt64(message.serverID!), openGroupID: self.openGroup.id, isBackgroundPoll: isBackgroundPoll)
SNMessagingKitConfiguration.shared.storage.write { transaction in
SessionMessagingKit.JobQueue.shared.add(job, using: transaction)
}
}
bodies.forEach { self.handleCompactPollBody($0, isBackgroundPoll: isBackgroundPoll) }
seal.fulfill(())
}.catch(on: DispatchQueue.global(qos: .userInitiated)) { _ in
seal.fulfill(()) // The promise is just used to keep track of when we're done
@ -90,20 +64,37 @@ public final class OpenGroupPollerV2 : NSObject {
return promise
}
private func pollForDeletedMessages() {
let openGroup = self.openGroup
OpenGroupAPIV2.getDeletedMessages(for: openGroup.room, on: openGroup.server).done(on: DispatchQueue.global(qos: .default)) { serverIDs in
let messageIDs = serverIDs.compactMap { Storage.shared.getIDForMessage(withServerID: UInt64($0)) }
private func handleCompactPollBody(_ body: OpenGroupAPIV2.CompactPollResponseBody, isBackgroundPoll: Bool) {
// - Messages
// Sorting the messages by server ID before importing them fixes an issue where messages that quote older messages can't find those older messages
let openGroupID = "\(server).\(body.room)"
let messages = body.messages.sorted { $0.serverID! < $1.serverID! } // Safe because messages with a nil serverID are filtered out
messages.forEach { message in
guard let data = Data(base64Encoded: message.base64EncodedData) else {
return SNLog("Ignoring open group message with invalid encoding.")
}
let envelope = SNProtoEnvelope.builder(type: .sessionMessage, timestamp: message.sentTimestamp)
envelope.setContent(data)
envelope.setSource(message.sender!) // Safe because messages with a nil sender are filtered out
let job = MessageReceiveJob(data: try! envelope.buildSerializedData(), openGroupMessageServerID: UInt64(message.serverID!), openGroupID: openGroupID, isBackgroundPoll: isBackgroundPoll)
SNMessagingKitConfiguration.shared.storage.write { transaction in
let transaction = transaction as! YapDatabaseReadWriteTransaction
messageIDs.forEach { messageID in
TSMessage.fetch(uniqueId: messageID, transaction: transaction)?.remove(with: transaction)
}
SessionMessagingKit.JobQueue.shared.add(job, using: transaction)
}
}.retainUntilComplete()
}
private func pollForModerators() {
OpenGroupAPIV2.getModerators(for: openGroup.room, on: openGroup.server).retainUntilComplete()
}
// - Deletions
let messageIDs = body.deletions.compactMap { Storage.shared.getIDForMessage(withServerID: UInt64($0)) }
SNMessagingKitConfiguration.shared.storage.write { transaction in
let transaction = transaction as! YapDatabaseReadWriteTransaction
messageIDs.forEach { messageID in
TSMessage.fetch(uniqueId: messageID, transaction: transaction)?.remove(with: transaction)
}
}
// - Moderators
if var x = OpenGroupAPIV2.moderators[server] {
x[body.room] = Set(body.moderators)
OpenGroupAPIV2.moderators[server] = x
} else {
OpenGroupAPIV2.moderators[server] = [body.room:Set(body.moderators)]
}
}
}

Loading…
Cancel
Save