Fix retrying

pull/156/head
nielsandriesse 5 years ago
parent 89c0cd36ad
commit 6aa360da71

@ -55,21 +55,22 @@ public extension LokiAPI {
] ]
print("[Loki] Populating snode pool using: \(target).") print("[Loki] Populating snode pool using: \(target).")
let (promise, seal) = Promise<LokiAPITarget>.pending() let (promise, seal) = Promise<LokiAPITarget>.pending()
let queue = workQueue attempt(maxRetryCount: 4, recoveringOn: workQueue) {
HTTP.execute(.post, url, parameters: parameters).map(on: queue) { json in HTTP.execute(.post, url, parameters: parameters).map(on: workQueue) { json in
guard let intermediate = json["result"] as? JSON, let rawTargets = intermediate["service_node_states"] as? [JSON] else { throw LokiAPIError.randomSnodePoolUpdatingFailed } guard let intermediate = json["result"] as? JSON, let rawTargets = intermediate["service_node_states"] as? [JSON] else { throw LokiAPIError.randomSnodePoolUpdatingFailed }
randomSnodePool = try Set(rawTargets.flatMap { rawTarget in randomSnodePool = try Set(rawTargets.flatMap { rawTarget in
guard let address = rawTarget["public_ip"] as? String, let port = rawTarget["storage_port"] as? Int, let ed25519PublicKey = rawTarget["pubkey_ed25519"] as? String, let x25519PublicKey = rawTarget["pubkey_x25519"] as? String, address != "0.0.0.0" else { guard let address = rawTarget["public_ip"] as? String, let port = rawTarget["storage_port"] as? Int, let ed25519PublicKey = rawTarget["pubkey_ed25519"] as? String, let x25519PublicKey = rawTarget["pubkey_x25519"] as? String, address != "0.0.0.0" else {
print("[Loki] Failed to parse target from: \(rawTarget).") print("[Loki] Failed to parse target from: \(rawTarget).")
return nil return nil
} }
return LokiAPITarget(address: "https://\(address)", port: UInt16(port), publicKeySet: LokiAPITarget.KeySet(ed25519Key: ed25519PublicKey, x25519Key: x25519PublicKey)) return LokiAPITarget(address: "https://\(address)", port: UInt16(port), publicKeySet: LokiAPITarget.KeySet(ed25519Key: ed25519PublicKey, x25519Key: x25519PublicKey))
}) })
// randomElement() uses the system's default random generator, which is cryptographically secure // randomElement() uses the system's default random generator, which is cryptographically secure
return randomSnodePool.randomElement()! return randomSnodePool.randomElement()!
}.retryingIfNeeded(maxRetryCount: 4).done(on: queue) { snode in }
}.done(on: workQueue) { snode in
seal.fulfill(snode) seal.fulfill(snode)
}.catch(on: queue) { error in }.catch(on: workQueue) { error in
print("[Loki] Failed to contact seed node at: \(target).") print("[Loki] Failed to contact seed node at: \(target).")
seal.reject(error) seal.reject(error)
} }

@ -122,9 +122,11 @@ public final class LokiAPI : NSObject {
// MARK: Public API // MARK: Public API
public static func getMessages() -> Promise<Set<MessageListPromise>> { public static func getMessages() -> Promise<Set<MessageListPromise>> {
return getTargetSnodes(for: userHexEncodedPublicKey).mapValues { targetSnode in return attempt(maxRetryCount: maxRetryCount, recoveringOn: workQueue) {
return getRawMessages(from: targetSnode, usingLongPolling: false).map { parseRawMessagesResponse($0, from: targetSnode) } getTargetSnodes(for: userHexEncodedPublicKey).mapValues { targetSnode in
}.map { Set($0) }.retryingIfNeeded(maxRetryCount: maxRetryCount) getRawMessages(from: targetSnode, usingLongPolling: false).map { parseRawMessagesResponse($0, from: targetSnode) }
}.map { Set($0) }
}
} }
public static func getDestinations(for hexEncodedPublicKey: String) -> Promise<[Destination]> { public static func getDestinations(for hexEncodedPublicKey: String) -> Promise<[Destination]> {
@ -189,16 +191,18 @@ public final class LokiAPI : NSObject {
let destination = lokiMessage.destination let destination = lokiMessage.destination
func sendLokiMessage(_ lokiMessage: LokiMessage, to target: LokiAPITarget) -> RawResponsePromise { func sendLokiMessage(_ lokiMessage: LokiMessage, to target: LokiAPITarget) -> RawResponsePromise {
let parameters = lokiMessage.toJSON() let parameters = lokiMessage.toJSON()
return invoke(.sendMessage, on: target, associatedWith: destination, parameters: parameters) return attempt(maxRetryCount: maxRetryCount, recoveringOn: workQueue) {
invoke(.sendMessage, on: target, associatedWith: destination, parameters: parameters)
}
} }
func sendLokiMessageUsingSwarmAPI() -> Promise<Set<RawResponsePromise>> { func sendLokiMessageUsingSwarmAPI() -> Promise<Set<RawResponsePromise>> {
notificationCenter.post(name: .calculatingPoW, object: NSNumber(value: signalMessage.timestamp)) notificationCenter.post(name: .calculatingPoW, object: NSNumber(value: signalMessage.timestamp))
return lokiMessage.calculatePoW().then { lokiMessageWithPoW -> Promise<Set<RawResponsePromise>> in return lokiMessage.calculatePoW().then { lokiMessageWithPoW -> Promise<Set<RawResponsePromise>> in
notificationCenter.post(name: .routing, object: NSNumber(value: signalMessage.timestamp)) notificationCenter.post(name: .routing, object: NSNumber(value: signalMessage.timestamp))
return getTargetSnodes(for: destination).map { swarm in return getTargetSnodes(for: destination).map { snodes in
return Set(swarm.map { target in return Set(snodes.map { snode in
notificationCenter.post(name: .messageSending, object: NSNumber(value: signalMessage.timestamp)) notificationCenter.post(name: .messageSending, object: NSNumber(value: signalMessage.timestamp))
return sendLokiMessage(lokiMessageWithPoW, to: target).map { rawResponse in return sendLokiMessage(lokiMessageWithPoW, to: snode).map { rawResponse in
if let json = rawResponse as? JSON, let powDifficulty = json["difficulty"] as? Int { if let json = rawResponse as? JSON, let powDifficulty = json["difficulty"] as? Int {
guard powDifficulty != LokiAPI.powDifficulty else { return rawResponse } guard powDifficulty != LokiAPI.powDifficulty else { return rawResponse }
print("[Loki] Setting proof of work difficulty to \(powDifficulty).") print("[Loki] Setting proof of work difficulty to \(powDifficulty).")
@ -207,14 +211,15 @@ public final class LokiAPI : NSObject {
print("[Loki] Failed to update proof of work difficulty from: \(rawResponse).") print("[Loki] Failed to update proof of work difficulty from: \(rawResponse).")
} }
return rawResponse return rawResponse
}.retryingIfNeeded(maxRetryCount: maxRetryCount) }
}) })
}.retryingIfNeeded(maxRetryCount: maxRetryCount) }
} }
} }
if let peer = LokiP2PAPI.getInfo(for: destination), (lokiMessage.isPing || peer.isOnline) { if let peer = LokiP2PAPI.getInfo(for: destination), (lokiMessage.isPing || peer.isOnline) {
let target = LokiAPITarget(address: peer.address, port: peer.port, publicKeySet: nil) let target = LokiAPITarget(address: peer.address, port: peer.port, publicKeySet: nil)
return Promise.value([ target ]).mapValues { sendLokiMessage(lokiMessage, to: $0) }.map { Set($0) }.retryingIfNeeded(maxRetryCount: maxRetryCount).get { _ in // TODO: Retrying
return Promise.value([ target ]).mapValues { sendLokiMessage(lokiMessage, to: $0) }.map { Set($0) }.get { _ in
LokiP2PAPI.markOnline(destination) LokiP2PAPI.markOnline(destination)
onP2PSuccess() onP2PSuccess()
}.recover { error -> Promise<Set<RawResponsePromise>> in }.recover { error -> Promise<Set<RawResponsePromise>> in

@ -101,7 +101,9 @@ public final class LokiFileServerAPI : LokiDotNetAPI {
let url = URL(string: "\(server)/users/me")! let url = URL(string: "\(server)/users/me")!
let request = TSRequest(url: url, method: "PATCH", parameters: parameters) let request = TSRequest(url: url, method: "PATCH", parameters: parameters)
request.allHTTPHeaderFields = [ "Content-Type" : "application/json", "Authorization" : "Bearer \(token)" ] request.allHTTPHeaderFields = [ "Content-Type" : "application/json", "Authorization" : "Bearer \(token)" ]
return LokiFileServerProxy(for: server).perform(request).map { _ in }.retryingIfNeeded(maxRetryCount: 8).recover { error in return attempt(maxRetryCount: 8, recoveringOn: LokiAPI.workQueue) {
LokiFileServerProxy(for: server).perform(request).map { _ in }
}.recover { error in
print("Couldn't update device links due to error: \(error).") print("Couldn't update device links due to error: \(error).")
throw error throw error
} }

@ -155,11 +155,13 @@ public final class LokiPublicChatAPI : LokiDotNetAPI {
} }
public static func sendMessage(_ message: LokiPublicChatMessage, to channel: UInt64, on server: String) -> Promise<LokiPublicChatMessage> { public static func sendMessage(_ message: LokiPublicChatMessage, to channel: UInt64, on server: String) -> Promise<LokiPublicChatMessage> {
return Promise<LokiPublicChatMessage> { [privateKey = userKeyPair.privateKey] seal in print("[Loki] Sending message to public chat channel with ID: \(channel) on server: \(server).")
DispatchQueue.global().async { let (promise, seal) = Promise<LokiPublicChatMessage>.pending()
guard let signedMessage = message.sign(with: privateKey) else { return seal.reject(LokiDotNetAPIError.signingFailed) } let queue = DispatchQueue.global()
queue.async { [privateKey = userKeyPair.privateKey] in
guard let signedMessage = message.sign(with: privateKey) else { return seal.reject(LokiDotNetAPIError.signingFailed) }
attempt(maxRetryCount: maxRetryCount, recoveringOn: queue) {
getAuthToken(for: server).then { token -> Promise<LokiPublicChatMessage> in getAuthToken(for: server).then { token -> Promise<LokiPublicChatMessage> in
print("[Loki] Sending message to public chat channel with ID: \(channel) on server: \(server).")
let url = URL(string: "\(server)/channels/\(channel)/messages")! let url = URL(string: "\(server)/channels/\(channel)/messages")!
let parameters = signedMessage.toJSON() let parameters = signedMessage.toJSON()
let request = TSRequest(url: url, method: "POST", parameters: parameters) let request = TSRequest(url: url, method: "POST", parameters: parameters)
@ -183,13 +185,14 @@ public final class LokiPublicChatAPI : LokiDotNetAPI {
storage.dbReadWriteConnection.removeObject(forKey: server, inCollection: authTokenCollection) storage.dbReadWriteConnection.removeObject(forKey: server, inCollection: authTokenCollection)
} }
throw error throw error
}.retryingIfNeeded(maxRetryCount: maxRetryCount).done { message in
seal.fulfill(message)
}.catch { error in
seal.reject(error)
} }
}.done { message in
seal.fulfill(message)
}.catch { error in
seal.reject(error)
} }
} }
return promise
} }
public static func getDeletedMessageServerIDs(for channel: UInt64, on server: String) -> Promise<[UInt64]> { public static func getDeletedMessageServerIDs(for channel: UInt64, on server: String) -> Promise<[UInt64]> {
@ -220,16 +223,18 @@ public final class LokiPublicChatAPI : LokiDotNetAPI {
} }
public static func deleteMessage(with messageID: UInt, for channel: UInt64, on server: String, isSentByUser: Bool) -> Promise<Void> { public static func deleteMessage(with messageID: UInt, for channel: UInt64, on server: String, isSentByUser: Bool) -> Promise<Void> {
return getAuthToken(for: server).then { token -> Promise<Void> in return attempt(maxRetryCount: maxRetryCount, recoveringOn: DispatchQueue.global()) {
let isModerationRequest = !isSentByUser getAuthToken(for: server).then { token -> Promise<Void> in
print("[Loki] Deleting message with ID: \(messageID) for public chat channel with ID: \(channel) on server: \(server) (isModerationRequest = \(isModerationRequest)).") let isModerationRequest = !isSentByUser
let urlAsString = isSentByUser ? "\(server)/channels/\(channel)/messages/\(messageID)" : "\(server)/loki/v1/moderation/message/\(messageID)" print("[Loki] Deleting message with ID: \(messageID) for public chat channel with ID: \(channel) on server: \(server) (isModerationRequest = \(isModerationRequest)).")
let url = URL(string: urlAsString)! let urlAsString = isSentByUser ? "\(server)/channels/\(channel)/messages/\(messageID)" : "\(server)/loki/v1/moderation/message/\(messageID)"
let request = TSRequest(url: url, method: "DELETE", parameters: [:]) let url = URL(string: urlAsString)!
request.allHTTPHeaderFields = [ "Content-Type" : "application/json", "Authorization" : "Bearer \(token)" ] let request = TSRequest(url: url, method: "DELETE", parameters: [:])
return LokiFileServerProxy(for: server).perform(request).done { result -> Void in request.allHTTPHeaderFields = [ "Content-Type" : "application/json", "Authorization" : "Bearer \(token)" ]
print("[Loki] Deleted message with ID: \(messageID) on server: \(server).") return LokiFileServerProxy(for: server).perform(request).done { result -> Void in
}.retryingIfNeeded(maxRetryCount: maxRetryCount) print("[Loki] Deleted message with ID: \(messageID) on server: \(server).")
}
}
} }
} }
@ -252,24 +257,28 @@ public final class LokiPublicChatAPI : LokiDotNetAPI {
} }
public static func join(_ channel: UInt64, on server: String) -> Promise<Void> { public static func join(_ channel: UInt64, on server: String) -> Promise<Void> {
return getAuthToken(for: server).then { token -> Promise<Void> in return attempt(maxRetryCount: maxRetryCount, recoveringOn: DispatchQueue.global()) {
let url = URL(string: "\(server)/channels/\(channel)/subscribe")! getAuthToken(for: server).then { token -> Promise<Void> in
let request = TSRequest(url: url, method: "POST", parameters: [:]) let url = URL(string: "\(server)/channels/\(channel)/subscribe")!
request.allHTTPHeaderFields = [ "Content-Type" : "application/json", "Authorization" : "Bearer \(token)" ] let request = TSRequest(url: url, method: "POST", parameters: [:])
return LokiFileServerProxy(for: server).perform(request).done { result -> Void in request.allHTTPHeaderFields = [ "Content-Type" : "application/json", "Authorization" : "Bearer \(token)" ]
print("[Loki] Joined channel with ID: \(channel) on server: \(server).") return LokiFileServerProxy(for: server).perform(request).done { result -> Void in
}.retryingIfNeeded(maxRetryCount: maxRetryCount) print("[Loki] Joined channel with ID: \(channel) on server: \(server).")
}
}
} }
} }
public static func leave(_ channel: UInt64, on server: String) -> Promise<Void> { public static func leave(_ channel: UInt64, on server: String) -> Promise<Void> {
return getAuthToken(for: server).then { token -> Promise<Void> in return attempt(maxRetryCount: maxRetryCount, recoveringOn: DispatchQueue.global()) {
let url = URL(string: "\(server)/channels/\(channel)/subscribe")! getAuthToken(for: server).then { token -> Promise<Void> in
let request = TSRequest(url: url, method: "DELETE", parameters: [:]) let url = URL(string: "\(server)/channels/\(channel)/subscribe")!
request.allHTTPHeaderFields = [ "Content-Type" : "application/json", "Authorization" : "Bearer \(token)" ] let request = TSRequest(url: url, method: "DELETE", parameters: [:])
return LokiFileServerProxy(for: server).perform(request).done { result -> Void in request.allHTTPHeaderFields = [ "Content-Type" : "application/json", "Authorization" : "Bearer \(token)" ]
print("[Loki] Left channel with ID: \(channel) on server: \(server).") return LokiFileServerProxy(for: server).perform(request).done { result -> Void in
}.retryingIfNeeded(maxRetryCount: maxRetryCount) print("[Loki] Left channel with ID: \(channel) on server: \(server).")
}
}
} }
} }
@ -328,34 +337,38 @@ public final class LokiPublicChatAPI : LokiDotNetAPI {
public static func setDisplayName(to newDisplayName: String?, on server: String) -> Promise<Void> { public static func setDisplayName(to newDisplayName: String?, on server: String) -> Promise<Void> {
print("[Loki] Updating display name on server: \(server).") print("[Loki] Updating display name on server: \(server).")
return getAuthToken(for: server).then { token -> Promise<Void> in return attempt(maxRetryCount: maxRetryCount, recoveringOn: DispatchQueue.global()) {
let parameters: JSON = [ "name" : (newDisplayName ?? "") ] getAuthToken(for: server).then { token -> Promise<Void> in
let url = URL(string: "\(server)/users/me")! let parameters: JSON = [ "name" : (newDisplayName ?? "") ]
let request = TSRequest(url: url, method: "PATCH", parameters: parameters) let url = URL(string: "\(server)/users/me")!
request.allHTTPHeaderFields = [ "Content-Type" : "application/json", "Authorization" : "Bearer \(token)" ] let request = TSRequest(url: url, method: "PATCH", parameters: parameters)
return LokiFileServerProxy(for: server).perform(request).map { _ in }.recover { error in request.allHTTPHeaderFields = [ "Content-Type" : "application/json", "Authorization" : "Bearer \(token)" ]
print("Couldn't update display name due to error: \(error).") return LokiFileServerProxy(for: server).perform(request).map { _ in }.recover { error in
throw error print("Couldn't update display name due to error: \(error).")
throw error
}
} }
}.retryingIfNeeded(maxRetryCount: maxRetryCount) }
} }
public static func setProfilePictureURL(to url: String?, using profileKey: Data, on server: String) -> Promise<Void> { public static func setProfilePictureURL(to url: String?, using profileKey: Data, on server: String) -> Promise<Void> {
print("[Loki] Updating profile picture on server: \(server).") print("[Loki] Updating profile picture on server: \(server).")
return getAuthToken(for: server).then { token -> Promise<Void> in return attempt(maxRetryCount: maxRetryCount, recoveringOn: DispatchQueue.global()) {
var annotation: JSON = [ "type" : profilePictureType ] getAuthToken(for: server).then { token -> Promise<Void> in
if let url = url { var annotation: JSON = [ "type" : profilePictureType ]
annotation["value"] = [ "profileKey" : profileKey.base64EncodedString(), "url" : url ] if let url = url {
} annotation["value"] = [ "profileKey" : profileKey.base64EncodedString(), "url" : url ]
let parameters: JSON = [ "annotations" : [ annotation ] ] }
let url = URL(string: "\(server)/users/me")! let parameters: JSON = [ "annotations" : [ annotation ] ]
let request = TSRequest(url: url, method: "PATCH", parameters: parameters) let url = URL(string: "\(server)/users/me")!
request.allHTTPHeaderFields = [ "Content-Type" : "application/json", "Authorization" : "Bearer \(token)" ] let request = TSRequest(url: url, method: "PATCH", parameters: parameters)
return LokiFileServerProxy(for: server).perform(request).map { _ in }.recover { error in request.allHTTPHeaderFields = [ "Content-Type" : "application/json", "Authorization" : "Bearer \(token)" ]
print("[Loki] Couldn't update profile picture due to error: \(error).") return LokiFileServerProxy(for: server).perform(request).map { _ in }.recover { error in
throw error print("[Loki] Couldn't update profile picture due to error: \(error).")
throw error
}
} }
}.retryingIfNeeded(maxRetryCount: maxRetryCount) }
} }
public static func getInfo(for channel: UInt64, on server: String) -> Promise<LokiPublicChatInfo> { public static func getInfo(for channel: UInt64, on server: String) -> Promise<LokiPublicChatInfo> {

@ -1,16 +1,16 @@
import PromiseKit import PromiseKit
internal extension Promise { /// Retry the promise constructed in `body` up to `maxRetryCount` times.
///
internal func retryingIfNeeded(maxRetryCount: UInt) -> Promise<T> { /// - Note: Intentionally explicit about the recovery queue at the call site.
var retryCount = 0 internal func attempt<T>(maxRetryCount: UInt, recoveringOn queue: DispatchQueue, body: @escaping () -> Promise<T>) -> Promise<T> {
func retryIfNeeded() -> Promise<T> { var retryCount = 0
return recover(on: DispatchQueue.global()) { error -> Promise<T> in func attempt() -> Promise<T> {
guard retryCount != maxRetryCount else { throw error } return body().recover(on: queue) { error -> Promise<T> in
retryCount += 1 guard retryCount < maxRetryCount else { throw error }
return retryIfNeeded() retryCount += 1
} return attempt()
} }
return retryIfNeeded()
} }
return attempt()
} }

Loading…
Cancel
Save