Simplify LokiAPI changes

pull/26/head
Niels Andriesse 6 years ago
parent 9676af0d6b
commit df7ca74b70

@ -7,7 +7,7 @@
<key>CarthageVersion</key> <key>CarthageVersion</key>
<string>0.33.0</string> <string>0.33.0</string>
<key>OSXVersion</key> <key>OSXVersion</key>
<string>10.14.4</string> <string>10.14.5</string>
<key>WebRTCCommit</key> <key>WebRTCCommit</key>
<string>1445d719bf05280270e9f77576f80f973fd847f8 M73</string> <string>1445d719bf05280270e9f77576f80f973fd847f8 M73</string>
</dict> </dict>

@ -310,8 +310,8 @@ static NSTimeInterval launchStartedAt;
name:NSNotificationName_2FAStateDidChange name:NSNotificationName_2FAStateDidChange
object:nil]; object:nil];
// Loki Message received // Loki - Observe messages received notifications
[[NSNotificationCenter defaultCenter] addObserver:self selector:@selector(receivedNewMessages:) name:NSNotification.receivedNewMessages object:nil]; [[NSNotificationCenter defaultCenter] addObserver:self selector:@selector(handleNewMessagesReceived:) name:NSNotification.newMessagesReceived object:nil];
OWSLogInfo(@"application: didFinishLaunchingWithOptions completed."); OWSLogInfo(@"application: didFinishLaunchingWithOptions completed.");
@ -1413,17 +1413,17 @@ static NSTimeInterval launchStartedAt;
#pragma mark - Long polling #pragma mark - Long polling
- (void)receivedNewMessages:(NSNotification *)notification - (void)handleNewMessagesReceived:(NSNotification *)notification
{ {
NSArray *messages = (NSArray *)notification.userInfo[@"messages"]; NSArray *messages = (NSArray *)notification.userInfo[@"messages"];
OWSLogInfo(@"[Loki] Received %lu messages from long polling", messages.count); OWSLogInfo(@"[Loki] Received %lu messages through long polling.", messages.count);
for (SSKProtoEnvelope *envelope in messages) { for (SSKProtoEnvelope *envelope in messages) {
NSData *envelopeData = envelope.serializedDataIgnoringErrors; NSData *envelopeData = envelope.serializedDataIgnoringErrors;
if (envelopeData != nil) { if (envelopeData != nil) {
[SSKEnvironment.shared.messageReceiver handleReceivedEnvelopeData:envelopeData]; [SSKEnvironment.shared.messageReceiver handleReceivedEnvelopeData:envelopeData];
} else { } else {
OWSFailDebug(@"Failed to serialize envelope"); OWSFailDebug(@"Failed to deserialize envelope.");
} }
} }
} }

@ -3,7 +3,6 @@
// //
import UIKit import UIKit
import SignalServiceKit
@objc @objc
public class AvatarImageView: UIImageView { public class AvatarImageView: UIImageView {

@ -37,7 +37,7 @@ public extension LokiAPI {
// This is here so we can stop the infinite loop // This is here so we can stop the infinite loop
guard !shouldStopPolling else { return } guard !shouldStopPolling else { return }
fetchSwarmIfNeeded(for: hexEncodedPublicKey).then { _ -> Guarantee<[Result<Void>]> in getSwarm(for: hexEncodedPublicKey).then { _ -> Guarantee<[Result<Void>]> in
var promises = [Promise<Void>]() var promises = [Promise<Void>]()
let connections = 3 let connections = 3
for i in 0..<connections { for i in 0..<connections {
@ -61,7 +61,7 @@ public extension LokiAPI {
} }
private static func getUnusedSnodes() -> [LokiAPITarget] { private static func getUnusedSnodes() -> [LokiAPITarget] {
let snodes = getCachedSnodes(for: hexEncodedPublicKey) let snodes = LokiAPI.swarmCache[hexEncodedPublicKey] ?? []
return snodes.filter { !usedSnodes.contains($0) } return snodes.filter { !usedSnodes.contains($0) }
} }
@ -84,15 +84,15 @@ public extension LokiAPI {
func getMessagesInfinitely(from target: LokiAPITarget) -> Promise<Void> { func getMessagesInfinitely(from target: LokiAPITarget) -> Promise<Void> {
// The only way to exit the infinite loop is to throw an error 3 times or cancel // The only way to exit the infinite loop is to throw an error 3 times or cancel
return getRawMessages(from: target).then { rawMessages -> Promise<Void> in return getRawMessages(from: target, useLongPolling: true).then { rawResponse -> Promise<Void> in
// Check if we need to abort // Check if we need to abort
guard !isCancelled else { throw PMKError.cancelled } guard !isCancelled else { throw PMKError.cancelled }
// Process the messages // Process the messages
let messages = process(rawMessages: rawMessages, from: target) let messages = parseRawMessagesResponse(rawResponse, from: target)
// Send our messages as a notification // Send our messages as a notification
NotificationCenter.default.post(name: .receivedNewMessages, object: nil, userInfo: ["messages": messages]) NotificationCenter.default.post(name: .newMessagesReceived, object: nil, userInfo: ["messages": messages])
// Continue fetching if we haven't cancelled // Continue fetching if we haven't cancelled
return getMessagesInfinitely(from: target) return getMessagesInfinitely(from: target)
@ -107,7 +107,7 @@ public extension LokiAPI {
// Connect to the next snode if we haven't cancelled // Connect to the next snode if we haven't cancelled
// We also need to remove the cached snode so we don't contact it again // We also need to remove the cached snode so we don't contact it again
removeCachedSnode(nextSnode, for: hexEncodedPublicKey) dropIfNeeded(nextSnode, hexEncodedPublicKey: hexEncodedPublicKey)
return connectToNextSnode() return connectToNextSnode()
} }
} }

@ -1,20 +0,0 @@
internal extension LokiAPI {
internal struct Request {
var method: LokiAPITarget.Method
var target: LokiAPITarget
var publicKey: String
var parameters: [String:Any]
var headers: [String:String]
var timeout: TimeInterval?
init(method: LokiAPITarget.Method, target: LokiAPITarget, publicKey: String, parameters: [String:Any] = [:]) {
self.method = method
self.target = target
self.publicKey = publicKey
self.parameters = parameters
self.headers = [:]
self.timeout = nil
}
}
}

@ -11,7 +11,7 @@ public extension LokiAPI {
private static let swarmCacheKey = "swarmCacheKey" private static let swarmCacheKey = "swarmCacheKey"
private static let swarmCacheCollection = "swarmCacheCollection" private static let swarmCacheCollection = "swarmCacheCollection"
fileprivate static var swarmCache: [String:[LokiAPITarget]] { internal static var swarmCache: [String:[LokiAPITarget]] {
get { get {
var result: [String:[LokiAPITarget]]? = nil var result: [String:[LokiAPITarget]]? = nil
storage.dbReadConnection.read { transaction in storage.dbReadConnection.read { transaction in
@ -26,6 +26,14 @@ public extension LokiAPI {
} }
} }
internal static func dropIfNeeded(_ target: LokiAPITarget, hexEncodedPublicKey: String) {
let swarm = LokiAPI.swarmCache[hexEncodedPublicKey]
if var swarm = swarm, let index = swarm.firstIndex(of: target) {
swarm.remove(at: index)
LokiAPI.swarmCache[hexEncodedPublicKey] = swarm
}
}
// MARK: Internal API // MARK: Internal API
private static func getRandomSnode() -> Promise<LokiAPITarget> { private static func getRandomSnode() -> Promise<LokiAPITarget> {
return Promise<LokiAPITarget> { seal in return Promise<LokiAPITarget> { seal in
@ -33,16 +41,7 @@ public extension LokiAPI {
} }
} }
internal static func getCachedSnodes(for hexEncodedPublicKey: String) -> [LokiAPITarget] { internal static func getSwarm(for hexEncodedPublicKey: String) -> Promise<[LokiAPITarget]> {
return swarmCache[hexEncodedPublicKey] ?? []
}
internal static func removeCachedSnode(_ target: LokiAPITarget, for hexEncodedPublicKey: String) {
guard let cache = swarmCache[hexEncodedPublicKey] else { return }
swarmCache[hexEncodedPublicKey] = cache.filter { $0 != target }
}
internal static func fetchSwarmIfNeeded(for hexEncodedPublicKey: String) -> Promise<[LokiAPITarget]> {
if let cachedSwarm = swarmCache[hexEncodedPublicKey], cachedSwarm.count >= minimumSnodeCount { if let cachedSwarm = swarmCache[hexEncodedPublicKey], cachedSwarm.count >= minimumSnodeCount {
return Promise<[LokiAPITarget]> { $0.fulfill(cachedSwarm) } return Promise<[LokiAPITarget]> { $0.fulfill(cachedSwarm) }
} else { } else {
@ -54,7 +53,7 @@ public extension LokiAPI {
// MARK: Public API // MARK: Public API
internal static func getTargetSnodes(for hexEncodedPublicKey: String) -> Promise<[LokiAPITarget]> { internal static func getTargetSnodes(for hexEncodedPublicKey: String) -> Promise<[LokiAPITarget]> {
// shuffled() uses the system's default random generator, which is cryptographically secure // shuffled() uses the system's default random generator, which is cryptographically secure
return fetchSwarmIfNeeded(for: hexEncodedPublicKey).map { Array($0.shuffled().prefix(targetSnodeCount)) } return getSwarm(for: hexEncodedPublicKey).map { Array($0.shuffled().prefix(targetSnodeCount)) }
} }
// MARK: Parsing // MARK: Parsing
@ -85,11 +84,7 @@ internal extension Promise {
case 421: case 421:
// The snode isn't associated with the given public key anymore // The snode isn't associated with the given public key anymore
Logger.warn("[Loki] Invalidating swarm for: \(hexEncodedPublicKey).") Logger.warn("[Loki] Invalidating swarm for: \(hexEncodedPublicKey).")
let swarm = LokiAPI.swarmCache[hexEncodedPublicKey] LokiAPI.dropIfNeeded(target, hexEncodedPublicKey: hexEncodedPublicKey)
if var swarm = swarm, let index = swarm.firstIndex(of: target) {
swarm.remove(at: index)
LokiAPI.swarmCache[hexEncodedPublicKey] = swarm
}
default: break default: break
} }
} }

@ -6,7 +6,8 @@ import PromiseKit
// MARK: Settings // MARK: Settings
private static let version = "v1" private static let version = "v1"
private static let maxRetryCount: UInt = 3 private static let maxRetryCount: UInt = 3
public static let defaultMessageTTL: UInt64 = 1 * 24 * 60 * 60 * 1000 private static let longPollingTimeout: TimeInterval = 40
public static let defaultMessageTTL: UInt64 = 24 * 60 * 60 * 1000
// MARK: Types // MARK: Types
public typealias RawResponse = Any public typealias RawResponse = Any
@ -31,56 +32,30 @@ import PromiseKit
override private init() { } override private init() { }
// MARK: Internal API // MARK: Internal API
internal static func invoke(_ method: LokiAPITarget.Method, on target: LokiAPITarget, associatedWith hexEncodedPublicKey: String, parameters: [String:Any]) -> RawResponsePromise { internal static func invoke(_ method: LokiAPITarget.Method, on target: LokiAPITarget, associatedWith hexEncodedPublicKey: String,
return invoke(request: Request(method: method, target: target, publicKey: hexEncodedPublicKey, parameters: parameters)) parameters: [String:Any], headers: [String:String]? = nil, timeout: TimeInterval? = nil) -> RawResponsePromise {
let url = URL(string: "\(target.address):\(target.port)/\(version)/storage_rpc")!
let request = TSRequest(url: url, method: "POST", parameters: [ "method" : method.rawValue, "params" : parameters ])
if let headers = headers { request.allHTTPHeaderFields = headers }
if let timeout = timeout { request.timeoutInterval = timeout }
return TSNetworkManager.shared().makePromise(request: request).map { $0.responseObject }
.handlingSwarmSpecificErrorsIfNeeded(for: target, associatedWith: hexEncodedPublicKey).recoveringNetworkErrorsIfNeeded()
} }
internal static func invoke(request: Request) -> RawResponsePromise { internal static func getRawMessages(from target: LokiAPITarget, useLongPolling: Bool) -> RawResponsePromise {
let url = URL(string: "\(request.target.address):\(request.target.port)/\(version)/storage_rpc")!
let networkRequest = TSRequest(url: url, method: "POST", parameters: [ "method" : request.method.rawValue, "params" : request.parameters ])
networkRequest.allHTTPHeaderFields = request.headers
if let timeout = request.timeout {
networkRequest.timeoutInterval = timeout
}
return TSNetworkManager.shared().makePromise(request: networkRequest).map { $0.responseObject }
.handlingSwarmSpecificErrorsIfNeeded(for: request.target, associatedWith: request.publicKey).recoveringNetworkErrorsIfNeeded()
}
internal static func getMessages(from target: LokiAPITarget, longPolling: Bool = true) -> MessageListPromise {
return getRawMessages(from: target, longPolling: longPolling).map { process(rawMessages: $0, from: target) }
}
internal static func getRawMessages(from target: LokiAPITarget, longPolling: Bool = true) -> Promise<[JSON]> {
let hexEncodedPublicKey = OWSIdentityManager.shared().identityKeyPair()!.hexEncodedPublicKey let hexEncodedPublicKey = OWSIdentityManager.shared().identityKeyPair()!.hexEncodedPublicKey
let lastHashValue = getLastMessageHashValue(for: target) ?? "" let lastHashValue = getLastMessageHashValue(for: target) ?? ""
let parameters = [ "pubKey" : hexEncodedPublicKey, "lastHash" : lastHashValue ] let parameters = [ "pubKey" : hexEncodedPublicKey, "lastHash" : lastHashValue ]
let headers: [String:String]? = useLongPolling ? [ "X-Loki-Long-Poll" : "true" ] : nil
var request = Request(method: .getMessages, target: target, publicKey: hexEncodedPublicKey, parameters: parameters) let timeout: TimeInterval? = useLongPolling ? longPollingTimeout : nil
if (longPolling) { return invoke(.getMessages, on: target, associatedWith: hexEncodedPublicKey, parameters: parameters, headers: headers, timeout: timeout)
request.headers = ["X-Loki-Long-Poll" : "true"]
request.timeout = 40 // 40 second timeout
}
return invoke(request: request).map { rawResponse in
guard let json = rawResponse as? JSON, let rawMessages = json["messages"] as? [JSON] else { return [] }
return rawMessages
}
}
internal static func process(rawMessages: [JSON], from target: LokiAPITarget) -> [SSKProtoEnvelope] {
updateLastMessageHashValueIfPossible(for: target, from: rawMessages)
let newRawMessages = removeDuplicates(from: rawMessages)
return parseProtoEnvelopes(from: newRawMessages)
} }
// MARK: Public API // MARK: Public API
public static func getMessages() -> Promise<Set<MessageListPromise>> { public static func getMessages() -> Promise<Set<MessageListPromise>> {
let hexEncodedPublicKey = OWSIdentityManager.shared().identityKeyPair()!.hexEncodedPublicKey let hexEncodedPublicKey = OWSIdentityManager.shared().identityKeyPair()!.hexEncodedPublicKey
return getTargetSnodes(for: hexEncodedPublicKey).mapValues { targetSnode in return getTargetSnodes(for: hexEncodedPublicKey).mapValues { targetSnode in
return getMessages(from: targetSnode, longPolling: false) return getRawMessages(from: targetSnode, useLongPolling: false).map { parseRawMessagesResponse($0, from: targetSnode) }
}.map { Set($0) }.retryingIfNeeded(maxRetryCount: maxRetryCount) }.map { Set($0) }.retryingIfNeeded(maxRetryCount: maxRetryCount)
} }
@ -132,12 +107,19 @@ import PromiseKit
// The parsing utilities below use a best attempt approach to parsing; they warn for parsing failures but don't throw exceptions. // The parsing utilities below use a best attempt approach to parsing; they warn for parsing failures but don't throw exceptions.
internal static func parseRawMessagesResponse(_ rawResponse: Any, from target: LokiAPITarget) -> [SSKProtoEnvelope] {
guard let json = rawResponse as? JSON, let rawMessages = json["messages"] as? [JSON] else { return [] }
updateLastMessageHashValueIfPossible(for: target, from: rawMessages)
let newRawMessages = removeDuplicates(from: rawMessages)
return parseProtoEnvelopes(from: newRawMessages)
}
private static func updateLastMessageHashValueIfPossible(for target: LokiAPITarget, from rawMessages: [JSON]) { private static func updateLastMessageHashValueIfPossible(for target: LokiAPITarget, from rawMessages: [JSON]) {
guard let lastMessage = rawMessages.last, let hashValue = lastMessage["hash"] as? String, let expiresAt = lastMessage["expiration"] as? Int else { if let lastMessage = rawMessages.last, let hashValue = lastMessage["hash"] as? String, let expirationDate = lastMessage["expiration"] as? Int {
if rawMessages.count > 0 { Logger.warn("[Loki] Failed to update last message hash value from: \(rawMessages).") } setLastMessageHashValue(for: target, hashValue: hashValue, expiresAt: UInt64(expirationDate))
return } else if (!rawMessages.isEmpty) {
Logger.warn("[Loki] Failed to update last message hash value from: \(rawMessages).")
} }
setLastMessageHashValue(for: target, hashValue: hashValue, expiresAt: UInt64(expiresAt))
} }
private static func removeDuplicates(from rawMessages: [JSON]) -> [JSON] { private static func removeDuplicates(from rawMessages: [JSON]) -> [JSON] {

@ -174,12 +174,12 @@
AssertIsOnMainThread() AssertIsOnMainThread()
guard let message = onlineBroadcastMessage(forThread: thread) else { guard let message = onlineBroadcastMessage(forThread: thread) else {
Logger.warn("[Loki] P2P Address not set") Logger.warn("[Loki] P2P address not set.")
return return
} }
messageSender.sendPromise(message: message).catch { error in messageSender.sendPromise(message: message).catch { error in
Logger.warn("Failed to send online status to \(thread.contactIdentifier())") Logger.warn("Failed to send online status to \(thread.contactIdentifier()).")
}.retainUntilComplete() }.retainUntilComplete()
} }
@ -198,7 +198,7 @@
private static func createLokiAddressMessage(for thread: TSThread, isPing: Bool) -> LokiAddressMessage? { private static func createLokiAddressMessage(for thread: TSThread, isPing: Bool) -> LokiAddressMessage? {
guard let ourAddress = ourP2PAddress else { guard let ourAddress = ourP2PAddress else {
Logger.error("P2P Address not set") Logger.warn("[Loki] P2P address not set.")
return nil return nil
} }

@ -1,9 +1,7 @@
public extension Notification.Name { public extension Notification.Name {
public static let contactOnlineStatusChanged = Notification.Name("contactOnlineStatusChanged") public static let contactOnlineStatusChanged = Notification.Name("contactOnlineStatusChanged")
public static let receivedNewMessages = Notification.Name("receivedNewMessages") public static let newMessagesReceived = Notification.Name("newMessagesReceived")
// Friend request
public static let threadFriendRequestStatusChanged = Notification.Name("threadFriendRequestStatusChanged") public static let threadFriendRequestStatusChanged = Notification.Name("threadFriendRequestStatusChanged")
public static let messageFriendRequestStatusChanged = Notification.Name("messageFriendRequestStatusChanged") public static let messageFriendRequestStatusChanged = Notification.Name("messageFriendRequestStatusChanged")
} }
@ -12,9 +10,7 @@ public extension Notification.Name {
@objc public extension NSNotification { @objc public extension NSNotification {
@objc public static let contactOnlineStatusChanged = Notification.Name.contactOnlineStatusChanged.rawValue as NSString @objc public static let contactOnlineStatusChanged = Notification.Name.contactOnlineStatusChanged.rawValue as NSString
@objc public static let receivedNewMessages = Notification.Name.receivedNewMessages.rawValue as NSString @objc public static let newMessagesReceived = Notification.Name.newMessagesReceived.rawValue as NSString
// Friend request
@objc public static let threadFriendRequestStatusChanged = Notification.Name.threadFriendRequestStatusChanged.rawValue as NSString @objc public static let threadFriendRequestStatusChanged = Notification.Name.threadFriendRequestStatusChanged.rawValue as NSString
@objc public static let messageFriendRequestStatusChanged = Notification.Name.messageFriendRequestStatusChanged.rawValue as NSString @objc public static let messageFriendRequestStatusChanged = Notification.Name.messageFriendRequestStatusChanged.rawValue as NSString
} }

Loading…
Cancel
Save