diff --git a/SignalServiceKit/src/Loki/API/LokiAPI+SwarmAPI.swift b/SignalServiceKit/src/Loki/API/LokiAPI+SwarmAPI.swift index 4b4018172..0e1df6849 100644 --- a/SignalServiceKit/src/Loki/API/LokiAPI+SwarmAPI.swift +++ b/SignalServiceKit/src/Loki/API/LokiAPI+SwarmAPI.swift @@ -7,7 +7,7 @@ extension LokiAPI { private static let defaultSnodePort: UInt16 = 8080 // MARK: Caching - private static var swarmCache: [String:[Target]] = [:] + fileprivate static var swarmCache: [String:[Target]] = [:] // TODO: Persist on disk // MARK: Internal API private static func getRandomSnode() -> Promise { @@ -21,7 +21,7 @@ extension LokiAPI { return Promise<[Target]> { $0.fulfill(cachedSwarm) } } else { let parameters: [String:Any] = [ "pubKey" : hexEncodedPublicKey ] - return getRandomSnode().then { invoke(.getSwarm, on: $0, with: parameters) }.map { parseTargets(from: $0) }.get { swarmCache[hexEncodedPublicKey] = $0 } + return getRandomSnode().then { invoke(.getSwarm, on: $0, associatedWith: hexEncodedPublicKey, parameters: parameters) }.map { parseTargets(from: $0) }.get { swarmCache[hexEncodedPublicKey] = $0 } } } @@ -44,3 +44,27 @@ extension LokiAPI { // return addresses.map { Target(address: $0, port: defaultSnodePort) } } } + +internal extension Promise { + + func handlingSwarmSpecificErrorsIfNeeded(for target: LokiAPI.Target, associatedWith hexEncodedPublicKey: String) -> Promise { + return recover { error -> Promise in + if let error = error as? NetworkManagerError { + switch error.statusCode { + case 0: + // The snode is unreachable; usually a problem with LokiNet + Logger.warn("[Loki] There appears to be a problem with LokiNet.") + case 421: + // The snode isn't associated with the given public key anymore + let swarm = LokiAPI.swarmCache[hexEncodedPublicKey] + if var swarm = swarm, let index = swarm.firstIndex(of: target) { + swarm.remove(at: index) + LokiAPI.swarmCache[hexEncodedPublicKey] = swarm + } + default: break + } + } + throw error + } + } +} diff --git a/SignalServiceKit/src/Loki/API/LokiAPI.swift b/SignalServiceKit/src/Loki/API/LokiAPI.swift index 6d0769096..04dd160b9 100644 --- a/SignalServiceKit/src/Loki/API/LokiAPI.swift +++ b/SignalServiceKit/src/Loki/API/LokiAPI.swift @@ -13,9 +13,9 @@ import PromiseKit let port: UInt16 enum Method : String { - /// Only applicable to snode targets. + /// Only supported by snode targets. case getSwarm = "get_snodes_for_pubkey" - /// Only applicable to snode targets. + /// Only supported by snode targets. case getMessages = "retrieve" case sendMessage = "store" } @@ -38,10 +38,10 @@ import PromiseKit override private init() { } // MARK: Internal API - internal static func invoke(_ method: Target.Method, on target: Target, with parameters: [String:Any] = [:]) -> Promise { + internal static func invoke(_ method: Target.Method, on target: Target, associatedWith hexEncodedPublicKey: String, parameters: [String:Any] = [:]) -> Promise { let url = URL(string: "\(target.address):\(target.port)/\(version)/storage_rpc")! let request = TSRequest(url: url, method: "POST", parameters: [ "method" : method.rawValue, "params" : parameters ]) - return TSNetworkManager.shared().makePromise(request: request).map { $0.responseObject } + return TSNetworkManager.shared().makePromise(request: request).map { $0.responseObject }.handlingSwarmSpecificErrorsIfNeeded(for: target, associatedWith: hexEncodedPublicKey) } // MARK: Public API @@ -50,7 +50,7 @@ import PromiseKit return getTargetSnodes(for: hexEncodedPublicKey).mapValues { targetSnode in let lastHash = getLastMessageHashValue(for: targetSnode) ?? "" let parameters: [String:Any] = [ "pubKey" : hexEncodedPublicKey, "lastHash" : lastHash ] - return invoke(.getMessages, on: targetSnode, with: parameters).map { rawResponse in + return invoke(.getMessages, on: targetSnode, associatedWith: hexEncodedPublicKey, parameters: parameters).map { rawResponse in guard let json = rawResponse as? JSON, let rawMessages = json["messages"] as? [JSON] else { return [] } updateLastMessageHashValueIfPossible(for: targetSnode, from: rawMessages) let newRawMessages = removeDuplicates(from: rawMessages) @@ -70,7 +70,7 @@ import PromiseKit // TODO: Send using P2P protocol } else { let parameters = lokiMessage.toJSON() - return getTargetSnodes(for: lokiMessage.destination).mapValues { invoke(.sendMessage, on: $0, with: parameters).recoverNetworkErrorIfNeeded(on: DispatchQueue.global()) }.map { Set($0) } + return getTargetSnodes(for: lokiMessage.destination).mapValues { invoke(.sendMessage, on: $0, associatedWith: lokiMessage.destination, parameters: parameters) }.map { Set($0) } } } @@ -80,7 +80,7 @@ import PromiseKit // TODO: Send using P2P protocol } else { let parameters: [String:Any] = [ "pubKey" : hexEncodedPublicKey ] // TODO: Figure out correct parameters - return getTargetSnodes(for: hexEncodedPublicKey).mapValues { invoke(.sendMessage, on: $0, with: parameters).recoverNetworkErrorIfNeeded(on: DispatchQueue.global()) }.map { Set($0) } + return getTargetSnodes(for: hexEncodedPublicKey).mapValues { invoke(.sendMessage, on: $0, associatedWith: hexEncodedPublicKey, parameters: parameters) }.map { Set($0) } } } @@ -169,16 +169,3 @@ private extension AnyPromise { return result } } - -// MARK: Error Handling -private extension Promise { - - func recoverNetworkErrorIfNeeded(on queue: DispatchQueue) -> Promise { - return recover(on: queue) { error -> Promise in - switch error { - case NetworkManagerError.taskError(_, let underlyingError): throw underlyingError - default: throw error - } - } - } -}