Merge branch 'dev' into p2p

pull/18/head
Mikunj 6 years ago
commit 5a1272c23b

@ -47,8 +47,30 @@ public class MessageFetcherJob: NSObject {
// } // }
// ======== // ========
Logger.info("fetching messages via REST.") Logger.info("Fetching messages via REST.")
let promise = fetchUndeliveredMessages().then { promises -> Promise<Void> in
let promises = promises.map { promise -> Promise<Void> in
return promise.then { envelopes -> Promise<Void> in
for envelope in envelopes {
Logger.info("Envelope received.")
do {
let envelopeData = try envelope.serializedData()
self.messageReceiver.handleReceivedEnvelopeData(envelopeData)
} catch {
owsFailDebug("Failed to serialize envelope.")
}
self.acknowledgeDelivery(envelope: envelope)
}
return Promise.value(())
}
}
return when(resolved: promises).asVoid()
}
promise.retainUntilComplete()
return promise
/* Loki: Original code
* ========
let promise = self.fetchUndeliveredMessages().then { (envelopes: [SSKProtoEnvelope], more: Bool) -> Promise<Void> in let promise = self.fetchUndeliveredMessages().then { (envelopes: [SSKProtoEnvelope], more: Bool) -> Promise<Void> in
for envelope in envelopes { for envelope in envelopes {
Logger.info("received envelope.") Logger.info("received envelope.")
@ -73,6 +95,8 @@ public class MessageFetcherJob: NSObject {
promise.retainUntilComplete() promise.retainUntilComplete()
return promise return promise
* ========
*/
} }
@objc @objc
@ -174,14 +198,8 @@ public class MessageFetcherJob: NSObject {
} }
} }
private func fetchUndeliveredMessages() -> Promise<(envelopes: [SSKProtoEnvelope], more: Bool)> { private func fetchUndeliveredMessages() -> Promise<Set<Promise<[SSKProtoEnvelope]>>> {
notImplemented() return LokiAPI.getMessages()
// return Promise { resolver in
// LokiAPI.getMessages().done { envelopes in
// resolver.fulfill((envelopes: envelopes, more: false))
// }.catch { error in
// resolver.reject(error)
// }
// Loki: Original code // Loki: Original code
// ======== // ========
// let request = OWSRequestFactory.getMessagesRequest() // let request = OWSRequestFactory.getMessagesRequest()
@ -204,7 +222,6 @@ public class MessageFetcherJob: NSObject {
// resolver.reject(error) // resolver.reject(error)
// }) // })
// ======== // ========
// }
} }
private func acknowledgeDelivery(envelope: SSKProtoEnvelope) { private func acknowledgeDelivery(envelope: SSKProtoEnvelope) {

@ -47,7 +47,9 @@ import PromiseKit
} }
private static func getRandomSnode() -> Promise<Target> { private static func getRandomSnode() -> Promise<Target> {
return Promise<Target> { _ in notImplemented() } // TODO: Implement return Promise<Target> { seal in
seal.fulfill(Target(address: "http://13.238.53.205", port: 8080)) // TODO: For debugging purposes
}
} }
private static func getSwarm(for hexEncodedPublicKey: String) -> Promise<[Target]> { private static func getSwarm(for hexEncodedPublicKey: String) -> Promise<[Target]> {
@ -91,7 +93,11 @@ import PromiseKit
// MARK: Public API (Obj-C) // MARK: Public API (Obj-C)
@objc public static func objc_sendSignalMessage(_ signalMessage: SignalMessage, to destination: String, timestamp: UInt64, requiringPoW isPoWRequired: Bool) -> AnyPromise { @objc public static func objc_sendSignalMessage(_ signalMessage: SignalMessage, to destination: String, timestamp: UInt64, requiringPoW isPoWRequired: Bool) -> AnyPromise {
let promise = Message.from(signalMessage: signalMessage, timestamp: timestamp, requiringPoW: isPoWRequired).then(sendMessage) let promise = Message.from(signalMessage: signalMessage, timestamp: timestamp, requiringPoW: isPoWRequired).then(sendMessage).mapValues { promise -> AnyPromise in
let anyPromise = AnyPromise(promise)
anyPromise.retainUntilComplete()
return anyPromise
}.map { Set($0) }
let anyPromise = AnyPromise(promise) let anyPromise = AnyPromise(promise)
anyPromise.retainUntilComplete() anyPromise.retainUntilComplete()
return anyPromise return anyPromise
@ -102,11 +108,16 @@ import PromiseKit
// The parsing utilities below use a best attempt approach to parsing; they warn for parsing failures but don't throw exceptions. // The parsing utilities below use a best attempt approach to parsing; they warn for parsing failures but don't throw exceptions.
private static func parseTargets(from rawResponse: Any) -> [Target] { private static func parseTargets(from rawResponse: Any) -> [Target] {
guard let json = rawResponse as? JSON, let addresses = json["snodes"] as? [String] else { // TODO: For debugging purposes
Logger.warn("[Loki] Failed to parse targets from: \(rawResponse).") // ========
return [] let target = Target(address: "http://13.238.53.205", port: 8080)
} return Array(repeating: target, count: 3)
return addresses.map { Target(address: $0, port: defaultSnodePort) } // ========
// guard let json = rawResponse as? JSON, let addresses = json["snodes"] as? [String] else {
// Logger.warn("[Loki] Failed to parse targets from: \(rawResponse).")
// return []
// }
// return addresses.map { Target(address: $0, port: defaultSnodePort) }
} }
private static func updateLastMessageHashValueIfPossible(for target: Target, from rawMessages: [JSON]) { private static func updateLastMessageHashValueIfPossible(for target: Target, from rawMessages: [JSON]) {
@ -118,7 +129,7 @@ import PromiseKit
} }
private static func removeDuplicates(from rawMessages: [JSON]) -> [JSON] { private static func removeDuplicates(from rawMessages: [JSON]) -> [JSON] {
var receivedMessageHashValues = getReceivedMessageHashValues() var receivedMessageHashValues = getReceivedMessageHashValues() ?? []
return rawMessages.filter { rawMessage in return rawMessages.filter { rawMessage in
guard let hashValue = rawMessage["hash"] as? String else { guard let hashValue = rawMessage["hash"] as? String else {
Logger.warn("[Loki] Missing hash value for message: \(rawMessage).") Logger.warn("[Loki] Missing hash value for message: \(rawMessage).")
@ -161,8 +172,8 @@ import PromiseKit
} }
} }
private static func getReceivedMessageHashValues() -> Set<String> { private static func getReceivedMessageHashValues() -> Set<String>? {
var result: Set<String> = [] var result: Set<String>? = nil
storage.dbReadConnection.read { transaction in storage.dbReadConnection.read { transaction in
result = storage.getReceivedMessageHashes(with: transaction) result = storage.getReceivedMessageHashes(with: transaction)
} }

@ -95,7 +95,7 @@ NS_ASSUME_NONNULL_BEGIN
*/ */
- (void)setLastMessageHashForServiceNode:(NSString *)serviceNode hash:(NSString *)hash expiresAt:(u_int64_t)expiresAt transaction:(YapDatabaseReadWriteTransaction *)transaction NS_SWIFT_NAME(setLastMessageHash(forServiceNode:hash:expiresAt:transaction:)); - (void)setLastMessageHashForServiceNode:(NSString *)serviceNode hash:(NSString *)hash expiresAt:(u_int64_t)expiresAt transaction:(YapDatabaseReadWriteTransaction *)transaction NS_SWIFT_NAME(setLastMessageHash(forServiceNode:hash:expiresAt:transaction:));
- (NSSet<NSString *> *)getReceivedMessageHashesWithTransaction:(YapDatabaseReadTransaction *)transaction; - (NSSet<NSString *> *_Nullable)getReceivedMessageHashesWithTransaction:(YapDatabaseReadTransaction *)transaction;
- (void)setReceivedMessageHashes:(NSSet<NSString *> *)receivedMessageHashes withTransaction:(YapDatabaseReadWriteTransaction *)transaction; - (void)setReceivedMessageHashes:(NSSet<NSString *> *)receivedMessageHashes withTransaction:(YapDatabaseReadWriteTransaction *)transaction;
@end @end

@ -153,7 +153,7 @@
[transaction removeObjectForKey:serviceNode inCollection:LKLastMessageHashCollection]; [transaction removeObjectForKey:serviceNode inCollection:LKLastMessageHashCollection];
} }
- (NSSet<NSString *> *)getReceivedMessageHashesWithTransaction:(YapDatabaseReadTransaction *)transaction { - (NSSet<NSString *> *_Nullable)getReceivedMessageHashesWithTransaction:(YapDatabaseReadTransaction *)transaction {
return (NSSet *)[[transaction objectForKey:LKReceivedMessageHashesKey inCollection:LKReceivedMessageHashesCollection] as:NSSet.class]; return (NSSet *)[[transaction objectForKey:LKReceivedMessageHashesKey inCollection:LKReceivedMessageHashesCollection] as:NSSet.class];
} }

@ -1103,11 +1103,11 @@ NSString *const OWSMessageSenderRateLimitedException = @"RateLimitedException";
return messageSend.failure(error); return messageSend.failure(error);
} }
// Update the state to show that proof of work is being calculated // Update the state to show that the proof of work is being calculated
[self setIsCalculatingProofOfWorkForMessage:messageSend]; [self setIsCalculatingProofOfWorkForMessage:messageSend];
// Convert the message to a Loki message and send it using the Loki messaging API // Convert the message to a Loki message and send it using the Loki messaging API
NSDictionary *signalMessage = deviceMessages.firstObject; NSDictionary *signalMessage = deviceMessages.firstObject;
// Update the thread's friend request status if needed // Update the message and thread if needed
NSInteger *messageType = ((NSNumber *)signalMessage[@"type"]).integerValue; NSInteger *messageType = ((NSNumber *)signalMessage[@"type"]).integerValue;
if (messageType == TSFriendRequestMessageType) { if (messageType == TSFriendRequestMessageType) {
[message.thread saveFriendRequestStatus:TSThreadFriendRequestStatusRequestSending withTransaction:nil]; [message.thread saveFriendRequestStatus:TSThreadFriendRequestStatusRequestSending withTransaction:nil];
@ -1116,32 +1116,35 @@ NSString *const OWSMessageSenderRateLimitedException = @"RateLimitedException";
BOOL isPoWRequired = YES; // TODO: Base on message type BOOL isPoWRequired = YES; // TODO: Base on message type
[[LokiAPI objc_sendSignalMessage:signalMessage to:recipient.recipientId timestamp:message.timestamp requiringPoW:isPoWRequired] [[LokiAPI objc_sendSignalMessage:signalMessage to:recipient.recipientId timestamp:message.timestamp requiringPoW:isPoWRequired]
.thenOn(OWSDispatch.sendingQueue, ^(id result) { .thenOn(OWSDispatch.sendingQueue, ^(id result) {
// Loki NSSet<AnyPromise *> *promises = (NSSet<AnyPromise *> *)result;
// ======== __block BOOL isSuccess = NO;
NSUInteger promiseCount = promises.count;
__block NSUInteger errorCount = 0;
for (AnyPromise *promise in promises) {
[promise
.thenOn(OWSDispatch.sendingQueue, ^(id result) {
if (isSuccess) { return; } // Succeed as soon as the first promise succeeds
isSuccess = YES;
// Update the message and thread if needed
if (messageType == TSFriendRequestMessageType) { if (messageType == TSFriendRequestMessageType) {
[self.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) { [self.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) {
[message.thread saveFriendRequestStatus:TSThreadFriendRequestStatusRequestSent withTransaction:transaction]; [message.thread saveFriendRequestStatus:TSThreadFriendRequestStatusRequestSent withTransaction:transaction];
[message.thread removeOutgoingFriendRequestMessagesWithTransaction:transaction]; [message.thread removeOutgoingFriendRequestMessagesWithTransaction:transaction];
// Set the expiration date
NSTimeInterval expirationInterval = 72 * kHourInterval; NSTimeInterval expirationInterval = 72 * kHourInterval;
NSDate *expireDate = [[NSDate new] dateByAddingTimeInterval:expirationInterval]; NSDate *expirationDate = [[NSDate new] dateByAddingTimeInterval:expirationInterval];
[message saveFriendRequestExpiresAt:[NSDate ows_millisecondsSince1970ForDate:expireDate] withTransaction:transaction]; [message saveFriendRequestExpiresAt:[NSDate ows_millisecondsSince1970ForDate:expirationDate] withTransaction:transaction];
}]; }];
} }
// ========
// Invoke the completion handler // Invoke the completion handler
[self messageSendDidSucceed:messageSend [self messageSendDidSucceed:messageSend deviceMessages:deviceMessages wasSentByUD:false wasSentByWebsocket:false];
deviceMessages:deviceMessages
wasSentByUD:false
wasSentByWebsocket:false];
}) })
.catchOn(OWSDispatch.sendingQueue, ^(NSError *error) { .catchOn(OWSDispatch.sendingQueue, ^(NSError *error) {
// Loki errorCount += 1;
// ======== if (errorCount != promiseCount) { return; } // Only error out if all promises failed
// Update the thread if needed
if (messageType == TSFriendRequestMessageType) { if (messageType == TSFriendRequestMessageType) {
[message.thread saveFriendRequestStatus:TSThreadFriendRequestStatusNone withTransaction:nil]; [message.thread saveFriendRequestStatus:TSThreadFriendRequestStatusNone withTransaction:nil];
} }
// ========
// Handle the error // Handle the error
NSUInteger statusCode = 0; NSUInteger statusCode = 0;
NSData *_Nullable responseData = nil; NSData *_Nullable responseData = nil;
@ -1151,16 +1154,15 @@ NSString *const OWSMessageSenderRateLimitedException = @"RateLimitedException";
if (underlyingError) { if (underlyingError) {
responseData = underlyingError.userInfo[AFNetworkingOperationFailingURLResponseDataErrorKey]; responseData = underlyingError.userInfo[AFNetworkingOperationFailingURLResponseDataErrorKey];
} else { } else {
OWSFailDebug(@"Missing underlying error: %@", error); OWSFailDebug(@"Missing underlying error: %@.", error);
} }
} else { } else {
OWSFailDebug(@"Unexpected error: %@", error); OWSFailDebug(@"Unexpected error: %@.", error);
} }
[self messageSendDidFail:messageSend [self messageSendDidFail:messageSend deviceMessages:deviceMessages statusCode:statusCode error:error responseData:responseData];
deviceMessages:deviceMessages }) retainUntilComplete];
statusCode:statusCode }
error:error
responseData:responseData];
}) retainUntilComplete]; }) retainUntilComplete];
// Loki: Original code // Loki: Original code

Loading…
Cancel
Save