diff --git a/SignalServiceKit/src/Loki/API/LokiAPI+SwarmAPI.swift b/SignalServiceKit/src/Loki/API/LokiAPI+SwarmAPI.swift index bf5632430..f04afdbc3 100644 --- a/SignalServiceKit/src/Loki/API/LokiAPI+SwarmAPI.swift +++ b/SignalServiceKit/src/Loki/API/LokiAPI+SwarmAPI.swift @@ -55,21 +55,22 @@ public extension LokiAPI { ] print("[Loki] Populating snode pool using: \(target).") let (promise, seal) = Promise.pending() - let queue = workQueue - HTTP.execute(.post, url, parameters: parameters).map(on: queue) { json in - guard let intermediate = json["result"] as? JSON, let rawTargets = intermediate["service_node_states"] as? [JSON] else { throw LokiAPIError.randomSnodePoolUpdatingFailed } - randomSnodePool = try Set(rawTargets.flatMap { rawTarget in - guard let address = rawTarget["public_ip"] as? String, let port = rawTarget["storage_port"] as? Int, let ed25519PublicKey = rawTarget["pubkey_ed25519"] as? String, let x25519PublicKey = rawTarget["pubkey_x25519"] as? String, address != "0.0.0.0" else { - print("[Loki] Failed to parse target from: \(rawTarget).") - return nil - } - return LokiAPITarget(address: "https://\(address)", port: UInt16(port), publicKeySet: LokiAPITarget.KeySet(ed25519Key: ed25519PublicKey, x25519Key: x25519PublicKey)) - }) - // randomElement() uses the system's default random generator, which is cryptographically secure - return randomSnodePool.randomElement()! - }.retryingIfNeeded(maxRetryCount: 4).done(on: queue) { snode in + attempt(maxRetryCount: 4, recoveringOn: workQueue) { + HTTP.execute(.post, url, parameters: parameters).map(on: workQueue) { json in + guard let intermediate = json["result"] as? JSON, let rawTargets = intermediate["service_node_states"] as? [JSON] else { throw LokiAPIError.randomSnodePoolUpdatingFailed } + randomSnodePool = try Set(rawTargets.flatMap { rawTarget in + guard let address = rawTarget["public_ip"] as? String, let port = rawTarget["storage_port"] as? Int, let ed25519PublicKey = rawTarget["pubkey_ed25519"] as? String, let x25519PublicKey = rawTarget["pubkey_x25519"] as? String, address != "0.0.0.0" else { + print("[Loki] Failed to parse target from: \(rawTarget).") + return nil + } + return LokiAPITarget(address: "https://\(address)", port: UInt16(port), publicKeySet: LokiAPITarget.KeySet(ed25519Key: ed25519PublicKey, x25519Key: x25519PublicKey)) + }) + // randomElement() uses the system's default random generator, which is cryptographically secure + return randomSnodePool.randomElement()! + } + }.done(on: workQueue) { snode in seal.fulfill(snode) - }.catch(on: queue) { error in + }.catch(on: workQueue) { error in print("[Loki] Failed to contact seed node at: \(target).") seal.reject(error) } diff --git a/SignalServiceKit/src/Loki/API/LokiAPI.swift b/SignalServiceKit/src/Loki/API/LokiAPI.swift index 9d43de67f..ce0cdeba9 100644 --- a/SignalServiceKit/src/Loki/API/LokiAPI.swift +++ b/SignalServiceKit/src/Loki/API/LokiAPI.swift @@ -122,9 +122,11 @@ public final class LokiAPI : NSObject { // MARK: Public API public static func getMessages() -> Promise> { - return getTargetSnodes(for: userHexEncodedPublicKey).mapValues { targetSnode in - return getRawMessages(from: targetSnode, usingLongPolling: false).map { parseRawMessagesResponse($0, from: targetSnode) } - }.map { Set($0) }.retryingIfNeeded(maxRetryCount: maxRetryCount) + return attempt(maxRetryCount: maxRetryCount, recoveringOn: workQueue) { + getTargetSnodes(for: userHexEncodedPublicKey).mapValues { targetSnode in + getRawMessages(from: targetSnode, usingLongPolling: false).map { parseRawMessagesResponse($0, from: targetSnode) } + }.map { Set($0) } + } } public static func getDestinations(for hexEncodedPublicKey: String) -> Promise<[Destination]> { @@ -189,16 +191,18 @@ public final class LokiAPI : NSObject { let destination = lokiMessage.destination func sendLokiMessage(_ lokiMessage: LokiMessage, to target: LokiAPITarget) -> RawResponsePromise { let parameters = lokiMessage.toJSON() - return invoke(.sendMessage, on: target, associatedWith: destination, parameters: parameters) + return attempt(maxRetryCount: maxRetryCount, recoveringOn: workQueue) { + invoke(.sendMessage, on: target, associatedWith: destination, parameters: parameters) + } } func sendLokiMessageUsingSwarmAPI() -> Promise> { notificationCenter.post(name: .calculatingPoW, object: NSNumber(value: signalMessage.timestamp)) return lokiMessage.calculatePoW().then { lokiMessageWithPoW -> Promise> in notificationCenter.post(name: .routing, object: NSNumber(value: signalMessage.timestamp)) - return getTargetSnodes(for: destination).map { swarm in - return Set(swarm.map { target in + return getTargetSnodes(for: destination).map { snodes in + return Set(snodes.map { snode in notificationCenter.post(name: .messageSending, object: NSNumber(value: signalMessage.timestamp)) - return sendLokiMessage(lokiMessageWithPoW, to: target).map { rawResponse in + return sendLokiMessage(lokiMessageWithPoW, to: snode).map { rawResponse in if let json = rawResponse as? JSON, let powDifficulty = json["difficulty"] as? Int { guard powDifficulty != LokiAPI.powDifficulty else { return rawResponse } print("[Loki] Setting proof of work difficulty to \(powDifficulty).") @@ -207,14 +211,15 @@ public final class LokiAPI : NSObject { print("[Loki] Failed to update proof of work difficulty from: \(rawResponse).") } return rawResponse - }.retryingIfNeeded(maxRetryCount: maxRetryCount) + } }) - }.retryingIfNeeded(maxRetryCount: maxRetryCount) + } } } if let peer = LokiP2PAPI.getInfo(for: destination), (lokiMessage.isPing || peer.isOnline) { let target = LokiAPITarget(address: peer.address, port: peer.port, publicKeySet: nil) - return Promise.value([ target ]).mapValues { sendLokiMessage(lokiMessage, to: $0) }.map { Set($0) }.retryingIfNeeded(maxRetryCount: maxRetryCount).get { _ in + // TODO: Retrying + return Promise.value([ target ]).mapValues { sendLokiMessage(lokiMessage, to: $0) }.map { Set($0) }.get { _ in LokiP2PAPI.markOnline(destination) onP2PSuccess() }.recover { error -> Promise> in diff --git a/SignalServiceKit/src/Loki/API/LokiFileServerAPI.swift b/SignalServiceKit/src/Loki/API/LokiFileServerAPI.swift index f3492e949..1a8052e98 100644 --- a/SignalServiceKit/src/Loki/API/LokiFileServerAPI.swift +++ b/SignalServiceKit/src/Loki/API/LokiFileServerAPI.swift @@ -101,7 +101,9 @@ public final class LokiFileServerAPI : LokiDotNetAPI { let url = URL(string: "\(server)/users/me")! let request = TSRequest(url: url, method: "PATCH", parameters: parameters) request.allHTTPHeaderFields = [ "Content-Type" : "application/json", "Authorization" : "Bearer \(token)" ] - return LokiFileServerProxy(for: server).perform(request).map { _ in }.retryingIfNeeded(maxRetryCount: 8).recover { error in + return attempt(maxRetryCount: 8, recoveringOn: LokiAPI.workQueue) { + LokiFileServerProxy(for: server).perform(request).map { _ in } + }.recover { error in print("Couldn't update device links due to error: \(error).") throw error } diff --git a/SignalServiceKit/src/Loki/API/Public Chats/LokiPublicChatAPI.swift b/SignalServiceKit/src/Loki/API/Public Chats/LokiPublicChatAPI.swift index 77480decc..bea572af7 100644 --- a/SignalServiceKit/src/Loki/API/Public Chats/LokiPublicChatAPI.swift +++ b/SignalServiceKit/src/Loki/API/Public Chats/LokiPublicChatAPI.swift @@ -155,11 +155,13 @@ public final class LokiPublicChatAPI : LokiDotNetAPI { } public static func sendMessage(_ message: LokiPublicChatMessage, to channel: UInt64, on server: String) -> Promise { - return Promise { [privateKey = userKeyPair.privateKey] seal in - DispatchQueue.global().async { - guard let signedMessage = message.sign(with: privateKey) else { return seal.reject(LokiDotNetAPIError.signingFailed) } + print("[Loki] Sending message to public chat channel with ID: \(channel) on server: \(server).") + let (promise, seal) = Promise.pending() + let queue = DispatchQueue.global() + queue.async { [privateKey = userKeyPair.privateKey] in + guard let signedMessage = message.sign(with: privateKey) else { return seal.reject(LokiDotNetAPIError.signingFailed) } + attempt(maxRetryCount: maxRetryCount, recoveringOn: queue) { getAuthToken(for: server).then { token -> Promise in - print("[Loki] Sending message to public chat channel with ID: \(channel) on server: \(server).") let url = URL(string: "\(server)/channels/\(channel)/messages")! let parameters = signedMessage.toJSON() let request = TSRequest(url: url, method: "POST", parameters: parameters) @@ -183,13 +185,14 @@ public final class LokiPublicChatAPI : LokiDotNetAPI { storage.dbReadWriteConnection.removeObject(forKey: server, inCollection: authTokenCollection) } throw error - }.retryingIfNeeded(maxRetryCount: maxRetryCount).done { message in - seal.fulfill(message) - }.catch { error in - seal.reject(error) } + }.done { message in + seal.fulfill(message) + }.catch { error in + seal.reject(error) } } + return promise } public static func getDeletedMessageServerIDs(for channel: UInt64, on server: String) -> Promise<[UInt64]> { @@ -220,16 +223,18 @@ public final class LokiPublicChatAPI : LokiDotNetAPI { } public static func deleteMessage(with messageID: UInt, for channel: UInt64, on server: String, isSentByUser: Bool) -> Promise { - return getAuthToken(for: server).then { token -> Promise in - let isModerationRequest = !isSentByUser - print("[Loki] Deleting message with ID: \(messageID) for public chat channel with ID: \(channel) on server: \(server) (isModerationRequest = \(isModerationRequest)).") - let urlAsString = isSentByUser ? "\(server)/channels/\(channel)/messages/\(messageID)" : "\(server)/loki/v1/moderation/message/\(messageID)" - let url = URL(string: urlAsString)! - let request = TSRequest(url: url, method: "DELETE", parameters: [:]) - request.allHTTPHeaderFields = [ "Content-Type" : "application/json", "Authorization" : "Bearer \(token)" ] - return LokiFileServerProxy(for: server).perform(request).done { result -> Void in - print("[Loki] Deleted message with ID: \(messageID) on server: \(server).") - }.retryingIfNeeded(maxRetryCount: maxRetryCount) + return attempt(maxRetryCount: maxRetryCount, recoveringOn: DispatchQueue.global()) { + getAuthToken(for: server).then { token -> Promise in + let isModerationRequest = !isSentByUser + print("[Loki] Deleting message with ID: \(messageID) for public chat channel with ID: \(channel) on server: \(server) (isModerationRequest = \(isModerationRequest)).") + let urlAsString = isSentByUser ? "\(server)/channels/\(channel)/messages/\(messageID)" : "\(server)/loki/v1/moderation/message/\(messageID)" + let url = URL(string: urlAsString)! + let request = TSRequest(url: url, method: "DELETE", parameters: [:]) + request.allHTTPHeaderFields = [ "Content-Type" : "application/json", "Authorization" : "Bearer \(token)" ] + return LokiFileServerProxy(for: server).perform(request).done { result -> Void in + print("[Loki] Deleted message with ID: \(messageID) on server: \(server).") + } + } } } @@ -252,24 +257,28 @@ public final class LokiPublicChatAPI : LokiDotNetAPI { } public static func join(_ channel: UInt64, on server: String) -> Promise { - return getAuthToken(for: server).then { token -> Promise in - let url = URL(string: "\(server)/channels/\(channel)/subscribe")! - let request = TSRequest(url: url, method: "POST", parameters: [:]) - request.allHTTPHeaderFields = [ "Content-Type" : "application/json", "Authorization" : "Bearer \(token)" ] - return LokiFileServerProxy(for: server).perform(request).done { result -> Void in - print("[Loki] Joined channel with ID: \(channel) on server: \(server).") - }.retryingIfNeeded(maxRetryCount: maxRetryCount) + return attempt(maxRetryCount: maxRetryCount, recoveringOn: DispatchQueue.global()) { + getAuthToken(for: server).then { token -> Promise in + let url = URL(string: "\(server)/channels/\(channel)/subscribe")! + let request = TSRequest(url: url, method: "POST", parameters: [:]) + request.allHTTPHeaderFields = [ "Content-Type" : "application/json", "Authorization" : "Bearer \(token)" ] + return LokiFileServerProxy(for: server).perform(request).done { result -> Void in + print("[Loki] Joined channel with ID: \(channel) on server: \(server).") + } + } } } public static func leave(_ channel: UInt64, on server: String) -> Promise { - return getAuthToken(for: server).then { token -> Promise in - let url = URL(string: "\(server)/channels/\(channel)/subscribe")! - let request = TSRequest(url: url, method: "DELETE", parameters: [:]) - request.allHTTPHeaderFields = [ "Content-Type" : "application/json", "Authorization" : "Bearer \(token)" ] - return LokiFileServerProxy(for: server).perform(request).done { result -> Void in - print("[Loki] Left channel with ID: \(channel) on server: \(server).") - }.retryingIfNeeded(maxRetryCount: maxRetryCount) + return attempt(maxRetryCount: maxRetryCount, recoveringOn: DispatchQueue.global()) { + getAuthToken(for: server).then { token -> Promise in + let url = URL(string: "\(server)/channels/\(channel)/subscribe")! + let request = TSRequest(url: url, method: "DELETE", parameters: [:]) + request.allHTTPHeaderFields = [ "Content-Type" : "application/json", "Authorization" : "Bearer \(token)" ] + return LokiFileServerProxy(for: server).perform(request).done { result -> Void in + print("[Loki] Left channel with ID: \(channel) on server: \(server).") + } + } } } @@ -328,34 +337,38 @@ public final class LokiPublicChatAPI : LokiDotNetAPI { public static func setDisplayName(to newDisplayName: String?, on server: String) -> Promise { print("[Loki] Updating display name on server: \(server).") - return getAuthToken(for: server).then { token -> Promise in - let parameters: JSON = [ "name" : (newDisplayName ?? "") ] - let url = URL(string: "\(server)/users/me")! - let request = TSRequest(url: url, method: "PATCH", parameters: parameters) - request.allHTTPHeaderFields = [ "Content-Type" : "application/json", "Authorization" : "Bearer \(token)" ] - return LokiFileServerProxy(for: server).perform(request).map { _ in }.recover { error in - print("Couldn't update display name due to error: \(error).") - throw error + return attempt(maxRetryCount: maxRetryCount, recoveringOn: DispatchQueue.global()) { + getAuthToken(for: server).then { token -> Promise in + let parameters: JSON = [ "name" : (newDisplayName ?? "") ] + let url = URL(string: "\(server)/users/me")! + let request = TSRequest(url: url, method: "PATCH", parameters: parameters) + request.allHTTPHeaderFields = [ "Content-Type" : "application/json", "Authorization" : "Bearer \(token)" ] + return LokiFileServerProxy(for: server).perform(request).map { _ in }.recover { error in + print("Couldn't update display name due to error: \(error).") + throw error + } } - }.retryingIfNeeded(maxRetryCount: maxRetryCount) + } } public static func setProfilePictureURL(to url: String?, using profileKey: Data, on server: String) -> Promise { print("[Loki] Updating profile picture on server: \(server).") - return getAuthToken(for: server).then { token -> Promise in - var annotation: JSON = [ "type" : profilePictureType ] - if let url = url { - annotation["value"] = [ "profileKey" : profileKey.base64EncodedString(), "url" : url ] - } - let parameters: JSON = [ "annotations" : [ annotation ] ] - let url = URL(string: "\(server)/users/me")! - let request = TSRequest(url: url, method: "PATCH", parameters: parameters) - request.allHTTPHeaderFields = [ "Content-Type" : "application/json", "Authorization" : "Bearer \(token)" ] - return LokiFileServerProxy(for: server).perform(request).map { _ in }.recover { error in - print("[Loki] Couldn't update profile picture due to error: \(error).") - throw error + return attempt(maxRetryCount: maxRetryCount, recoveringOn: DispatchQueue.global()) { + getAuthToken(for: server).then { token -> Promise in + var annotation: JSON = [ "type" : profilePictureType ] + if let url = url { + annotation["value"] = [ "profileKey" : profileKey.base64EncodedString(), "url" : url ] + } + let parameters: JSON = [ "annotations" : [ annotation ] ] + let url = URL(string: "\(server)/users/me")! + let request = TSRequest(url: url, method: "PATCH", parameters: parameters) + request.allHTTPHeaderFields = [ "Content-Type" : "application/json", "Authorization" : "Bearer \(token)" ] + return LokiFileServerProxy(for: server).perform(request).map { _ in }.recover { error in + print("[Loki] Couldn't update profile picture due to error: \(error).") + throw error + } } - }.retryingIfNeeded(maxRetryCount: maxRetryCount) + } } public static func getInfo(for channel: UInt64, on server: String) -> Promise { diff --git a/SignalServiceKit/src/Loki/Utilities/Promise+Retrying.swift b/SignalServiceKit/src/Loki/Utilities/Promise+Retrying.swift index b23381141..81895c044 100644 --- a/SignalServiceKit/src/Loki/Utilities/Promise+Retrying.swift +++ b/SignalServiceKit/src/Loki/Utilities/Promise+Retrying.swift @@ -1,16 +1,16 @@ import PromiseKit -internal extension Promise { - - internal func retryingIfNeeded(maxRetryCount: UInt) -> Promise { - var retryCount = 0 - func retryIfNeeded() -> Promise { - return recover(on: DispatchQueue.global()) { error -> Promise in - guard retryCount != maxRetryCount else { throw error } - retryCount += 1 - return retryIfNeeded() - } +/// Retry the promise constructed in `body` up to `maxRetryCount` times. +/// +/// - Note: Intentionally explicit about the recovery queue at the call site. +internal func attempt(maxRetryCount: UInt, recoveringOn queue: DispatchQueue, body: @escaping () -> Promise) -> Promise { + var retryCount = 0 + func attempt() -> Promise { + return body().recover(on: queue) { error -> Promise in + guard retryCount < maxRetryCount else { throw error } + retryCount += 1 + return attempt() } - return retryIfNeeded() } + return attempt() }