|
|
|
@ -110,11 +110,11 @@ NSString *const kDeliveryReceiptManagerCollection = @"kDeliveryReceiptManagerCol
|
|
|
|
|
usingBlock:^(NSString *key, id object, BOOL *stop) {
|
|
|
|
|
NSString *recipientId = key;
|
|
|
|
|
NSSet<NSNumber *> *timestamps = object;
|
|
|
|
|
deliveryReceiptMap[recipientId] = timestamps;
|
|
|
|
|
deliveryReceiptMap[recipientId] = [timestamps copy];
|
|
|
|
|
}];
|
|
|
|
|
}];
|
|
|
|
|
|
|
|
|
|
BOOL didWork = NO;
|
|
|
|
|
NSMutableArray<AnyPromise *> *sendPromises = [NSMutableArray array];
|
|
|
|
|
|
|
|
|
|
for (NSString *recipientId in deliveryReceiptMap) {
|
|
|
|
|
NSSet<NSNumber *> *timestamps = deliveryReceiptMap[recipientId];
|
|
|
|
@ -128,41 +128,47 @@ NSString *const kDeliveryReceiptManagerCollection = @"kDeliveryReceiptManagerCol
|
|
|
|
|
[OWSReceiptsForSenderMessage deliveryReceiptsForSenderMessageWithThread:thread
|
|
|
|
|
messageTimestamps:timestamps.allObjects];
|
|
|
|
|
|
|
|
|
|
[self.messageSender enqueueMessage:message
|
|
|
|
|
success:^{
|
|
|
|
|
OWSLogInfo(@"Successfully sent %lu delivery receipts to sender.", (unsigned long)timestamps.count);
|
|
|
|
|
}
|
|
|
|
|
failure:^(NSError *error) {
|
|
|
|
|
OWSLogError(@"Failed to send delivery receipts to sender with error: %@", error);
|
|
|
|
|
}];
|
|
|
|
|
AnyPromise *sendPromise = [AnyPromise promiseWithResolverBlock:^(PMKResolver resolve) {
|
|
|
|
|
[self.messageSender enqueueMessage:message
|
|
|
|
|
success:^{
|
|
|
|
|
OWSLogInfo(@"Successfully sent %lu delivery receipts to sender.", (unsigned long)timestamps.count);
|
|
|
|
|
|
|
|
|
|
didWork = YES;
|
|
|
|
|
}
|
|
|
|
|
[self dequeueDeliveryReceiptsWithRecipientId:recipientId timestamps:timestamps];
|
|
|
|
|
|
|
|
|
|
// Now that they've been processed, remove all enqueued delivery receipts.
|
|
|
|
|
//
|
|
|
|
|
// NOTE: we don't need to worry about race conditions; this
|
|
|
|
|
// collection will only be mutated on serialQueue.
|
|
|
|
|
[self.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) {
|
|
|
|
|
[transaction removeAllObjectsInCollection:kDeliveryReceiptManagerCollection];
|
|
|
|
|
}];
|
|
|
|
|
// The value doesn't matter, we just need any non-NSError value.
|
|
|
|
|
resolve(@(1));
|
|
|
|
|
}
|
|
|
|
|
failure:^(NSError *error) {
|
|
|
|
|
OWSLogError(@"Failed to send delivery receipts to sender with error: %@", error);
|
|
|
|
|
|
|
|
|
|
if (!didWork) {
|
|
|
|
|
resolve(error);
|
|
|
|
|
}];
|
|
|
|
|
}];
|
|
|
|
|
[sendPromises addObject:sendPromise];
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (sendPromises.count < 1) {
|
|
|
|
|
// No work to do; abort.
|
|
|
|
|
self.isProcessing = NO;
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Wait N seconds before processing delivery receipts again.
|
|
|
|
|
// This allows time for a batch to accumulate.
|
|
|
|
|
//
|
|
|
|
|
// We want a value high enough to allow us to effectively de-duplicate,
|
|
|
|
|
// delivery receipts without being so high that we risk not sending delivery
|
|
|
|
|
// receipts due to app exit.
|
|
|
|
|
const CGFloat kProcessingFrequencySeconds = 3.f;
|
|
|
|
|
dispatch_after(
|
|
|
|
|
dispatch_time(DISPATCH_TIME_NOW, (int64_t)(kProcessingFrequencySeconds * NSEC_PER_SEC)), self.serialQueue, ^{
|
|
|
|
|
[self process];
|
|
|
|
|
});
|
|
|
|
|
AnyPromise *completionPromise = PMKJoin(sendPromises);
|
|
|
|
|
completionPromise.always(^() {
|
|
|
|
|
// Wait N seconds before processing delivery receipts again.
|
|
|
|
|
// This allows time for a batch to accumulate.
|
|
|
|
|
//
|
|
|
|
|
// We want a value high enough to allow us to effectively de-duplicate,
|
|
|
|
|
// delivery receipts without being so high that we risk not sending delivery
|
|
|
|
|
// receipts due to app exit.
|
|
|
|
|
const CGFloat kProcessingFrequencySeconds = 3.f;
|
|
|
|
|
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(kProcessingFrequencySeconds * NSEC_PER_SEC)),
|
|
|
|
|
self.serialQueue,
|
|
|
|
|
^{
|
|
|
|
|
[self process];
|
|
|
|
|
});
|
|
|
|
|
});
|
|
|
|
|
[completionPromise retainUntilComplete];
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
- (void)envelopeWasReceived:(SSKProtoEnvelope *)envelope {
|
|
|
|
|