From b8fb751b8d9fbb9dd237b32e2196dafc4a852079 Mon Sep 17 00:00:00 2001 From: nielsandriesse Date: Thu, 11 Jun 2020 16:33:11 +1000 Subject: [PATCH] Clean up threading --- .../src/Loki/API/LokiAPI+SwarmAPI.swift | 18 ++--- SignalServiceKit/src/Loki/API/LokiAPI.swift | 40 ++++------ .../src/Loki/API/LokiDotNetAPI.swift | 18 ++--- .../src/Loki/API/LokiFileServerAPI.swift | 20 ++--- .../src/Loki/API/LokiFileServerProxy.swift | 8 +- .../src/Loki/API/LokiHTTPClient.swift | 4 +- .../src/Loki/API/LokiPoller.swift | 10 +-- .../API/Onion Requests/OnionRequestAPI.swift | 45 +++++------ .../API/Open Groups/LokiPublicChatAPI.swift | 67 ++++++++-------- .../Open Groups/LokiPublicChatManager.swift | 4 +- .../Open Groups/LokiPublicChatPoller.swift | 10 +-- .../Loki/API/Shelved/LokiRSSFeedProxy.swift | 2 +- .../Protocol/Mentions/MentionsManager.swift | 8 +- .../Multi Device/MultiDeviceProtocol.swift | 6 +- .../LokiSessionResetImplementation.swift | 2 +- .../Sync Messages/SyncMessagesProtocol.swift | 4 +- .../LokiPushNotificationManager.swift | 8 +- .../src/Loki/Shelved/LokiP2PAPI.swift | 2 +- .../src/Loki/Utilities/Promise+Retrying.swift | 2 +- .../Loki/Utilities/Promise+Threading.swift | 76 +++++++++++++++++++ 20 files changed, 204 insertions(+), 150 deletions(-) create mode 100644 SignalServiceKit/src/Loki/Utilities/Promise+Threading.swift diff --git a/SignalServiceKit/src/Loki/API/LokiAPI+SwarmAPI.swift b/SignalServiceKit/src/Loki/API/LokiAPI+SwarmAPI.swift index 060c189a3..4b6a5548d 100644 --- a/SignalServiceKit/src/Loki/API/LokiAPI+SwarmAPI.swift +++ b/SignalServiceKit/src/Loki/API/LokiAPI+SwarmAPI.swift @@ -38,8 +38,8 @@ public extension LokiAPI { ] print("[Loki] Populating snode pool using: \(target).") let (promise, seal) = Promise.pending() - attempt(maxRetryCount: 4, recoveringOn: DispatchQueue.global()) { - HTTP.execute(.post, url, parameters: parameters).map(on: DispatchQueue.global()) { json -> LokiAPITarget in + attempt(maxRetryCount: 4) { + HTTP.execute(.post, url, parameters: parameters).map2 { json -> LokiAPITarget in guard let intermediate = json["result"] as? JSON, let rawTargets = intermediate["service_node_states"] as? [JSON] else { throw LokiAPIError.randomSnodePoolUpdatingFailed } snodePool = try Set(rawTargets.flatMap { rawTarget in guard let address = rawTarget["public_ip"] as? String, let port = rawTarget["storage_port"] as? Int, let ed25519PublicKey = rawTarget["pubkey_ed25519"] as? String, let x25519PublicKey = rawTarget["pubkey_x25519"] as? String, address != "0.0.0.0" else { @@ -51,13 +51,13 @@ public extension LokiAPI { // randomElement() uses the system's default random generator, which is cryptographically secure return snodePool.randomElement()! } - }.done(on: DispatchQueue.global()) { snode in + }.done2 { snode in seal.fulfill(snode) try! Storage.writeSync { transaction in print("[Loki] Persisting snode pool to database.") storage.setSnodePool(LokiAPI.snodePool, in: transaction) } - }.catch(on: DispatchQueue.global()) { error in + }.catch2 { error in print("[Loki] Failed to contact seed node at: \(target).") seal.reject(error) } @@ -81,11 +81,11 @@ public extension LokiAPI { } else { print("[Loki] Getting swarm for: \(hexEncodedPublicKey).") let parameters: [String:Any] = [ "pubKey" : hexEncodedPublicKey ] - return getRandomSnode().then(on: workQueue) { + return getRandomSnode().then2 { invoke(.getSwarm, on: $0, associatedWith: hexEncodedPublicKey, parameters: parameters) - }.map { + }.map2 { parseTargets(from: $0) - }.get { swarm in + }.get2 { swarm in swarmCache[hexEncodedPublicKey] = swarm try! Storage.writeSync { transaction in storage.setSwarm(swarm, for: hexEncodedPublicKey, in: transaction) @@ -96,7 +96,7 @@ public extension LokiAPI { internal static func getTargetSnodes(for hexEncodedPublicKey: String) -> Promise<[LokiAPITarget]> { // shuffled() uses the system's default random generator, which is cryptographically secure - return getSwarm(for: hexEncodedPublicKey).map { Array($0.shuffled().prefix(targetSwarmSnodeCount)) } + return getSwarm(for: hexEncodedPublicKey).map2 { Array($0.shuffled().prefix(targetSwarmSnodeCount)) } } internal static func dropSnodeFromSnodePool(_ target: LokiAPITarget) { @@ -145,7 +145,7 @@ public extension LokiAPI { internal extension Promise { internal func handlingSnodeErrorsIfNeeded(for target: LokiAPITarget, associatedWith hexEncodedPublicKey: String) -> Promise { - return recover(on: LokiAPI.errorHandlingQueue) { error -> Promise in + return recover2 { error -> Promise in if let error = error as? LokiHTTPClient.HTTPError { switch error.statusCode { case 0, 400, 500, 503: diff --git a/SignalServiceKit/src/Loki/API/LokiAPI.swift b/SignalServiceKit/src/Loki/API/LokiAPI.swift index 5f6aff3e7..e3c63db67 100644 --- a/SignalServiceKit/src/Loki/API/LokiAPI.swift +++ b/SignalServiceKit/src/Loki/API/LokiAPI.swift @@ -1,16 +1,8 @@ import PromiseKit -// TODO: We guarantee that things happen in-order through promise chaining. For performance we should be able to use different queues for everything as long -// as we always modify state from the same queue. - @objc(LKAPI) public final class LokiAPI : NSObject { - /// All service node related errors must be handled on this queue to avoid race conditions maintaining e.g. failure counts. - internal static let errorHandlingQueue = DispatchQueue(label: "LokiAPI.errorHandlingQueue") - internal static let stateQueue = DispatchQueue(label: "LokiAPI.stateQueue") - internal static let workQueue = DispatchQueue(label: "LokiAPI.workQueue", qos: .userInitiated) - internal static var storage: OWSPrimaryStorage { OWSPrimaryStorage.shared() } // MARK: Settings @@ -22,7 +14,7 @@ public final class LokiAPI : NSObject { /// - Note: Changing this on the fly is not recommended. internal static var useOnionRequests = true - // MARK: Nested Types + // MARK: Types public typealias RawResponse = Any @objc public class LokiAPIError : NSError { // Not called `Error` for Obj-C interoperablity @@ -46,13 +38,13 @@ public final class LokiAPI : NSObject { parameters: JSON, headers: [String:String]? = nil, timeout: TimeInterval? = nil) -> RawResponsePromise { let url = URL(string: "\(target.address):\(target.port)/storage_rpc/v1")! if useOnionRequests { - return OnionRequestAPI.sendOnionRequest(invoking: method, on: target, with: parameters, associatedWith: hexEncodedPublicKey).map { $0 as Any } + return OnionRequestAPI.sendOnionRequest(invoking: method, on: target, with: parameters, associatedWith: hexEncodedPublicKey).map2 { $0 as Any } } else { let request = TSRequest(url: url, method: "POST", parameters: [ "method" : method.rawValue, "params" : parameters ]) if let headers = headers { request.allHTTPHeaderFields = headers } request.timeoutInterval = timeout ?? defaultTimeout - return TSNetworkManager.shared().perform(request, withCompletionQueue: workQueue) - .map { $0.responseObject } + return TSNetworkManager.shared().perform(request, withCompletionQueue: DispatchQueue.global(qos: .userInitiated)) + .map2 { $0.responseObject } .handlingSnodeErrorsIfNeeded(for: target, associatedWith: hexEncodedPublicKey) .recoveringNetworkErrorsIfNeeded() } @@ -68,16 +60,16 @@ public final class LokiAPI : NSObject { // MARK: Public API public static func getMessages() -> Promise> { - return attempt(maxRetryCount: maxRetryCount, recoveringOn: workQueue) { - getTargetSnodes(for: getUserHexEncodedPublicKey()).mapValues { targetSnode in - getRawMessages(from: targetSnode, usingLongPolling: false).map { parseRawMessagesResponse($0, from: targetSnode) } - }.map { Set($0) } + return attempt(maxRetryCount: maxRetryCount) { + getTargetSnodes(for: getUserHexEncodedPublicKey()).mapValues2 { targetSnode in + getRawMessages(from: targetSnode, usingLongPolling: false).map2 { parseRawMessagesResponse($0, from: targetSnode) } + }.map2 { Set($0) } } } @objc(sendSignalMessage:onP2PSuccess:) public static func objc_sendSignalMessage(_ signalMessage: SignalMessage, onP2PSuccess: @escaping () -> Void) -> AnyPromise { - let promise = sendSignalMessage(signalMessage, onP2PSuccess: onP2PSuccess).mapValues { AnyPromise.from($0) }.map { Set($0) } + let promise = sendSignalMessage(signalMessage, onP2PSuccess: onP2PSuccess).mapValues2 { AnyPromise.from($0) }.map2 { Set($0) } return AnyPromise.from(promise) } @@ -87,18 +79,18 @@ public final class LokiAPI : NSObject { let destination = lokiMessage.destination func sendLokiMessage(_ lokiMessage: LokiMessage, to target: LokiAPITarget) -> RawResponsePromise { let parameters = lokiMessage.toJSON() - return attempt(maxRetryCount: maxRetryCount, recoveringOn: workQueue) { + return attempt(maxRetryCount: maxRetryCount) { invoke(.sendMessage, on: target, associatedWith: destination, parameters: parameters) } } func sendLokiMessageUsingSwarmAPI() -> Promise> { notificationCenter.post(name: .calculatingPoW, object: NSNumber(value: signalMessage.timestamp)) - return lokiMessage.calculatePoW().then { lokiMessageWithPoW -> Promise> in + return lokiMessage.calculatePoW().then2 { lokiMessageWithPoW -> Promise> in notificationCenter.post(name: .routing, object: NSNumber(value: signalMessage.timestamp)) - return getTargetSnodes(for: destination).map { snodes in + return getTargetSnodes(for: destination).map2 { snodes in return Set(snodes.map { snode in notificationCenter.post(name: .messageSending, object: NSNumber(value: signalMessage.timestamp)) - return sendLokiMessage(lokiMessageWithPoW, to: snode).map { rawResponse in + return sendLokiMessage(lokiMessageWithPoW, to: snode).map2 { rawResponse in if let json = rawResponse as? JSON, let powDifficulty = json["difficulty"] as? Int { guard powDifficulty != LokiAPI.powDifficulty else { return rawResponse } print("[Loki] Setting proof of work difficulty to \(powDifficulty).") @@ -115,10 +107,10 @@ public final class LokiAPI : NSObject { if let peer = LokiP2PAPI.getInfo(for: destination), (lokiMessage.isPing || peer.isOnline) { let target = LokiAPITarget(address: peer.address, port: peer.port, publicKeySet: nil) // TODO: Retrying - return Promise.value([ target ]).mapValues { sendLokiMessage(lokiMessage, to: $0) }.map { Set($0) }.get { _ in + return Promise.value([ target ]).mapValues2 { sendLokiMessage(lokiMessage, to: $0) }.map2 { Set($0) }.get2 { _ in LokiP2PAPI.markOnline(destination) onP2PSuccess() - }.recover { error -> Promise> in + }.recover2 { error -> Promise> in LokiP2PAPI.markOffline(destination) if lokiMessage.isPing { print("[Loki] Failed to ping \(destination); marking contact as offline.") @@ -228,7 +220,7 @@ public final class LokiAPI : NSObject { private extension Promise { fileprivate func recoveringNetworkErrorsIfNeeded() -> Promise { - return recover { error -> Promise in + return recover2 { error -> Promise in switch error { case NetworkManagerError.taskError(_, let underlyingError): throw underlyingError case LokiHTTPClient.HTTPError.networkError(_, _, let underlyingError): throw underlyingError ?? error diff --git a/SignalServiceKit/src/Loki/API/LokiDotNetAPI.swift b/SignalServiceKit/src/Loki/API/LokiDotNetAPI.swift index 5210cbc60..05e4ff3bc 100644 --- a/SignalServiceKit/src/Loki/API/LokiDotNetAPI.swift +++ b/SignalServiceKit/src/Loki/API/LokiDotNetAPI.swift @@ -37,7 +37,7 @@ public class LokiDotNetAPI : NSObject { if let token = getAuthTokenFromDatabase(for: server) { return Promise.value(token) } else { - return requestNewAuthToken(for: server).then(on: DispatchQueue.global()) { submitAuthToken($0, for: server) }.map(on: DispatchQueue.global()) { token in + return requestNewAuthToken(for: server).then2 { submitAuthToken($0, for: server) }.map2 { token in try! Storage.writeSync { transaction in setAuthToken(for: server, to: token, in: transaction) } @@ -65,7 +65,7 @@ public class LokiDotNetAPI : NSObject { let queryParameters = "pubKey=\(getUserHexEncodedPublicKey())" let url = URL(string: "\(server)/loki/v1/get_challenge?\(queryParameters)")! let request = TSRequest(url: url) - return LokiFileServerProxy(for: server).perform(request, withCompletionQueue: LokiAPI.workQueue).map(on: LokiAPI.workQueue) { rawResponse in + return LokiFileServerProxy(for: server).perform(request, withCompletionQueue: DispatchQueue.global(qos: .userInitiated)).map2 { rawResponse in guard let json = rawResponse as? JSON, let base64EncodedChallenge = json["cipherText64"] as? String, let base64EncodedServerPublicKey = json["serverPubKey64"] as? String, let challenge = Data(base64Encoded: base64EncodedChallenge), var serverPublicKey = Data(base64Encoded: base64EncodedServerPublicKey) else { throw LokiDotNetAPIError.parsingFailed @@ -89,7 +89,7 @@ public class LokiDotNetAPI : NSObject { let url = URL(string: "\(server)/loki/v1/submit_challenge")! let parameters = [ "pubKey" : getUserHexEncodedPublicKey(), "token" : token ] let request = TSRequest(url: url, method: "POST", parameters: parameters) - return LokiFileServerProxy(for: server).perform(request, withCompletionQueue: DispatchQueue.global()).map(on: DispatchQueue.global()) { _ in token } + return LokiFileServerProxy(for: server).perform(request, withCompletionQueue: DispatchQueue.global(qos: .userInitiated)).map2 { _ in token } } // MARK: Public API @@ -157,9 +157,9 @@ public class LokiDotNetAPI : NSObject { if isProxyingRequired { attachment.isUploaded = false attachment.save() - let _ = LokiFileServerProxy(for: server).performLokiFileServerNSURLRequest(request as NSURLRequest).done { responseObject in + let _ = LokiFileServerProxy(for: server).performLokiFileServerNSURLRequest(request as NSURLRequest).done2 { responseObject in parseResponse(responseObject) - }.catch { error in + }.catch2 { error in seal.reject(error) } } else { @@ -187,13 +187,13 @@ public class LokiDotNetAPI : NSObject { } } if server == LokiFileServerAPI.server { - LokiAPI.workQueue.async { + DispatchQueue.global(qos: .userInitiated).async { proceed(with: "loki") // Uploads to the Loki File Server shouldn't include any personally identifiable information so use a dummy auth token } } else { - getAuthToken(for: server).done(on: LokiAPI.workQueue) { token in + getAuthToken(for: server).done2 { token in proceed(with: token) - }.catch { error in + }.catch2 { error in print("[Loki] Couldn't upload attachment due to error: \(error).") seal.reject(error) } @@ -206,7 +206,7 @@ public class LokiDotNetAPI : NSObject { internal extension Promise { internal func handlingInvalidAuthTokenIfNeeded(for server: String) -> Promise { - return recover(on: DispatchQueue.global()) { error -> Promise in + return recover2 { error -> Promise in if let error = error as? NetworkManagerError, (error.statusCode == 401 || error.statusCode == 403) { print("[Loki] Group chat auth token for: \(server) expired; dropping it.") LokiDotNetAPI.clearAuthToken(for: server) diff --git a/SignalServiceKit/src/Loki/API/LokiFileServerAPI.swift b/SignalServiceKit/src/Loki/API/LokiFileServerAPI.swift index d0eabc4dc..948896d6b 100644 --- a/SignalServiceKit/src/Loki/API/LokiFileServerAPI.swift +++ b/SignalServiceKit/src/Loki/API/LokiFileServerAPI.swift @@ -34,11 +34,11 @@ public final class LokiFileServerAPI : LokiDotNetAPI { public static func getDeviceLinks(associatedWith hexEncodedPublicKeys: Set) -> Promise> { let hexEncodedPublicKeysDescription = "[ \(hexEncodedPublicKeys.joined(separator: ", ")) ]" print("[Loki] Getting device links for: \(hexEncodedPublicKeysDescription).") - return getAuthToken(for: server).then(on: DispatchQueue.global()) { token -> Promise> in + return getAuthToken(for: server).then2 { token -> Promise> in let queryParameters = "ids=\(hexEncodedPublicKeys.map { "@\($0)" }.joined(separator: ","))&include_user_annotations=1" let url = URL(string: "\(server)/users?\(queryParameters)")! let request = TSRequest(url: url) - return LokiFileServerProxy(for: server).perform(request, withCompletionQueue: DispatchQueue.global()).map(on: DispatchQueue.global(qos: .userInitiated)) { rawResponse -> Set in + return LokiFileServerProxy(for: server).perform(request, withCompletionQueue: DispatchQueue.global(qos: .userInitiated)).map2 { rawResponse -> Set in guard let json = rawResponse as? JSON, let data = json["data"] as? [JSON] else { print("[Loki] Couldn't parse device links for users: \(hexEncodedPublicKeys) from: \(rawResponse).") throw LokiDotNetAPIError.parsingFailed @@ -80,7 +80,7 @@ public final class LokiFileServerAPI : LokiDotNetAPI { return deviceLink } }) - }.map(on: DispatchQueue.global()) { deviceLinks in + }.map2 { deviceLinks in storage.setDeviceLinks(deviceLinks) return deviceLinks } @@ -89,7 +89,7 @@ public final class LokiFileServerAPI : LokiDotNetAPI { public static func setDeviceLinks(_ deviceLinks: Set) -> Promise { print("[Loki] Updating device links.") - return getAuthToken(for: server).then { token -> Promise in + return getAuthToken(for: server).then2 { token -> Promise in let isMaster = deviceLinks.contains { $0.master.hexEncodedPublicKey == getUserHexEncodedPublicKey() } let deviceLinksAsJSON = deviceLinks.map { $0.toJSON() } let value = !deviceLinksAsJSON.isEmpty ? [ "isPrimary" : isMaster ? 1 : 0, "authorisations" : deviceLinksAsJSON ] : nil @@ -98,9 +98,9 @@ public final class LokiFileServerAPI : LokiDotNetAPI { let url = URL(string: "\(server)/users/me")! let request = TSRequest(url: url, method: "PATCH", parameters: parameters) request.allHTTPHeaderFields = [ "Content-Type" : "application/json", "Authorization" : "Bearer \(token)" ] - return attempt(maxRetryCount: 8, recoveringOn: LokiAPI.workQueue) { - LokiFileServerProxy(for: server).perform(request).map { _ in } - }.handlingInvalidAuthTokenIfNeeded(for: server).recover { error in + return attempt(maxRetryCount: 8) { + LokiFileServerProxy(for: server).perform(request).map2 { _ in } + }.handlingInvalidAuthTokenIfNeeded(for: server).recover2 { error in print("Couldn't update device links due to error: \(error).") throw error } @@ -114,7 +114,7 @@ public final class LokiFileServerAPI : LokiDotNetAPI { deviceLinks = storage.getDeviceLinks(for: getUserHexEncodedPublicKey(), in: transaction) } deviceLinks.insert(deviceLink) - return setDeviceLinks(deviceLinks).map(on: LokiAPI.workQueue) { _ in + return setDeviceLinks(deviceLinks).map2 { _ in storage.addDeviceLink(deviceLink) } } @@ -126,7 +126,7 @@ public final class LokiFileServerAPI : LokiDotNetAPI { deviceLinks = storage.getDeviceLinks(for: getUserHexEncodedPublicKey(), in: transaction) } deviceLinks.remove(deviceLink) - return setDeviceLinks(deviceLinks).map(on: LokiAPI.workQueue) { _ in + return setDeviceLinks(deviceLinks).map2 { _ in storage.removeDeviceLink(deviceLink) } } @@ -151,7 +151,7 @@ public final class LokiFileServerAPI : LokiDotNetAPI { print("[Loki] Couldn't upload profile picture due to error: \(error).") return Promise(error: error) } - return LokiFileServerProxy(for: server).performLokiFileServerNSURLRequest(request as NSURLRequest).map { responseObject in + return LokiFileServerProxy(for: server).performLokiFileServerNSURLRequest(request as NSURLRequest).map2 { responseObject in guard let json = responseObject as? JSON, let data = json["data"] as? JSON, let downloadURL = data["url"] as? String else { print("[Loki] Couldn't parse profile picture from: \(responseObject).") throw LokiDotNetAPIError.parsingFailed diff --git a/SignalServiceKit/src/Loki/API/LokiFileServerProxy.swift b/SignalServiceKit/src/Loki/API/LokiFileServerProxy.swift index f276d7af3..d9221c181 100644 --- a/SignalServiceKit/src/Loki/API/LokiFileServerProxy.swift +++ b/SignalServiceKit/src/Loki/API/LokiFileServerProxy.swift @@ -49,7 +49,7 @@ internal class LokiFileServerProxy : LokiHTTPClient { DispatchQueue.global(qos: .userInitiated).async { let uncheckedSymmetricKey = try? Curve25519.generateSharedSecret(fromPublicKey: LokiFileServerProxy.fileServerPublicKey, privateKey: keyPair.privateKey) guard let symmetricKey = uncheckedSymmetricKey else { return seal.reject(Error.symmetricKeyGenerationFailed) } - LokiAPI.getRandomSnode().then(on: DispatchQueue.global()) { proxy -> Promise in + LokiAPI.getRandomSnode().then2 { proxy -> Promise in let url = "\(proxy.address):\(proxy.port)/file_proxy" guard let urlAsString = request.url?.absoluteString, let serverURLEndIndex = urlAsString.range(of: server)?.upperBound, serverURLEndIndex < urlAsString.endIndex else { throw Error.endpointParsingFailed } @@ -102,7 +102,7 @@ internal class LokiFileServerProxy : LokiHTTPClient { } task.resume() return promise - }.map(on: DispatchQueue.global(qos: .userInitiated)) { rawResponse in + }.map2 { rawResponse in guard let responseAsData = rawResponse as? Data, let responseAsJSON = try? JSONSerialization.jsonObject(with: responseAsData, options: .allowFragments) as? JSON, let base64EncodedCipherText = responseAsJSON["data"] as? String, let meta = responseAsJSON["meta"] as? JSON, let statusCode = meta["code"] as? Int, let cipherText = Data(base64Encoded: base64EncodedCipherText) else { print("[Loki] Received an invalid response.") @@ -115,9 +115,9 @@ internal class LokiFileServerProxy : LokiHTTPClient { let uncheckedJSON = try? JSONSerialization.jsonObject(with: uncheckedJSONAsData, options: .allowFragments) as? JSON guard let json = uncheckedJSON else { throw HTTPError.networkError(code: -1, response: nil, underlyingError: Error.proxyResponseParsingFailed) } return json - }.done(on: DispatchQueue.global()) { rawResponse in + }.done2 { rawResponse in seal.fulfill(rawResponse) - }.catch(on: DispatchQueue.global()) { error in + }.catch2 { error in print("[Loki] File server proxy request failed with error: \(error.localizedDescription).") seal.reject(HTTPError.from(error: error) ?? error) } diff --git a/SignalServiceKit/src/Loki/API/LokiHTTPClient.swift b/SignalServiceKit/src/Loki/API/LokiHTTPClient.swift index f42a247e9..516a903b4 100644 --- a/SignalServiceKit/src/Loki/API/LokiHTTPClient.swift +++ b/SignalServiceKit/src/Loki/API/LokiHTTPClient.swift @@ -10,12 +10,12 @@ public class LokiHTTPClient { securityPolicy.validatesDomainName = false result.securityPolicy = securityPolicy result.responseSerializer = AFHTTPResponseSerializer() - result.completionQueue = DispatchQueue.global() + result.completionQueue = DispatchQueue.global(qos: .userInitiated) return result }() internal func perform(_ request: TSRequest, withCompletionQueue queue: DispatchQueue = DispatchQueue.main) -> LokiAPI.RawResponsePromise { - return TSNetworkManager.shared().perform(request, withCompletionQueue: queue).map { $0.responseObject }.recover { error -> LokiAPI.RawResponsePromise in + return TSNetworkManager.shared().perform(request, withCompletionQueue: queue).map2 { $0.responseObject }.recover2 { error -> LokiAPI.RawResponsePromise in throw HTTPError.from(error: error) ?? error } } diff --git a/SignalServiceKit/src/Loki/API/LokiPoller.swift b/SignalServiceKit/src/Loki/API/LokiPoller.swift index 9ebdf1c84..593410273 100644 --- a/SignalServiceKit/src/Loki/API/LokiPoller.swift +++ b/SignalServiceKit/src/Loki/API/LokiPoller.swift @@ -54,13 +54,13 @@ public final class LokiPoller : NSObject { // MARK: Private API private func setUpPolling() { guard !hasStopped else { return } - LokiAPI.getSwarm(for: getUserHexEncodedPublicKey()).then { [weak self] _ -> Promise in + LokiAPI.getSwarm(for: getUserHexEncodedPublicKey()).then2 { [weak self] _ -> Promise in guard let strongSelf = self else { return Promise { $0.fulfill(()) } } strongSelf.usedSnodes.removeAll() let (promise, seal) = Promise.pending() strongSelf.pollNextSnode(seal: seal) return promise - }.ensure { [weak self] in + }.ensure2 { [weak self] in guard let strongSelf = self, !strongSelf.hasStopped else { return } Timer.scheduledTimer(withTimeInterval: LokiPoller.retryInterval, repeats: false) { _ in guard let strongSelf = self else { return } @@ -77,9 +77,9 @@ public final class LokiPoller : NSObject { // randomElement() uses the system's default random generator, which is cryptographically secure let nextSnode = unusedSnodes.randomElement()! usedSnodes.insert(nextSnode) - poll(nextSnode, seal: seal).done(on: LokiAPI.workQueue) { + poll(nextSnode, seal: seal).done2 { seal.fulfill(()) - }.catch(on: LokiAPI.errorHandlingQueue) { [weak self] error in + }.catch2 { [weak self] error in if let error = error as? Error, error == .pollLimitReached { self?.pollCount = 0 } else { @@ -94,7 +94,7 @@ public final class LokiPoller : NSObject { } private func poll(_ target: LokiAPITarget, seal longTermSeal: Resolver) -> Promise { - return LokiAPI.getRawMessages(from: target, usingLongPolling: false).then(on: LokiAPI.workQueue) { [weak self] rawResponse -> Promise in + return LokiAPI.getRawMessages(from: target, usingLongPolling: false).then2 { [weak self] rawResponse -> Promise in guard let strongSelf = self, !strongSelf.hasStopped else { return Promise { $0.fulfill(()) } } let messages = LokiAPI.parseRawMessagesResponse(rawResponse, from: target) strongSelf.onMessagesReceived(messages) diff --git a/SignalServiceKit/src/Loki/API/Onion Requests/OnionRequestAPI.swift b/SignalServiceKit/src/Loki/API/Onion Requests/OnionRequestAPI.swift index 9e07c9c9e..dcfdc7e19 100644 --- a/SignalServiceKit/src/Loki/API/Onion Requests/OnionRequestAPI.swift +++ b/SignalServiceKit/src/Loki/API/Onion Requests/OnionRequestAPI.swift @@ -3,9 +3,7 @@ import PromiseKit /// See the "Onion Requests" section of [The Session Whitepaper](https://arxiv.org/pdf/2002.04609.pdf) for more information. public enum OnionRequestAPI { - /// - Note: Must only be modified from `LokiAPI.workQueue`. public static var guardSnodes: Set = [] - /// - Note: Must only be modified from `LokiAPI.workQueue`. public static var paths: [Path] = [] // Not a set to ensure we consistently show the same path to the user private static var snodePool: Set { @@ -51,11 +49,10 @@ public enum OnionRequestAPI { /// Tests the given snode. The returned promise errors out if the snode is faulty; the promise is fulfilled otherwise. private static func testSnode(_ snode: LokiAPITarget) -> Promise { let (promise, seal) = Promise.pending() - let queue = DispatchQueue.global() // No need to block the work queue for this - queue.async { + DispatchQueue.global(qos: .userInitiated).async { let url = "\(snode.address):\(snode.port)/get_stats/v1" let timeout: TimeInterval = 3 // Use a shorter timeout for testing - HTTP.execute(.get, url, timeout: timeout).done(on: queue) { rawResponse in + HTTP.execute(.get, url, timeout: timeout).done2 { rawResponse in guard let json = rawResponse as? JSON, let version = json["version"] as? String else { return seal.reject(Error.missingSnodeVersion) } if version >= "2.0.0" { seal.fulfill(()) @@ -63,7 +60,7 @@ public enum OnionRequestAPI { print("[Loki] [Onion Request API] Unsupported snode version: \(version).") seal.reject(Error.unsupportedSnodeVersion(version)) } - }.catch(on: queue) { error in + }.catch2 { error in seal.reject(error) } } @@ -77,7 +74,7 @@ public enum OnionRequestAPI { return Promise> { $0.fulfill(guardSnodes) } } else { print("[Loki] [Onion Request API] Populating guard snode cache.") - return LokiAPI.getRandomSnode().then(on: LokiAPI.workQueue) { _ -> Promise> in // Just used to populate the snode pool + return LokiAPI.getRandomSnode().then2 { _ -> Promise> in // Just used to populate the snode pool var unusedSnodes = snodePool // Sync on LokiAPI.workQueue guard unusedSnodes.count >= guardSnodeCount else { throw Error.insufficientSnodes } func getGuardSnode() -> Promise { @@ -86,10 +83,10 @@ public enum OnionRequestAPI { unusedSnodes.remove(candidate) // All used snodes should be unique print("[Loki] [Onion Request API] Testing guard snode: \(candidate).") // Loop until a reliable guard snode is found - return testSnode(candidate).map(on: LokiAPI.workQueue) { candidate }.recover(on: LokiAPI.workQueue) { _ in getGuardSnode() } + return testSnode(candidate).map2 { candidate }.recover2 { _ in getGuardSnode() } } let promises = (0.. Promise<[Path]> in // Just used to populate the snode pool - return getGuardSnodes().map(on: LokiAPI.workQueue) { guardSnodes -> [Path] in + return LokiAPI.getRandomSnode().then2 { _ -> Promise<[Path]> in // Just used to populate the snode pool + return getGuardSnodes().map2 { guardSnodes -> [Path] in var unusedSnodes = snodePool.subtracting(guardSnodes) let pathSnodeCount = guardSnodeCount * pathSize - guardSnodeCount guard unusedSnodes.count >= pathSnodeCount else { throw Error.insufficientSnodes } @@ -121,7 +118,7 @@ public enum OnionRequestAPI { print("[Loki] [Onion Request API] Built new onion request path: \(result.prettifiedDescription).") return result } - }.map(on: LokiAPI.workQueue) { paths in + }.map2 { paths in OnionRequestAPI.paths = paths try! Storage.writeSync { transaction in print("[Loki] Persisting onion request paths to database.") @@ -155,7 +152,7 @@ public enum OnionRequestAPI { seal.fulfill(paths.filter { !$0.contains(snode) }.randomElement()!) } } else { - return buildPaths().map(on: LokiAPI.workQueue) { paths in + return buildPaths().map2 { paths in return paths.filter { !$0.contains(snode) }.randomElement()! } } @@ -177,10 +174,10 @@ public enum OnionRequestAPI { var guardSnode: LokiAPITarget! var targetSnodeSymmetricKey: Data! // Needed by invoke(_:on:with:) to decrypt the response sent back by the target snode var encryptionResult: EncryptionResult! - return getPath(excluding: snode).then(on: LokiAPI.workQueue) { path -> Promise in + return getPath(excluding: snode).then2 { path -> Promise in guardSnode = path.first! // Encrypt in reverse order, i.e. the target snode first - return encrypt(payload, forTargetSnode: snode).then(on: LokiAPI.workQueue) { r -> Promise in + return encrypt(payload, forTargetSnode: snode).then2 { r -> Promise in targetSnodeSymmetricKey = r.symmetricKey // Recursively encrypt the layers of the onion (again in reverse order) encryptionResult = r @@ -191,7 +188,7 @@ public enum OnionRequestAPI { return Promise { $0.fulfill(encryptionResult) } } else { let lhs = path.removeLast() - return OnionRequestAPI.encryptHop(from: lhs, to: rhs, using: encryptionResult).then(on: LokiAPI.workQueue) { r -> Promise in + return OnionRequestAPI.encryptHop(from: lhs, to: rhs, using: encryptionResult).then2 { r -> Promise in encryptionResult = r rhs = lhs return addLayer() @@ -200,7 +197,7 @@ public enum OnionRequestAPI { } return addLayer() } - }.map(on: LokiAPI.workQueue) { _ in (guardSnode, encryptionResult, targetSnodeSymmetricKey) } + }.map2 { _ in (guardSnode, encryptionResult, targetSnodeSymmetricKey) } } // MARK: Internal API @@ -208,9 +205,9 @@ public enum OnionRequestAPI { internal static func sendOnionRequest(invoking method: LokiAPITarget.Method, on snode: LokiAPITarget, with parameters: JSON, associatedWith hexEncodedPublicKey: String) -> Promise { let (promise, seal) = Promise.pending() var guardSnode: LokiAPITarget! - LokiAPI.workQueue.async { + DispatchQueue.global(qos: .userInitiated).async { let payload: JSON = [ "method" : method.rawValue, "params" : parameters ] - buildOnion(around: payload, targetedAt: snode).done(on: LokiAPI.workQueue) { intermediate in + buildOnion(around: payload, targetedAt: snode).done2 { intermediate in guardSnode = intermediate.guardSnode let url = "\(guardSnode.address):\(guardSnode.port)/onion_req" let finalEncryptionResult = intermediate.finalEncryptionResult @@ -220,7 +217,7 @@ public enum OnionRequestAPI { "ephemeral_key" : finalEncryptionResult.ephemeralPublicKey.toHexString() ] let targetSnodeSymmetricKey = intermediate.targetSnodeSymmetricKey - HTTP.execute(.post, url, parameters: parameters).done(on: LokiAPI.workQueue) { rawResponse in + HTTP.execute(.post, url, parameters: parameters).done2 { rawResponse in guard let json = rawResponse as? JSON, let base64EncodedIVAndCiphertext = json["result"] as? String, let ivAndCiphertext = Data(base64Encoded: base64EncodedIVAndCiphertext) else { return seal.reject(HTTP.Error.invalidJSON) } let iv = ivAndCiphertext[0.. Promise { - return recover(on: LokiAPI.errorHandlingQueue) { error -> Promise in // Must be invoked on LokiAPI.errorHandlingQueue + return recover2 { error -> Promise in // Must be invoked on LokiAPI.errorHandlingQueue // The code below is very similar to that in LokiAPI.handlingSnodeErrorsIfNeeded(for:associatedWith:), but unfortunately slightly // different due to the fact that OnionRequestAPI uses the newer HTTP API, whereas LokiAPI still uses TSNetworkManager guard case OnionRequestAPI.Error.httpRequestFailedAtTargetSnode(let statusCode, let json) = error else { throw error } diff --git a/SignalServiceKit/src/Loki/API/Open Groups/LokiPublicChatAPI.swift b/SignalServiceKit/src/Loki/API/Open Groups/LokiPublicChatAPI.swift index a6a7b8794..3cad1c4bf 100644 --- a/SignalServiceKit/src/Loki/API/Open Groups/LokiPublicChatAPI.swift +++ b/SignalServiceKit/src/Loki/API/Open Groups/LokiPublicChatAPI.swift @@ -86,11 +86,11 @@ public final class LokiPublicChatAPI : LokiDotNetAPI { } else { queryParameters += "&count=\(fallbackBatchCount)&include_deleted=0" } - return getAuthToken(for: server).then { token -> Promise<[LokiPublicChatMessage]> in + return getAuthToken(for: server).then2 { token -> Promise<[LokiPublicChatMessage]> in let url = URL(string: "\(server)/channels/\(channel)/messages?\(queryParameters)")! let request = TSRequest(url: url) request.allHTTPHeaderFields = [ "Content-Type" : "application/json", "Authorization" : "Bearer \(token)" ] - return LokiFileServerProxy(for: server).perform(request).map(on: DispatchQueue.global()) { rawResponse in + return LokiFileServerProxy(for: server).perform(request).map2 { rawResponse in guard let json = rawResponse as? JSON, let rawMessages = json["data"] as? [JSON] else { print("[Loki] Couldn't parse messages for public chat channel with ID: \(channel) on server: \(server) from: \(rawResponse).") throw LokiDotNetAPIError.parsingFailed @@ -172,17 +172,16 @@ public final class LokiPublicChatAPI : LokiDotNetAPI { public static func sendMessage(_ message: LokiPublicChatMessage, to channel: UInt64, on server: String) -> Promise { print("[Loki] Sending message to public chat channel with ID: \(channel) on server: \(server).") let (promise, seal) = Promise.pending() - let queue = DispatchQueue.global() - queue.async { [privateKey = userKeyPair.privateKey] in + DispatchQueue.global(qos: .userInitiated).async { [privateKey = userKeyPair.privateKey] in guard let signedMessage = message.sign(with: privateKey) else { return seal.reject(LokiDotNetAPIError.signingFailed) } - attempt(maxRetryCount: maxRetryCount, recoveringOn: queue) { - getAuthToken(for: server).then { token -> Promise in + attempt(maxRetryCount: maxRetryCount) { + getAuthToken(for: server).then2 { token -> Promise in let url = URL(string: "\(server)/channels/\(channel)/messages")! let parameters = signedMessage.toJSON() let request = TSRequest(url: url, method: "POST", parameters: parameters) request.allHTTPHeaderFields = [ "Content-Type" : "application/json", "Authorization" : "Bearer \(token)" ] let displayName = userDisplayName - return LokiFileServerProxy(for: server).perform(request).map { rawResponse in + return LokiFileServerProxy(for: server).perform(request).map2 { rawResponse in // ISO8601DateFormatter doesn't support milliseconds before iOS 11 let dateFormatter = DateFormatter() dateFormatter.dateFormat = "yyyy-MM-dd'T'HH:mm:ss.SSSZ" @@ -195,9 +194,9 @@ public final class LokiPublicChatAPI : LokiDotNetAPI { return LokiPublicChatMessage(serverID: serverID, hexEncodedPublicKey: getUserHexEncodedPublicKey(), displayName: displayName, profilePicture: signedMessage.profilePicture, body: body, type: publicChatMessageType, timestamp: timestamp, quote: signedMessage.quote, attachments: signedMessage.attachments, signature: signedMessage.signature) } }.handlingInvalidAuthTokenIfNeeded(for: server) - }.done { message in + }.done2 { message in seal.fulfill(message) - }.catch { error in + }.catch2 { error in seal.reject(error) } } @@ -213,11 +212,11 @@ public final class LokiPublicChatAPI : LokiDotNetAPI { } else { queryParameters = "count=\(fallbackBatchCount)" } - return getAuthToken(for: server).then { token -> Promise<[UInt64]> in + return getAuthToken(for: server).then2 { token -> Promise<[UInt64]> in let url = URL(string: "\(server)/loki/v1/channel/\(channel)/deletes?\(queryParameters)")! let request = TSRequest(url: url) request.allHTTPHeaderFields = [ "Content-Type" : "application/json", "Authorization" : "Bearer \(token)" ] - return LokiFileServerProxy(for: server).perform(request).map { rawResponse in + return LokiFileServerProxy(for: server).perform(request).map2 { rawResponse in guard let json = rawResponse as? JSON, let deletions = json["data"] as? [JSON] else { print("[Loki] Couldn't parse deleted messages for public chat channel with ID: \(channel) on server: \(server) from: \(rawResponse).") throw LokiDotNetAPIError.parsingFailed @@ -244,12 +243,12 @@ public final class LokiPublicChatAPI : LokiDotNetAPI { let isModerationRequest = !isSentByUser print("[Loki] Deleting message with ID: \(messageID) for public chat channel with ID: \(channel) on server: \(server) (isModerationRequest = \(isModerationRequest)).") let urlAsString = isSentByUser ? "\(server)/channels/\(channel)/messages/\(messageID)" : "\(server)/loki/v1/moderation/message/\(messageID)" - return attempt(maxRetryCount: maxRetryCount, recoveringOn: DispatchQueue.global()) { - getAuthToken(for: server).then { token -> Promise in + return attempt(maxRetryCount: maxRetryCount) { + getAuthToken(for: server).then2 { token -> Promise in let url = URL(string: urlAsString)! let request = TSRequest(url: url, method: "DELETE", parameters: [:]) request.allHTTPHeaderFields = [ "Content-Type" : "application/json", "Authorization" : "Bearer \(token)" ] - return LokiFileServerProxy(for: server).perform(request).done { result -> Void in + return LokiFileServerProxy(for: server).perform(request).done2 { result -> Void in print("[Loki] Deleted message with ID: \(messageID) on server: \(server).") } }.handlingInvalidAuthTokenIfNeeded(for: server) @@ -262,11 +261,11 @@ public final class LokiPublicChatAPI : LokiDotNetAPI { guard let hexEncodedPublicKeys = displayNameUpdatees[publicChatID] else { return Promise.value(()) } displayNameUpdatees[publicChatID] = [] print("[Loki] Getting display names for: \(hexEncodedPublicKeys).") - return getAuthToken(for: server).then { token -> Promise in + return getAuthToken(for: server).then2 { token -> Promise in let queryParameters = "ids=\(hexEncodedPublicKeys.map { "@\($0)" }.joined(separator: ","))&include_user_annotations=1" let url = URL(string: "\(server)/users?\(queryParameters)")! let request = TSRequest(url: url) - return LokiFileServerProxy(for: server).perform(request).map { rawResponse in + return LokiFileServerProxy(for: server).perform(request).map2 { rawResponse in guard let json = rawResponse as? JSON, let data = json["data"] as? [JSON] else { print("[Loki] Couldn't parse display names for users: \(hexEncodedPublicKeys) from: \(rawResponse).") throw LokiDotNetAPIError.parsingFailed @@ -292,12 +291,12 @@ public final class LokiPublicChatAPI : LokiDotNetAPI { public static func setDisplayName(to newDisplayName: String?, on server: String) -> Promise { print("[Loki] Updating display name on server: \(server).") let parameters: JSON = [ "name" : (newDisplayName ?? "") ] - return attempt(maxRetryCount: maxRetryCount, recoveringOn: DispatchQueue.global()) { - getAuthToken(for: server).then { token -> Promise in + return attempt(maxRetryCount: maxRetryCount) { + getAuthToken(for: server).then2 { token -> Promise in let url = URL(string: "\(server)/users/me")! let request = TSRequest(url: url, method: "PATCH", parameters: parameters) request.allHTTPHeaderFields = [ "Content-Type" : "application/json", "Authorization" : "Bearer \(token)" ] - return LokiFileServerProxy(for: server).perform(request).map { _ in }.recover { error in + return LokiFileServerProxy(for: server).perform(request).map2 { _ in }.recover2 { error in print("Couldn't update display name due to error: \(error).") throw error } @@ -317,12 +316,12 @@ public final class LokiPublicChatAPI : LokiDotNetAPI { annotation["value"] = [ "profileKey" : profileKey.base64EncodedString(), "url" : url ] } let parameters: JSON = [ "annotations" : [ annotation ] ] - return attempt(maxRetryCount: maxRetryCount, recoveringOn: DispatchQueue.global()) { - getAuthToken(for: server).then { token -> Promise in + return attempt(maxRetryCount: maxRetryCount) { + getAuthToken(for: server).then2 { token -> Promise in let url = URL(string: "\(server)/users/me")! let request = TSRequest(url: url, method: "PATCH", parameters: parameters) request.allHTTPHeaderFields = [ "Content-Type" : "application/json", "Authorization" : "Bearer \(token)" ] - return LokiFileServerProxy(for: server).perform(request).map { _ in }.recover { error in + return LokiFileServerProxy(for: server).perform(request).map2 { _ in }.recover2 { error in print("[Loki] Couldn't update profile picture due to error: \(error).") throw error } @@ -337,12 +336,12 @@ public final class LokiPublicChatAPI : LokiDotNetAPI { } public static func getInfo(for channel: UInt64, on server: String) -> Promise { - return attempt(maxRetryCount: maxRetryCount, recoveringOn: DispatchQueue.global()) { - getAuthToken(for: server).then { token -> Promise in + return attempt(maxRetryCount: maxRetryCount) { + getAuthToken(for: server).then2 { token -> Promise in let url = URL(string: "\(server)/channels/\(channel)?include_annotations=1")! let request = TSRequest(url: url) request.allHTTPHeaderFields = [ "Content-Type" : "application/json", "Authorization" : "Bearer \(token)" ] - return LokiFileServerProxy(for: server).perform(request).map { rawResponse in + return LokiFileServerProxy(for: server).perform(request).map2 { rawResponse in guard let json = rawResponse as? JSON, let data = json["data"] as? JSON, let annotations = data["annotations"] as? [JSON], @@ -366,12 +365,12 @@ public final class LokiPublicChatAPI : LokiDotNetAPI { } public static func join(_ channel: UInt64, on server: String) -> Promise { - return attempt(maxRetryCount: maxRetryCount, recoveringOn: DispatchQueue.global()) { - getAuthToken(for: server).then { token -> Promise in + return attempt(maxRetryCount: maxRetryCount) { + getAuthToken(for: server).then2 { token -> Promise in let url = URL(string: "\(server)/channels/\(channel)/subscribe")! let request = TSRequest(url: url, method: "POST", parameters: [:]) request.allHTTPHeaderFields = [ "Content-Type" : "application/json", "Authorization" : "Bearer \(token)" ] - return LokiFileServerProxy(for: server).perform(request).done { result -> Void in + return LokiFileServerProxy(for: server).perform(request).done2 { result -> Void in print("[Loki] Joined channel with ID: \(channel) on server: \(server).") } }.handlingInvalidAuthTokenIfNeeded(for: server) @@ -379,12 +378,12 @@ public final class LokiPublicChatAPI : LokiDotNetAPI { } public static func leave(_ channel: UInt64, on server: String) -> Promise { - return attempt(maxRetryCount: maxRetryCount, recoveringOn: DispatchQueue.global()) { - getAuthToken(for: server).then { token -> Promise in + return attempt(maxRetryCount: maxRetryCount) { + getAuthToken(for: server).then2 { token -> Promise in let url = URL(string: "\(server)/channels/\(channel)/subscribe")! let request = TSRequest(url: url, method: "DELETE", parameters: [:]) request.allHTTPHeaderFields = [ "Content-Type" : "application/json", "Authorization" : "Bearer \(token)" ] - return LokiFileServerProxy(for: server).perform(request).done { result -> Void in + return LokiFileServerProxy(for: server).perform(request).done2 { result -> Void in print("[Loki] Left channel with ID: \(channel) on server: \(server).") } }.handlingInvalidAuthTokenIfNeeded(for: server) @@ -401,16 +400,16 @@ public final class LokiPublicChatAPI : LokiDotNetAPI { let url = URL(string: "\(server)/loki/v1/channels/\(channel)/messages/\(messageID)/report")! let request = TSRequest(url: url, method: "POST", parameters: [:]) // Only used for the Loki Public Chat which doesn't require authentication - return LokiFileServerProxy(for: server).perform(request).map { _ in } + return LokiFileServerProxy(for: server).perform(request).map2 { _ in } } // MARK: Moderators public static func getModerators(for channel: UInt64, on server: String) -> Promise> { - return getAuthToken(for: server).then { token -> Promise> in + return getAuthToken(for: server).then2 { token -> Promise> in let url = URL(string: "\(server)/loki/v1/channel/\(channel)/get_moderators")! let request = TSRequest(url: url) request.allHTTPHeaderFields = [ "Content-Type" : "application/json", "Authorization" : "Bearer \(token)" ] - return LokiFileServerProxy(for: server).perform(request).map { rawResponse in + return LokiFileServerProxy(for: server).perform(request).map2 { rawResponse in guard let json = rawResponse as? JSON, let moderators = json["moderators"] as? [String] else { print("[Loki] Couldn't parse moderators for public chat channel with ID: \(channel) on server: \(server) from: \(rawResponse).") throw LokiDotNetAPIError.parsingFailed diff --git a/SignalServiceKit/src/Loki/API/Open Groups/LokiPublicChatManager.swift b/SignalServiceKit/src/Loki/API/Open Groups/LokiPublicChatManager.swift index 098f0853b..1c1d508ea 100644 --- a/SignalServiceKit/src/Loki/API/Open Groups/LokiPublicChatManager.swift +++ b/SignalServiceKit/src/Loki/API/Open Groups/LokiPublicChatManager.swift @@ -56,9 +56,9 @@ public final class LokiPublicChatManager : NSObject { return Promise(error: Error.chatCreationFailed) } } - return LokiPublicChatAPI.getAuthToken(for: server).then { token in + return LokiPublicChatAPI.getAuthToken(for: server).then2 { token in return LokiPublicChatAPI.getInfo(for: channel, on: server) - }.map { channelInfo -> LokiPublicChat in + }.map2 { channelInfo -> LokiPublicChat in guard let chat = self.addChat(server: server, channel: channel, name: channelInfo.displayName) else { throw Error.chatCreationFailed } return chat } diff --git a/SignalServiceKit/src/Loki/API/Open Groups/LokiPublicChatPoller.swift b/SignalServiceKit/src/Loki/API/Open Groups/LokiPublicChatPoller.swift index 3e977185f..6d9fc5906 100644 --- a/SignalServiceKit/src/Loki/API/Open Groups/LokiPublicChatPoller.swift +++ b/SignalServiceKit/src/Loki/API/Open Groups/LokiPublicChatPoller.swift @@ -54,7 +54,7 @@ public final class LokiPublicChatPoller : NSObject { public func pollForNewMessages() -> Promise { let publicChat = self.publicChat let userHexEncodedPublicKey = self.userHexEncodedPublicKey - return LokiPublicChatAPI.getMessages(for: publicChat.channel, on: publicChat.server).done(on: DispatchQueue.global()) { messages in + return LokiPublicChatAPI.getMessages(for: publicChat.channel, on: publicChat.server).done2 { messages in let uniqueHexEncodedPublicKeys = Set(messages.map { $0.hexEncodedPublicKey }) func proceed() { let storage = OWSPrimaryStorage.shared() @@ -188,12 +188,12 @@ public final class LokiPublicChatPoller : NSObject { return timeSinceLastUpdate > MultiDeviceProtocol.deviceLinkUpdateInterval } if !hexEncodedPublicKeysToUpdate.isEmpty { - LokiFileServerAPI.getDeviceLinks(associatedWith: hexEncodedPublicKeysToUpdate).done(on: DispatchQueue.global()) { _ in + LokiFileServerAPI.getDeviceLinks(associatedWith: hexEncodedPublicKeysToUpdate).done2 { _ in proceed() hexEncodedPublicKeysToUpdate.forEach { MultiDeviceProtocol.lastDeviceLinkUpdate[$0] = Date() // TODO: Doing this from a global queue seems a bit iffy } - }.catch(on: DispatchQueue.global()) { error in + }.catch2 { error in if (error as? LokiDotNetAPI.LokiDotNetAPIError) == LokiDotNetAPI.LokiDotNetAPIError.parsingFailed { // Don't immediately re-fetch in case of failure due to a parsing error hexEncodedPublicKeysToUpdate.forEach { @@ -203,7 +203,7 @@ public final class LokiPublicChatPoller : NSObject { proceed() } } else { - DispatchQueue.global().async { + DispatchQueue.global(qos: .userInitiated).async { proceed() } } @@ -212,7 +212,7 @@ public final class LokiPublicChatPoller : NSObject { private func pollForDeletedMessages() { let publicChat = self.publicChat - let _ = LokiPublicChatAPI.getDeletedMessageServerIDs(for: publicChat.channel, on: publicChat.server).done(on: DispatchQueue.global()) { deletedMessageServerIDs in + let _ = LokiPublicChatAPI.getDeletedMessageServerIDs(for: publicChat.channel, on: publicChat.server).done2 { deletedMessageServerIDs in try! Storage.writeSync { transaction in let deletedMessageIDs = deletedMessageServerIDs.compactMap { OWSPrimaryStorage.shared().getIDForMessage(withServerID: UInt($0), in: transaction) } deletedMessageIDs.forEach { messageID in diff --git a/SignalServiceKit/src/Loki/API/Shelved/LokiRSSFeedProxy.swift b/SignalServiceKit/src/Loki/API/Shelved/LokiRSSFeedProxy.swift index 3ff398758..69f06819a 100644 --- a/SignalServiceKit/src/Loki/API/Shelved/LokiRSSFeedProxy.swift +++ b/SignalServiceKit/src/Loki/API/Shelved/LokiRSSFeedProxy.swift @@ -18,7 +18,7 @@ public enum LokiRSSFeedProxy { let endpoint = endpoints.first { url.lowercased().contains($0.key) }!.value let url = URL(string: server + "/" + endpoint)! let request = TSRequest(url: url) - return LokiFileServerProxy(for: server).perform(request).map { response -> String in + return LokiFileServerProxy(for: server).perform(request).map2 { response -> String in guard let json = response as? JSON, let xml = json["data"] as? String else { throw Error.proxyResponseParsingFailed } return xml } diff --git a/SignalServiceKit/src/Loki/Protocol/Mentions/MentionsManager.swift b/SignalServiceKit/src/Loki/Protocol/Mentions/MentionsManager.swift index 0d7c28cd0..d5a9f3454 100644 --- a/SignalServiceKit/src/Loki/Protocol/Mentions/MentionsManager.swift +++ b/SignalServiceKit/src/Loki/Protocol/Mentions/MentionsManager.swift @@ -2,14 +2,8 @@ @objc(LKMentionsManager) public final class MentionsManager : NSObject { - private static var _userHexEncodedPublicKeyCache: [String:Set] = [:] /// A mapping from thread ID to set of user hex encoded public keys. - @objc public static var userPublicKeyCache: [String:Set] { - get { LokiAPI.stateQueue.sync { _userHexEncodedPublicKeyCache } } - set { LokiAPI.stateQueue.sync { _userHexEncodedPublicKeyCache = newValue } } - } - - // TODO: I don't think stateQueue actually helps avoid race conditions + @objc public static var userPublicKeyCache: [String:Set] = [:] internal static var storage: OWSPrimaryStorage { OWSPrimaryStorage.shared() } diff --git a/SignalServiceKit/src/Loki/Protocol/Multi Device/MultiDeviceProtocol.swift b/SignalServiceKit/src/Loki/Protocol/Multi Device/MultiDeviceProtocol.swift index 807a1446c..a768283db 100644 --- a/SignalServiceKit/src/Loki/Protocol/Multi Device/MultiDeviceProtocol.swift +++ b/SignalServiceKit/src/Loki/Protocol/Multi Device/MultiDeviceProtocol.swift @@ -15,12 +15,8 @@ import PromiseKit @objc(LKMultiDeviceProtocol) public final class MultiDeviceProtocol : NSObject { - private static var _lastDeviceLinkUpdate: [String:Date] = [:] /// A mapping from hex encoded public key to date updated. - public static var lastDeviceLinkUpdate: [String:Date] { - get { LokiAPI.stateQueue.sync { _lastDeviceLinkUpdate } } - set { LokiAPI.stateQueue.sync { _lastDeviceLinkUpdate = newValue } } - } + public static var lastDeviceLinkUpdate: [String:Date] = [:] // TODO: I don't think stateQueue actually helps avoid race conditions diff --git a/SignalServiceKit/src/Loki/Protocol/Session Management/LokiSessionResetImplementation.swift b/SignalServiceKit/src/Loki/Protocol/Session Management/LokiSessionResetImplementation.swift index acf6833b6..57d7414d7 100644 --- a/SignalServiceKit/src/Loki/Protocol/Session Management/LokiSessionResetImplementation.swift +++ b/SignalServiceKit/src/Loki/Protocol/Session Management/LokiSessionResetImplementation.swift @@ -22,7 +22,7 @@ public class LokiSessionResetImplementation : NSObject, SessionResetProtocol { guard let preKeyMessage = whisperMessage as? PreKeyWhisperMessage else { return } guard let storedPreKey = storage.getPreKeyRecord(forContact: recipientID, transaction: transaction) else { print("[Loki] Received a friend request accepted message from a public key for which no pre key bundle was created.") - return // FIXME: This is causing trouble when it shouldn't... + return } guard storedPreKey.id == preKeyMessage.prekeyID else { print("[Loki] Received a `PreKeyWhisperMessage` (friend request accepted message) from an unknown source.") diff --git a/SignalServiceKit/src/Loki/Protocol/Sync Messages/SyncMessagesProtocol.swift b/SignalServiceKit/src/Loki/Protocol/Sync Messages/SyncMessagesProtocol.swift index 5574c8463..29f7edc08 100644 --- a/SignalServiceKit/src/Loki/Protocol/Sync Messages/SyncMessagesProtocol.swift +++ b/SignalServiceKit/src/Loki/Protocol/Sync Messages/SyncMessagesProtocol.swift @@ -48,7 +48,7 @@ public final class SyncMessagesProtocol : NSObject { let friends = Set(hepks).map { SignalAccount(recipientId: $0) } let syncManager = SSKEnvironment.shared.syncManager let promises = friends.chunked(by: 3).map { friends -> Promise in // TODO: Does this always fit? - return Promise(syncManager.syncContacts(for: friends)).map { _ in } + return Promise(syncManager.syncContacts(for: friends)).map2 { _ in } } return when(fulfilled: promises) } @@ -67,7 +67,7 @@ public final class SyncMessagesProtocol : NSObject { } let syncManager = SSKEnvironment.shared.syncManager let promises = groups.map { group -> Promise in - return Promise(syncManager.syncGroup(for: group)).map { _ in } + return Promise(syncManager.syncGroup(for: group)).map2 { _ in } } return when(fulfilled: promises) } diff --git a/SignalServiceKit/src/Loki/Push Notifications/LokiPushNotificationManager.swift b/SignalServiceKit/src/Loki/Push Notifications/LokiPushNotificationManager.swift index cacff0001..2259dd98c 100644 --- a/SignalServiceKit/src/Loki/Push Notifications/LokiPushNotificationManager.swift +++ b/SignalServiceKit/src/Loki/Push Notifications/LokiPushNotificationManager.swift @@ -36,7 +36,7 @@ public final class LokiPushNotificationManager : NSObject { let url = URL(string: server + "register")! let request = TSRequest(url: url, method: "POST", parameters: parameters) request.allHTTPHeaderFields = [ "Content-Type" : "application/json" ] - let promise = TSNetworkManager.shared().makePromise(request: request).map { _, response in + let promise = TSNetworkManager.shared().makePromise(request: request).map2 { _, response in guard let json = response as? JSON else { return print("[Loki] Couldn't register device token.") } @@ -48,7 +48,7 @@ public final class LokiPushNotificationManager : NSObject { userDefaults[.isUsingFullAPNs] = false return } - promise.catch { error in + promise.catch2 { error in print("[Loki] Couldn't register device token.") } return promise @@ -77,7 +77,7 @@ public final class LokiPushNotificationManager : NSObject { let url = URL(string: server + "register")! let request = TSRequest(url: url, method: "POST", parameters: parameters) request.allHTTPHeaderFields = [ "Content-Type" : "application/json" ] - let promise = TSNetworkManager.shared().makePromise(request: request).map { _, response in + let promise = TSNetworkManager.shared().makePromise(request: request).map2 { _, response in guard let json = response as? JSON else { return print("[Loki] Couldn't register device token.") } @@ -89,7 +89,7 @@ public final class LokiPushNotificationManager : NSObject { userDefaults[.isUsingFullAPNs] = true return } - promise.catch { error in + promise.catch2 { error in print("[Loki] Couldn't register device token.") } return promise diff --git a/SignalServiceKit/src/Loki/Shelved/LokiP2PAPI.swift b/SignalServiceKit/src/Loki/Shelved/LokiP2PAPI.swift index 60a297b85..085123cae 100644 --- a/SignalServiceKit/src/Loki/Shelved/LokiP2PAPI.swift +++ b/SignalServiceKit/src/Loki/Shelved/LokiP2PAPI.swift @@ -204,7 +204,7 @@ public class LokiP2PAPI : NSObject { return } - messageSender.sendPromise(message: message).catch { error in + messageSender.sendPromise(message: message).catch2 { error in Logger.warn("Failed to send online status to \(thread.contactIdentifier()).") }.retainUntilComplete() } diff --git a/SignalServiceKit/src/Loki/Utilities/Promise+Retrying.swift b/SignalServiceKit/src/Loki/Utilities/Promise+Retrying.swift index 81895c044..b5c023487 100644 --- a/SignalServiceKit/src/Loki/Utilities/Promise+Retrying.swift +++ b/SignalServiceKit/src/Loki/Utilities/Promise+Retrying.swift @@ -3,7 +3,7 @@ import PromiseKit /// Retry the promise constructed in `body` up to `maxRetryCount` times. /// /// - Note: Intentionally explicit about the recovery queue at the call site. -internal func attempt(maxRetryCount: UInt, recoveringOn queue: DispatchQueue, body: @escaping () -> Promise) -> Promise { +internal func attempt(maxRetryCount: UInt, recoveringOn queue: DispatchQueue = .global(qos: .userInitiated), body: @escaping () -> Promise) -> Promise { var retryCount = 0 func attempt() -> Promise { return body().recover(on: queue) { error -> Promise in diff --git a/SignalServiceKit/src/Loki/Utilities/Promise+Threading.swift b/SignalServiceKit/src/Loki/Utilities/Promise+Threading.swift new file mode 100644 index 000000000..6daed1706 --- /dev/null +++ b/SignalServiceKit/src/Loki/Utilities/Promise+Threading.swift @@ -0,0 +1,76 @@ +import PromiseKit + +public extension Thenable { + + func then2(_ body: @escaping (T) throws -> U) -> Promise where U : Thenable { + return then(on: DispatchQueue.global(qos: .userInitiated), body) + } + + func map2(_ transform: @escaping (T) throws -> U) -> Promise { + return map(on: DispatchQueue.global(qos: .userInitiated), transform) + } + + func done2(_ body: @escaping (T) throws -> Void) -> Promise { + return done(on: DispatchQueue.global(qos: .userInitiated), body) + } + + func get2(_ body: @escaping (T) throws -> Void) -> Promise { + return get(on: DispatchQueue.global(qos: .userInitiated), body) + } +} + +public extension Thenable where T: Sequence { + + func mapValues2(_ transform: @escaping (T.Iterator.Element) throws -> U) -> Promise<[U]> { + return mapValues(on: DispatchQueue.global(qos: .userInitiated), transform) + } +} + +public extension Guarantee { + + func then2(_ body: @escaping (T) -> Guarantee) -> Guarantee { + return then(on: DispatchQueue.global(qos: .userInitiated), body) + } + + func map2(_ body: @escaping (T) -> U) -> Guarantee { + return map(on: DispatchQueue.global(qos: .userInitiated), body) + } + + func done2(_ body: @escaping (T) -> Void) -> Guarantee { + return done(on: DispatchQueue.global(qos: .userInitiated), body) + } + + func get2(_ body: @escaping (T) -> Void) -> Guarantee { + return get(on: DispatchQueue.global(qos: .userInitiated), body) + } +} + +public extension CatchMixin { + + func catch2(_ body: @escaping (Error) -> Void) -> PMKFinalizer { + return self.catch(on: DispatchQueue.global(qos: .userInitiated), body) + } + + func recover2(_ body: @escaping(Error) throws -> U) -> Promise where U.T == T { + return recover(on: DispatchQueue.global(qos: .userInitiated), body) + } + + func recover2(_ body: @escaping(Error) -> Guarantee) -> Guarantee { + return recover(on: DispatchQueue.global(qos: .userInitiated), body) + } + + func ensure2(_ body: @escaping () -> Void) -> Promise { + return ensure(on: DispatchQueue.global(qos: .userInitiated), body) + } +} + +public extension CatchMixin where T == Void { + + func recover2(_ body: @escaping(Error) -> Void) -> Guarantee { + return recover(on: DispatchQueue.global(qos: .userInitiated), body) + } + + func recover2(_ body: @escaping(Error) throws -> Void) -> Promise { + return recover(on: DispatchQueue.global(qos: .userInitiated), body) + } +}