Modify OWSOutgoingReceiptManager to handle read receipts.

pull/1/head
Matthew Chen 7 years ago
parent 010ce1f6c2
commit 5cf8909a28

@ -53,6 +53,7 @@ NS_ASSUME_NONNULL_BEGIN
} }
_messageTimestamps = [messageTimestamps copy]; _messageTimestamps = [messageTimestamps copy];
_receiptType = receiptType;
return self; return self;
} }

@ -15,6 +15,8 @@ NS_ASSUME_NONNULL_BEGIN
- (void)enqueueDeliveryReceiptForEnvelope:(SSKProtoEnvelope *)envelope; - (void)enqueueDeliveryReceiptForEnvelope:(SSKProtoEnvelope *)envelope;
- (void)enqueueReadReceiptForEnvelope:(NSString *)messageAuthorId timestamp:(uint64_t)timestamp;
@end @end
NS_ASSUME_NONNULL_END NS_ASSUME_NONNULL_END

@ -15,7 +15,13 @@
NS_ASSUME_NONNULL_BEGIN NS_ASSUME_NONNULL_BEGIN
NSString *const kOutgoingReceiptManagerCollection = @"kOutgoingReceiptManagerCollection"; typedef NS_ENUM(NSUInteger, OWSReceiptType) {
OWSReceiptType_Delivery,
OWSReceiptType_Read,
};
NSString *const kOutgoingDeliveryReceiptManagerCollection = @"kOutgoingDeliveryReceiptManagerCollection";
NSString *const kOutgoingReadReceiptManagerCollection = @"kOutgoingReadReceiptManagerCollection";
@interface OWSOutgoingReceiptManager () @interface OWSOutgoingReceiptManager ()
@ -23,7 +29,7 @@ NSString *const kOutgoingReceiptManagerCollection = @"kOutgoingReceiptManagerCol
@property (nonatomic) Reachability *reachability; @property (nonatomic) Reachability *reachability;
// Should only be accessed on the serialQueue. // This property should only be accessed on the serialQueue.
@property (nonatomic) BOOL isProcessing; @property (nonatomic) BOOL isProcessing;
@end @end
@ -82,7 +88,7 @@ NSString *const kOutgoingReceiptManagerCollection = @"kOutgoingReceiptManagerCol
static dispatch_queue_t _serialQueue; static dispatch_queue_t _serialQueue;
static dispatch_once_t onceToken; static dispatch_once_t onceToken;
dispatch_once(&onceToken, ^{ dispatch_once(&onceToken, ^{
_serialQueue = dispatch_queue_create("org.whispersystems.deliveryReceipts", DISPATCH_QUEUE_SERIAL); _serialQueue = dispatch_queue_create("org.whispersystems.outgoingReceipts", DISPATCH_QUEUE_SERIAL);
}); });
return _serialQueue; return _serialQueue;
@ -104,7 +110,7 @@ NSString *const kOutgoingReceiptManagerCollection = @"kOutgoingReceiptManagerCol
} }
- (void)process { - (void)process {
OWSLogVerbose(@"Processing outbound delivery receipts."); OWSLogVerbose(@"Processing outbound receipts.");
if (!self.reachability.isReachable) { if (!self.reachability.isReachable) {
// No network availability; abort. // No network availability; abort.
@ -112,42 +118,87 @@ NSString *const kOutgoingReceiptManagerCollection = @"kOutgoingReceiptManagerCol
return; return;
} }
NSMutableDictionary<NSString *, NSSet<NSNumber *> *> *deliveryReceiptMap = [NSMutableDictionary new]; NSMutableArray<AnyPromise *> *sendPromises = [NSMutableArray array];
[sendPromises addObjectsFromArray:[self sendReceiptsForCollection:kOutgoingDeliveryReceiptManagerCollection
receiptType:OWSReceiptType_Delivery]];
[sendPromises addObjectsFromArray:[self sendReceiptsForCollection:kOutgoingReadReceiptManagerCollection
receiptType:OWSReceiptType_Read]];
if (sendPromises.count < 1) {
// No work to do; abort.
self.isProcessing = NO;
return;
}
AnyPromise *completionPromise = PMKJoin(sendPromises);
completionPromise.always(^() {
// Wait N seconds before conducting another pass.
// This allows time for a batch to accumulate.
//
// We want a value high enough to allow us to effectively de-duplicate
// receipts without being so high that we incur so much latency that
// the user notices.
const CGFloat kProcessingFrequencySeconds = 3.f;
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(kProcessingFrequencySeconds * NSEC_PER_SEC)),
self.serialQueue,
^{
[self process];
});
});
[completionPromise retainUntilComplete];
}
- (NSArray<AnyPromise *> *)sendReceiptsForCollection:(NSString *)collection receiptType:(OWSReceiptType)receiptType {
NSMutableDictionary<NSString *, NSSet<NSNumber *> *> *queuedReceiptMap = [NSMutableDictionary new];
[self.dbConnection readWithBlock:^(YapDatabaseReadTransaction *transaction) { [self.dbConnection readWithBlock:^(YapDatabaseReadTransaction *transaction) {
[transaction enumerateKeysAndObjectsInCollection:kOutgoingReceiptManagerCollection [transaction enumerateKeysAndObjectsInCollection:collection
usingBlock:^(NSString *key, id object, BOOL *stop) { usingBlock:^(NSString *key, id object, BOOL *stop) {
NSString *recipientId = key; NSString *recipientId = key;
NSSet<NSNumber *> *timestamps = object; NSSet<NSNumber *> *timestamps = object;
deliveryReceiptMap[recipientId] = [timestamps copy]; queuedReceiptMap[recipientId] = [timestamps copy];
}]; }];
}]; }];
NSMutableArray<AnyPromise *> *sendPromises = [NSMutableArray array]; NSMutableArray<AnyPromise *> *sendPromises = [NSMutableArray array];
for (NSString *recipientId in deliveryReceiptMap) { for (NSString *recipientId in queuedReceiptMap) {
NSSet<NSNumber *> *timestamps = deliveryReceiptMap[recipientId]; NSSet<NSNumber *> *timestamps = queuedReceiptMap[recipientId];
if (timestamps.count < 1) { if (timestamps.count < 1) {
OWSFailDebug(@"Missing timestamps."); OWSFailDebug(@"Missing timestamps.");
continue; continue;
} }
TSThread *thread = [TSContactThread getOrCreateThreadWithContactId:recipientId]; TSThread *thread = [TSContactThread getOrCreateThreadWithContactId:recipientId];
OWSReceiptsForSenderMessage *message = OWSReceiptsForSenderMessage *message;
[OWSReceiptsForSenderMessage deliveryReceiptsForSenderMessageWithThread:thread NSString *receiptName;
messageTimestamps:timestamps.allObjects]; switch (receiptType) {
case OWSReceiptType_Delivery:
message =
[OWSReceiptsForSenderMessage deliveryReceiptsForSenderMessageWithThread:thread
messageTimestamps:timestamps.allObjects];
receiptName = @"Delivery";
break;
case OWSReceiptType_Read:
message = [OWSReceiptsForSenderMessage readReceiptsForSenderMessageWithThread:thread
messageTimestamps:timestamps.allObjects];
receiptName = @"Read";
break;
}
AnyPromise *sendPromise = [AnyPromise promiseWithResolverBlock:^(PMKResolver resolve) { AnyPromise *sendPromise = [AnyPromise promiseWithResolverBlock:^(PMKResolver resolve) {
[self.messageSender enqueueMessage:message [self.messageSender enqueueMessage:message
success:^{ success:^{
OWSLogInfo(@"Successfully sent %lu delivery receipts to sender.", (unsigned long)timestamps.count); OWSLogInfo(
@"Successfully sent %lu %@ receipts to sender.", (unsigned long)timestamps.count, receiptName);
[self dequeueDeliveryReceiptsWithRecipientId:recipientId timestamps:timestamps]; [self dequeueReceiptsWithRecipientId:recipientId timestamps:timestamps collection:collection];
// The value doesn't matter, we just need any non-NSError value. // The value doesn't matter, we just need any non-NSError value.
resolve(@(1)); resolve(@(1));
} }
failure:^(NSError *error) { failure:^(NSError *error) {
OWSLogError(@"Failed to send delivery receipts to sender with error: %@", error); OWSLogError(@"Failed to send %@ receipts to sender with error: %@", receiptName, error);
resolve(error); resolve(error);
}]; }];
@ -155,38 +206,24 @@ NSString *const kOutgoingReceiptManagerCollection = @"kOutgoingReceiptManagerCol
[sendPromises addObject:sendPromise]; [sendPromises addObject:sendPromise];
} }
if (sendPromises.count < 1) { return [sendPromises copy];
// No work to do; abort.
self.isProcessing = NO;
return;
}
AnyPromise *completionPromise = PMKJoin(sendPromises);
completionPromise.always(^() {
// Wait N seconds before processing delivery receipts again.
// This allows time for a batch to accumulate.
//
// We want a value high enough to allow us to effectively de-duplicate,
// delivery receipts without being so high that we risk not sending delivery
// receipts due to app exit.
const CGFloat kProcessingFrequencySeconds = 3.f;
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(kProcessingFrequencySeconds * NSEC_PER_SEC)),
self.serialQueue,
^{
[self process];
});
});
[completionPromise retainUntilComplete];
} }
- (void)enqueueDeliveryReceiptForEnvelope:(SSKProtoEnvelope *)envelope { - (void)enqueueDeliveryReceiptForEnvelope:(SSKProtoEnvelope *)envelope {
OWSLogVerbose(@""); [self enqueueReceiptWithRecipientId:envelope.source
timestamp:envelope.timestamp
collection:kOutgoingDeliveryReceiptManagerCollection];
}
[self enqueueDeliveryReceiptWithRecipientId:envelope.source timestamp:envelope.timestamp]; - (void)enqueueReadReceiptForEnvelope:(NSString *)messageAuthorId timestamp:(uint64_t)timestamp {
[self enqueueReceiptWithRecipientId:messageAuthorId
timestamp:timestamp
collection:kOutgoingReadReceiptManagerCollection];
} }
- (void)enqueueDeliveryReceiptWithRecipientId:(NSString *)recipientId timestamp:(uint64_t)timestamp { - (void)enqueueReceiptWithRecipientId:(NSString *)recipientId
OWSLogVerbose(@""); timestamp:(uint64_t)timestamp
collection:(NSString *)collection {
if (recipientId.length < 1) { if (recipientId.length < 1) {
OWSFailDebug(@"Invalid recipient id."); OWSFailDebug(@"Invalid recipient id.");
@ -198,20 +235,21 @@ NSString *const kOutgoingReceiptManagerCollection = @"kOutgoingReceiptManagerCol
} }
dispatch_async(self.serialQueue, ^{ dispatch_async(self.serialQueue, ^{
[self.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) { [self.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) {
NSSet<NSNumber *> *_Nullable oldTimestamps = [transaction objectForKey:recipientId NSSet<NSNumber *> *_Nullable oldTimestamps = [transaction objectForKey:recipientId inCollection:collection];
inCollection:kOutgoingReceiptManagerCollection];
NSMutableSet<NSNumber *> *newTimestamps NSMutableSet<NSNumber *> *newTimestamps
= (oldTimestamps ? [oldTimestamps mutableCopy] : [NSMutableSet new]); = (oldTimestamps ? [oldTimestamps mutableCopy] : [NSMutableSet new]);
[newTimestamps addObject:@(timestamp)]; [newTimestamps addObject:@(timestamp)];
[transaction setObject:newTimestamps forKey:recipientId inCollection:kOutgoingReceiptManagerCollection]; [transaction setObject:newTimestamps forKey:recipientId inCollection:collection];
}]; }];
[self scheduleProcessing]; [self scheduleProcessing];
}); });
} }
- (void)dequeueDeliveryReceiptsWithRecipientId:(NSString *)recipientId timestamps:(NSSet<NSNumber *> *)timestamps { - (void)dequeueReceiptsWithRecipientId:(NSString *)recipientId
timestamps:(NSSet<NSNumber *> *)timestamps
collection:(NSString *)collection {
if (recipientId.length < 1) { if (recipientId.length < 1) {
OWSFailDebug(@"Invalid recipient id."); OWSFailDebug(@"Invalid recipient id.");
return; return;
@ -222,16 +260,15 @@ NSString *const kOutgoingReceiptManagerCollection = @"kOutgoingReceiptManagerCol
} }
dispatch_async(self.serialQueue, ^{ dispatch_async(self.serialQueue, ^{
[self.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) { [self.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) {
NSSet<NSNumber *> *_Nullable oldTimestamps = [transaction objectForKey:recipientId NSSet<NSNumber *> *_Nullable oldTimestamps = [transaction objectForKey:recipientId inCollection:collection];
inCollection:kOutgoingReceiptManagerCollection];
NSMutableSet<NSNumber *> *newTimestamps NSMutableSet<NSNumber *> *newTimestamps
= (oldTimestamps ? [oldTimestamps mutableCopy] : [NSMutableSet new]); = (oldTimestamps ? [oldTimestamps mutableCopy] : [NSMutableSet new]);
[newTimestamps minusSet:timestamps]; [newTimestamps minusSet:timestamps];
if (newTimestamps.count > 0) { if (newTimestamps.count > 0) {
[transaction setObject:newTimestamps forKey:recipientId inCollection:kOutgoingReceiptManagerCollection]; [transaction setObject:newTimestamps forKey:recipientId inCollection:collection];
} else { } else {
[transaction removeObjectForKey:recipientId inCollection:kOutgoingReceiptManagerCollection]; [transaction removeObjectForKey:recipientId inCollection:collection];
} }
}]; }];
}); });

@ -7,6 +7,7 @@
#import "NSNotificationCenter+OWS.h" #import "NSNotificationCenter+OWS.h"
#import "OWSLinkedDeviceReadReceipt.h" #import "OWSLinkedDeviceReadReceipt.h"
#import "OWSMessageSender.h" #import "OWSMessageSender.h"
#import "OWSOutgoingReceiptManager.h"
#import "OWSPrimaryStorage.h" #import "OWSPrimaryStorage.h"
#import "OWSReadReceiptsForLinkedDevicesMessage.h" #import "OWSReadReceiptsForLinkedDevicesMessage.h"
#import "OWSReceiptsForSenderMessage.h" #import "OWSReceiptsForSenderMessage.h"
@ -125,12 +126,6 @@ NSString *const OWSReadReceiptManagerAreReadReceiptsEnabled = @"areReadReceiptsE
// Should only be accessed while synchronized on the OWSReadReceiptManager. // Should only be accessed while synchronized on the OWSReadReceiptManager.
@property (nonatomic, readonly) NSMutableDictionary<NSString *, OWSLinkedDeviceReadReceipt *> *toLinkedDevicesReadReceiptMap; @property (nonatomic, readonly) NSMutableDictionary<NSString *, OWSLinkedDeviceReadReceipt *> *toLinkedDevicesReadReceiptMap;
// A map of "recipient id"-to-"timestamp list" for read receipts that
// we will send to senders.
//
// Should only be accessed while synchronized on the OWSReadReceiptManager.
@property (nonatomic, readonly) NSMutableDictionary<NSString *, NSMutableSet<NSNumber *> *> *toSenderReadReceiptMap;
// Should only be accessed while synchronized on the OWSReadReceiptManager. // Should only be accessed while synchronized on the OWSReadReceiptManager.
@property (nonatomic) BOOL isProcessing; @property (nonatomic) BOOL isProcessing;
@ -159,7 +154,6 @@ NSString *const OWSReadReceiptManagerAreReadReceiptsEnabled = @"areReadReceiptsE
_dbConnection = primaryStorage.newDatabaseConnection; _dbConnection = primaryStorage.newDatabaseConnection;
_toLinkedDevicesReadReceiptMap = [NSMutableDictionary new]; _toLinkedDevicesReadReceiptMap = [NSMutableDictionary new];
_toSenderReadReceiptMap = [NSMutableDictionary new];
OWSSingletonAssert(); OWSSingletonAssert();
@ -228,31 +222,7 @@ NSString *const OWSReadReceiptManagerAreReadReceiptsEnabled = @"areReadReceiptsE
}]; }];
} }
NSDictionary<NSString *, NSMutableSet<NSNumber *> *> *toSenderReadReceiptMap = BOOL didWork = readReceiptsForLinkedDevices.count > 0;
[self.toSenderReadReceiptMap copy];
[self.toSenderReadReceiptMap removeAllObjects];
if (toSenderReadReceiptMap.count > 0) {
for (NSString *recipientId in toSenderReadReceiptMap) {
NSSet<NSNumber *> *timestamps = toSenderReadReceiptMap[recipientId];
OWSAssertDebug(timestamps.count > 0);
TSThread *thread = [TSContactThread getOrCreateThreadWithContactId:recipientId];
OWSReceiptsForSenderMessage *message =
[OWSReceiptsForSenderMessage readReceiptsForSenderMessageWithThread:thread
messageTimestamps:timestamps.allObjects];
[self.messageSender enqueueMessage:message
success:^{
OWSLogInfo(@"Successfully sent %lu read receipts to sender.", (unsigned long)timestamps.count);
}
failure:^(NSError *error) {
OWSLogError(@"Failed to send read receipts to sender with error: %@", error);
}];
}
[self.toSenderReadReceiptMap removeAllObjects];
}
BOOL didWork = (readReceiptsForLinkedDevices.count > 0 || toSenderReadReceiptMap.count > 0);
if (didWork) { if (didWork) {
// Wait N seconds before processing read receipts again. // Wait N seconds before processing read receipts again.
@ -323,12 +293,8 @@ NSString *const OWSReadReceiptManagerAreReadReceiptsEnabled = @"areReadReceiptsE
if ([self areReadReceiptsEnabled]) { if ([self areReadReceiptsEnabled]) {
OWSLogVerbose(@"Enqueuing read receipt for sender."); OWSLogVerbose(@"Enqueuing read receipt for sender.");
NSMutableSet<NSNumber *> *_Nullable timestamps = self.toSenderReadReceiptMap[messageAuthorId]; [SSKEnvironment.shared.outgoingReceiptManager enqueueReadReceiptForEnvelope:messageAuthorId
if (!timestamps) { timestamp:message.timestamp];
timestamps = [NSMutableSet new];
self.toSenderReadReceiptMap[messageAuthorId] = timestamps;
}
[timestamps addObject:@(message.timestamp)];
} }
[self scheduleProcessing]; [self scheduleProcessing];

Loading…
Cancel
Save