diff --git a/SignalServiceKit/src/Loki/API/LokiAPI+SwarmAPI.swift b/SignalServiceKit/src/Loki/API/LokiAPI+SwarmAPI.swift index 1d8d56fa8..b66516844 100644 --- a/SignalServiceKit/src/Loki/API/LokiAPI+SwarmAPI.swift +++ b/SignalServiceKit/src/Loki/API/LokiAPI+SwarmAPI.swift @@ -3,8 +3,12 @@ import PromiseKit public extension LokiAPI { private static var snodeVersion: [LokiAPITarget:String] = [:] + fileprivate static let seedNodePool: Set = [ "https://storage.seed1.loki.network", "https://storage.seed3.loki.network", "https://public.loki.foundation" ] + /// Only ever modified from `LokiAPI.errorHandlingQueue` to avoid race conditions. internal static var snodeFailureCount: [LokiAPITarget:UInt] = [:] + internal static var snodePool: Set = [] + internal static var swarmCache: [String:[LokiAPITarget]] = [:] // TODO: Make this set based? // MARK: Settings private static let minimumSnodePoolCount = 32 @@ -13,39 +17,6 @@ public extension LokiAPI { internal static let snodeFailureThreshold = 2 - // MARK: Caching - internal static var swarmCache: [String:[LokiAPITarget]] = [:] // TODO: Make this set based? - - internal static func dropSnodeFromSwarmIfNeeded(_ target: LokiAPITarget, hexEncodedPublicKey: String) { - let swarm = LokiAPI.swarmCache[hexEncodedPublicKey] - if var swarm = swarm, let index = swarm.firstIndex(of: target) { - swarm.remove(at: index) - LokiAPI.swarmCache[hexEncodedPublicKey] = swarm - // Dispatch async on the main queue to avoid nested write transactions - DispatchQueue.main.async { - let storage = OWSPrimaryStorage.shared() - storage.dbReadWriteConnection.readWrite { transaction in - storage.setSwarm(swarm, for: hexEncodedPublicKey, in: transaction) - } - } - } - } - - // MARK: Clearnet Setup - fileprivate static let seedNodePool: Set = [ "https://storage.seed1.loki.network", "https://storage.seed3.loki.network", "https://public.loki.foundation" ] - - internal static var snodePool: Set = [] - - @objc public static func clearSnodePool() { - snodePool.removeAll() - // Dispatch async on the main queue to avoid nested write transactions - DispatchQueue.main.async { - storage.dbReadWriteConnection.readWrite { transaction in - storage.clearSnodePool(in: transaction) - } - } - } - // MARK: Internal API internal static func getRandomSnode() -> Promise { if snodePool.count < minimumSnodePoolCount { @@ -135,6 +106,32 @@ public extension LokiAPI { return getSwarm(for: hexEncodedPublicKey).map { Array($0.shuffled().prefix(targetSwarmSnodeCount)) } } + internal static func dropSnodeFromSnodePool(_ target: LokiAPITarget) { + LokiAPI.snodePool.remove(target) + // Dispatch async on the main queue to avoid nested write transactions + DispatchQueue.main.async { + let storage = OWSPrimaryStorage.shared() + storage.dbReadWriteConnection.readWrite { transaction in + storage.dropSnodeFromSnodePool(target, in: transaction) + } + } + } + + internal static func dropSnodeFromSwarmIfNeeded(_ target: LokiAPITarget, hexEncodedPublicKey: String) { + let swarm = LokiAPI.swarmCache[hexEncodedPublicKey] + if var swarm = swarm, let index = swarm.firstIndex(of: target) { + swarm.remove(at: index) + LokiAPI.swarmCache[hexEncodedPublicKey] = swarm + // Dispatch async on the main queue to avoid nested write transactions + DispatchQueue.main.async { + let storage = OWSPrimaryStorage.shared() + storage.dbReadWriteConnection.readWrite { transaction in + storage.setSwarm(swarm, for: hexEncodedPublicKey, in: transaction) + } + } + } + } + internal static func getFileServerProxy() -> Promise { let (promise, seal) = Promise.pending() func getVersion(for snode: LokiAPITarget) -> Promise { @@ -170,7 +167,18 @@ public extension LokiAPI { } return promise } - + + // MARK: Public API + @objc public static func clearSnodePool() { + snodePool.removeAll() + // Dispatch async on the main queue to avoid nested write transactions + DispatchQueue.main.async { + storage.dbReadWriteConnection.readWrite { transaction in + storage.clearSnodePool(in: transaction) + } + } + } + // MARK: Parsing private static func parseTargets(from rawResponse: Any) -> [LokiAPITarget] { guard let json = rawResponse as? JSON, let rawTargets = json["snodes"] as? [JSON] else { @@ -202,15 +210,8 @@ internal extension Promise { print("[Loki] Couldn't reach snode at: \(target); setting failure count to \(newFailureCount).") if newFailureCount >= LokiAPI.snodeFailureThreshold { print("[Loki] Failure threshold reached for: \(target); dropping it.") - LokiAPI.dropSnodeFromSwarmIfNeeded(target, hexEncodedPublicKey: hexEncodedPublicKey) // Remove it from the swarm cache associated with the given public key - LokiAPI.snodePool.remove(target) // Remove it from the snode pool - // Dispatch async on the main queue to avoid nested write transactions - DispatchQueue.main.async { - let storage = OWSPrimaryStorage.shared() - storage.dbReadWriteConnection.readWrite { transaction in - storage.dropSnode(target, in: transaction) - } - } + LokiAPI.dropSnodeFromSwarmIfNeeded(target, hexEncodedPublicKey: hexEncodedPublicKey) + LokiAPI.dropSnodeFromSnodePool(target) LokiAPI.snodeFailureCount[target] = 0 } case 406: diff --git a/SignalServiceKit/src/Loki/API/LokiAPI.swift b/SignalServiceKit/src/Loki/API/LokiAPI.swift index 98fd482b0..4bd85fc5b 100644 --- a/SignalServiceKit/src/Loki/API/LokiAPI.swift +++ b/SignalServiceKit/src/Loki/API/LokiAPI.swift @@ -1,9 +1,7 @@ import PromiseKit -// TODO: A lot of the API relies on things happening serially and state being maintained correctly (i.e. without -// race conditions). To this end we should just have one high quality serial queue and do everything on there, except -// for things that explicitly *can* be done in parallel and don't modify state, which should then happen -// on a global queue. +// TODO: We guarantee that things happen in-order through promise chaining. For performance we should be able to use different queues for everything as long +// as we always modify state from the same queue. @objc(LKAPI) public final class LokiAPI : NSObject { diff --git a/SignalServiceKit/src/Loki/API/Onion Requests/OnionRequestAPI.swift b/SignalServiceKit/src/Loki/API/Onion Requests/OnionRequestAPI.swift index e54b59385..05e2a803c 100644 --- a/SignalServiceKit/src/Loki/API/Onion Requests/OnionRequestAPI.swift +++ b/SignalServiceKit/src/Loki/API/Onion Requests/OnionRequestAPI.swift @@ -100,7 +100,7 @@ public enum OnionRequestAPI { /// Builds and returns `pathCount` paths. The returned promise errors out with `Error.insufficientSnodes` /// if not enough (reliable) snodes are available. - public static func buildPaths() -> Promise<[Path]> { + private static func buildPaths() -> Promise<[Path]> { print("[Loki] [Onion Request API] Building onion request paths.") DispatchQueue.main.async { NotificationCenter.default.post(name: .buildingPaths, object: nil) @@ -146,6 +146,7 @@ public enum OnionRequestAPI { let storage = OWSPrimaryStorage.shared() storage.dbReadConnection.read { transaction in paths = storage.getOnionRequestPaths(in: transaction) + guardSnodes.formUnion([ paths[0][0], paths[1][0] ]) } } // randomElement() uses the system's default random generator, which is cryptographically secure @@ -160,7 +161,7 @@ public enum OnionRequestAPI { } } - private static func dropPaths() { + private static func dropAllPaths() { paths.removeAll() // Dispatch async on the main queue to avoid nested write transactions DispatchQueue.main.async { @@ -250,7 +251,7 @@ public enum OnionRequestAPI { } promise.catch(on: LokiAPI.workQueue) { error in // Must be invoked on LokiAPI.workQueue guard case HTTP.Error.httpRequestFailed(_, _) = error else { return } - dropPaths() // A snode in the path is bad; retry with a different path + dropAllPaths() // A snode in the path is bad; retry with a different path dropGuardSnode(guardSnode) } promise.handlingErrorsIfNeeded(forTargetSnode: snode, associatedWith: hexEncodedPublicKey) @@ -275,15 +276,8 @@ private extension Promise where T == JSON { print("[Loki] Couldn't reach snode at: \(snode); setting failure count to \(newFailureCount).") if newFailureCount >= LokiAPI.snodeFailureThreshold { print("[Loki] Failure threshold reached for: \(snode); dropping it.") - LokiAPI.dropSnodeFromSwarmIfNeeded(snode, hexEncodedPublicKey: hexEncodedPublicKey) // Remove it from the swarm cache associated with the given public key - LokiAPI.snodePool.remove(snode) // Remove it from the snode pool - // Dispatch async on the main queue to avoid nested write transactions - DispatchQueue.main.async { - let storage = OWSPrimaryStorage.shared() - storage.dbReadWriteConnection.readWrite { transaction in - storage.dropSnode(snode, in: transaction) - } - } + LokiAPI.dropSnodeFromSwarmIfNeeded(snode, hexEncodedPublicKey: hexEncodedPublicKey) + LokiAPI.dropSnodeFromSnodePool(snode) LokiAPI.snodeFailureCount[snode] = 0 } case 406: diff --git a/SignalServiceKit/src/Loki/Database/OWSPrimaryStorage+Loki.swift b/SignalServiceKit/src/Loki/Database/OWSPrimaryStorage+Loki.swift index d519e8904..c69a996a5 100644 --- a/SignalServiceKit/src/Loki/Database/OWSPrimaryStorage+Loki.swift +++ b/SignalServiceKit/src/Loki/Database/OWSPrimaryStorage+Loki.swift @@ -26,7 +26,7 @@ public extension OWSPrimaryStorage { return result } - public func dropSnode(_ snode: LokiAPITarget, in transaction: YapDatabaseReadWriteTransaction) { + public func dropSnodeFromSnodePool(_ snode: LokiAPITarget, in transaction: YapDatabaseReadWriteTransaction) { transaction.removeObject(forKey: snode.description, inCollection: OWSPrimaryStorage.snodePoolCollection) } @@ -63,7 +63,7 @@ public extension OWSPrimaryStorage { - // MARK: - Onion Request Path + // MARK: - Onion Request Paths private static let onionRequestPathCollection = "LokiOnionRequestPathCollection" public func setOnionRequestPaths(_ paths: [OnionRequestAPI.Path], in transaction: YapDatabaseReadWriteTransaction) {