diff --git a/Signal/src/Jobs/MessageFetcherJob.swift b/Signal/src/Jobs/MessageFetcherJob.swift index d1e8da742..3fac71a50 100644 --- a/Signal/src/Jobs/MessageFetcherJob.swift +++ b/Signal/src/Jobs/MessageFetcherJob.swift @@ -47,8 +47,30 @@ public class MessageFetcherJob: NSObject { // } // ======== - Logger.info("fetching messages via REST.") - + Logger.info("Fetching messages via REST.") + let promise = fetchUndeliveredMessages().then { promises -> Promise in + let promises = promises.map { promise -> Promise in + return promise.then { envelopes -> Promise in + for envelope in envelopes { + Logger.info("Envelope received.") + do { + let envelopeData = try envelope.serializedData() + self.messageReceiver.handleReceivedEnvelopeData(envelopeData) + } catch { + owsFailDebug("Failed to serialize envelope.") + } + self.acknowledgeDelivery(envelope: envelope) + } + return Promise.value(()) + } + } + return when(resolved: promises).asVoid() + } + promise.retainUntilComplete() + return promise + + /* Loki: Original code + * ======== let promise = self.fetchUndeliveredMessages().then { (envelopes: [SSKProtoEnvelope], more: Bool) -> Promise in for envelope in envelopes { Logger.info("received envelope.") @@ -73,6 +95,8 @@ public class MessageFetcherJob: NSObject { promise.retainUntilComplete() return promise + * ======== + */ } @objc @@ -174,37 +198,30 @@ public class MessageFetcherJob: NSObject { } } - private func fetchUndeliveredMessages() -> Promise<(envelopes: [SSKProtoEnvelope], more: Bool)> { - notImplemented() -// return Promise { resolver in -// LokiAPI.getMessages().done { envelopes in -// resolver.fulfill((envelopes: envelopes, more: false)) -// }.catch { error in -// resolver.reject(error) -// } - // Loki: Original code - // ======== -// let request = OWSRequestFactory.getMessagesRequest() -// self.networkManager.makeRequest( -// request, -// success: { (_: URLSessionDataTask?, responseObject: Any?) -> Void in -// guard let (envelopes, more) = self.parseMessagesResponse(responseObject: responseObject) else { -// Logger.error("response object had unexpected content") -// return resolver.reject(OWSErrorMakeUnableToProcessServerResponseError()) -// } + private func fetchUndeliveredMessages() -> Promise>> { + return LokiAPI.getMessages() + // Loki: Original code + // ======== +// let request = OWSRequestFactory.getMessagesRequest() +// self.networkManager.makeRequest( +// request, +// success: { (_: URLSessionDataTask?, responseObject: Any?) -> Void in +// guard let (envelopes, more) = self.parseMessagesResponse(responseObject: responseObject) else { +// Logger.error("response object had unexpected content") +// return resolver.reject(OWSErrorMakeUnableToProcessServerResponseError()) +// } // -// resolver.fulfill((envelopes: envelopes, more: more)) -// }, -// failure: { (_: URLSessionDataTask?, error: Error?) in -// guard let error = error else { -// Logger.error("error was surpringly nil. sheesh rough day.") -// return resolver.reject(OWSErrorMakeUnableToProcessServerResponseError()) -// } +// resolver.fulfill((envelopes: envelopes, more: more)) +// }, +// failure: { (_: URLSessionDataTask?, error: Error?) in +// guard let error = error else { +// Logger.error("error was surpringly nil. sheesh rough day.") +// return resolver.reject(OWSErrorMakeUnableToProcessServerResponseError()) +// } // -// resolver.reject(error) -// }) - // ======== -// } +// resolver.reject(error) +// }) + // ======== } private func acknowledgeDelivery(envelope: SSKProtoEnvelope) { diff --git a/SignalServiceKit/src/Loki/API/LokiAPI.swift b/SignalServiceKit/src/Loki/API/LokiAPI.swift index 94ee2129e..826c41bdd 100644 --- a/SignalServiceKit/src/Loki/API/LokiAPI.swift +++ b/SignalServiceKit/src/Loki/API/LokiAPI.swift @@ -47,7 +47,9 @@ import PromiseKit } private static func getRandomSnode() -> Promise { - return Promise { _ in notImplemented() } // TODO: Implement + return Promise { seal in + seal.fulfill(Target(address: "http://13.238.53.205", port: 8080)) // TODO: For debugging purposes + } } private static func getSwarm(for hexEncodedPublicKey: String) -> Promise<[Target]> { @@ -91,7 +93,11 @@ import PromiseKit // MARK: Public API (Obj-C) @objc public static func objc_sendSignalMessage(_ signalMessage: SignalMessage, to destination: String, timestamp: UInt64, requiringPoW isPoWRequired: Bool) -> AnyPromise { - let promise = Message.from(signalMessage: signalMessage, timestamp: timestamp, requiringPoW: isPoWRequired).then(sendMessage) + let promise = Message.from(signalMessage: signalMessage, timestamp: timestamp, requiringPoW: isPoWRequired).then(sendMessage).mapValues { promise -> AnyPromise in + let anyPromise = AnyPromise(promise) + anyPromise.retainUntilComplete() + return anyPromise + }.map { Set($0) } let anyPromise = AnyPromise(promise) anyPromise.retainUntilComplete() return anyPromise @@ -102,11 +108,16 @@ import PromiseKit // The parsing utilities below use a best attempt approach to parsing; they warn for parsing failures but don't throw exceptions. private static func parseTargets(from rawResponse: Any) -> [Target] { - guard let json = rawResponse as? JSON, let addresses = json["snodes"] as? [String] else { - Logger.warn("[Loki] Failed to parse targets from: \(rawResponse).") - return [] - } - return addresses.map { Target(address: $0, port: defaultSnodePort) } + // TODO: For debugging purposes + // ======== + let target = Target(address: "http://13.238.53.205", port: 8080) + return Array(repeating: target, count: 3) + // ======== +// guard let json = rawResponse as? JSON, let addresses = json["snodes"] as? [String] else { +// Logger.warn("[Loki] Failed to parse targets from: \(rawResponse).") +// return [] +// } +// return addresses.map { Target(address: $0, port: defaultSnodePort) } } private static func updateLastMessageHashValueIfPossible(for target: Target, from rawMessages: [JSON]) { @@ -118,7 +129,7 @@ import PromiseKit } private static func removeDuplicates(from rawMessages: [JSON]) -> [JSON] { - var receivedMessageHashValues = getReceivedMessageHashValues() + var receivedMessageHashValues = getReceivedMessageHashValues() ?? [] return rawMessages.filter { rawMessage in guard let hashValue = rawMessage["hash"] as? String else { Logger.warn("[Loki] Missing hash value for message: \(rawMessage).") @@ -161,8 +172,8 @@ import PromiseKit } } - private static func getReceivedMessageHashValues() -> Set { - var result: Set = [] + private static func getReceivedMessageHashValues() -> Set? { + var result: Set? = nil storage.dbReadConnection.read { transaction in result = storage.getReceivedMessageHashes(with: transaction) } diff --git a/SignalServiceKit/src/Loki/Crypto/OWSPrimaryStorage+Loki.h b/SignalServiceKit/src/Loki/Crypto/OWSPrimaryStorage+Loki.h index b99592924..beb789e9e 100644 --- a/SignalServiceKit/src/Loki/Crypto/OWSPrimaryStorage+Loki.h +++ b/SignalServiceKit/src/Loki/Crypto/OWSPrimaryStorage+Loki.h @@ -95,7 +95,7 @@ NS_ASSUME_NONNULL_BEGIN */ - (void)setLastMessageHashForServiceNode:(NSString *)serviceNode hash:(NSString *)hash expiresAt:(u_int64_t)expiresAt transaction:(YapDatabaseReadWriteTransaction *)transaction NS_SWIFT_NAME(setLastMessageHash(forServiceNode:hash:expiresAt:transaction:)); -- (NSSet *)getReceivedMessageHashesWithTransaction:(YapDatabaseReadTransaction *)transaction; +- (NSSet *_Nullable)getReceivedMessageHashesWithTransaction:(YapDatabaseReadTransaction *)transaction; - (void)setReceivedMessageHashes:(NSSet *)receivedMessageHashes withTransaction:(YapDatabaseReadWriteTransaction *)transaction; @end diff --git a/SignalServiceKit/src/Loki/Crypto/OWSPrimaryStorage+Loki.m b/SignalServiceKit/src/Loki/Crypto/OWSPrimaryStorage+Loki.m index 7ccefb5ab..26744fbec 100644 --- a/SignalServiceKit/src/Loki/Crypto/OWSPrimaryStorage+Loki.m +++ b/SignalServiceKit/src/Loki/Crypto/OWSPrimaryStorage+Loki.m @@ -153,7 +153,7 @@ [transaction removeObjectForKey:serviceNode inCollection:LKLastMessageHashCollection]; } -- (NSSet *)getReceivedMessageHashesWithTransaction:(YapDatabaseReadTransaction *)transaction { +- (NSSet *_Nullable)getReceivedMessageHashesWithTransaction:(YapDatabaseReadTransaction *)transaction { return (NSSet *)[[transaction objectForKey:LKReceivedMessageHashesKey inCollection:LKReceivedMessageHashesCollection] as:NSSet.class]; } diff --git a/SignalServiceKit/src/Messages/OWSMessageSender.m b/SignalServiceKit/src/Messages/OWSMessageSender.m index 419102baa..736a89798 100644 --- a/SignalServiceKit/src/Messages/OWSMessageSender.m +++ b/SignalServiceKit/src/Messages/OWSMessageSender.m @@ -1103,11 +1103,11 @@ NSString *const OWSMessageSenderRateLimitedException = @"RateLimitedException"; return messageSend.failure(error); } - // Update the state to show that proof of work is being calculated + // Update the state to show that the proof of work is being calculated [self setIsCalculatingProofOfWorkForMessage:messageSend]; // Convert the message to a Loki message and send it using the Loki messaging API NSDictionary *signalMessage = deviceMessages.firstObject; - // Update the thread's friend request status if needed + // Update the message and thread if needed NSInteger *messageType = ((NSNumber *)signalMessage[@"type"]).integerValue; if (messageType == TSFriendRequestMessageType) { [message.thread saveFriendRequestStatus:TSThreadFriendRequestStatusRequestSending withTransaction:nil]; @@ -1116,51 +1116,53 @@ NSString *const OWSMessageSenderRateLimitedException = @"RateLimitedException"; BOOL isPoWRequired = YES; // TODO: Base on message type [[LokiAPI objc_sendSignalMessage:signalMessage to:recipient.recipientId timestamp:message.timestamp requiringPoW:isPoWRequired] .thenOn(OWSDispatch.sendingQueue, ^(id result) { - // Loki - // ======== - if (messageType == TSFriendRequestMessageType) { - [self.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) { - [message.thread saveFriendRequestStatus:TSThreadFriendRequestStatusRequestSent withTransaction:transaction]; - [message.thread removeOutgoingFriendRequestMessagesWithTransaction:transaction]; - // Set the expiration date - NSTimeInterval expirationInterval = 72 * kHourInterval; - NSDate *expireDate = [[NSDate new] dateByAddingTimeInterval:expirationInterval]; - [message saveFriendRequestExpiresAt:[NSDate ows_millisecondsSince1970ForDate:expireDate] withTransaction:transaction]; - }]; - } - // ======== - // Invoke the completion handler - [self messageSendDidSucceed:messageSend - deviceMessages:deviceMessages - wasSentByUD:false - wasSentByWebsocket:false]; - }) - .catchOn(OWSDispatch.sendingQueue, ^(NSError *error) { - // Loki - // ======== - if (messageType == TSFriendRequestMessageType) { - [message.thread saveFriendRequestStatus:TSThreadFriendRequestStatusNone withTransaction:nil]; - } - // ======== - // Handle the error - NSUInteger statusCode = 0; - NSData *_Nullable responseData = nil; - if ([error.domain isEqualToString:TSNetworkManagerErrorDomain]) { - statusCode = error.code; - NSError *_Nullable underlyingError = error.userInfo[NSUnderlyingErrorKey]; - if (underlyingError) { - responseData = underlyingError.userInfo[AFNetworkingOperationFailingURLResponseDataErrorKey]; - } else { - OWSFailDebug(@"Missing underlying error: %@", error); - } - } else { - OWSFailDebug(@"Unexpected error: %@", error); + NSSet *promises = (NSSet *)result; + __block BOOL isSuccess = NO; + NSUInteger promiseCount = promises.count; + __block NSUInteger errorCount = 0; + for (AnyPromise *promise in promises) { + [promise + .thenOn(OWSDispatch.sendingQueue, ^(id result) { + if (isSuccess) { return; } // Succeed as soon as the first promise succeeds + isSuccess = YES; + // Update the message and thread if needed + if (messageType == TSFriendRequestMessageType) { + [self.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) { + [message.thread saveFriendRequestStatus:TSThreadFriendRequestStatusRequestSent withTransaction:transaction]; + [message.thread removeOutgoingFriendRequestMessagesWithTransaction:transaction]; + NSTimeInterval expirationInterval = 72 * kHourInterval; + NSDate *expirationDate = [[NSDate new] dateByAddingTimeInterval:expirationInterval]; + [message saveFriendRequestExpiresAt:[NSDate ows_millisecondsSince1970ForDate:expirationDate] withTransaction:transaction]; + }]; + } + // Invoke the completion handler + [self messageSendDidSucceed:messageSend deviceMessages:deviceMessages wasSentByUD:false wasSentByWebsocket:false]; + }) + .catchOn(OWSDispatch.sendingQueue, ^(NSError *error) { + errorCount += 1; + if (errorCount != promiseCount) { return; } // Only error out if all promises failed + // Update the thread if needed + if (messageType == TSFriendRequestMessageType) { + [message.thread saveFriendRequestStatus:TSThreadFriendRequestStatusNone withTransaction:nil]; + } + // Handle the error + NSUInteger statusCode = 0; + NSData *_Nullable responseData = nil; + if ([error.domain isEqualToString:TSNetworkManagerErrorDomain]) { + statusCode = error.code; + NSError *_Nullable underlyingError = error.userInfo[NSUnderlyingErrorKey]; + if (underlyingError) { + responseData = underlyingError.userInfo[AFNetworkingOperationFailingURLResponseDataErrorKey]; + } else { + OWSFailDebug(@"Missing underlying error: %@.", error); + } + } else { + OWSFailDebug(@"Unexpected error: %@.", error); + } + [self messageSendDidFail:messageSend deviceMessages:deviceMessages statusCode:statusCode error:error responseData:responseData]; + }) retainUntilComplete]; } - [self messageSendDidFail:messageSend - deviceMessages:deviceMessages - statusCode:statusCode - error:error - responseData:responseData]; + }) retainUntilComplete]; // Loki: Original code