diff --git a/SignalServiceKit/src/Messages/OWSBatchMessageProcessor.m b/SignalServiceKit/src/Messages/OWSBatchMessageProcessor.m index 032b014a0..2f5674cd4 100644 --- a/SignalServiceKit/src/Messages/OWSBatchMessageProcessor.m +++ b/SignalServiceKit/src/Messages/OWSBatchMessageProcessor.m @@ -132,6 +132,7 @@ NSString *const OWSMessageContentJobFinderExtensionGroup = @"OWSBatchMessageProc - (void)addJobWithEnvelopeData:(NSData *)envelopeData plaintextData:(NSData *_Nullable)plaintextData { + // We need to persist the decrypted envelope data ASAP to prevent data loss. [self.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *_Nonnull transaction) { OWSMessageContentJob *job = [[OWSMessageContentJob alloc] initWithEnvelopeData:envelopeData plaintextData:plaintextData]; @@ -280,6 +281,7 @@ NSString *const OWSMessageContentJobFinderExtensionGroup = @"OWSBatchMessageProc { OWSAssert(envelopeData); + // We need to persist the decrypted envelope data ASAP to prevent data loss. [self.finder addJobWithEnvelopeData:envelopeData plaintextData:plaintextData]; } @@ -322,7 +324,15 @@ NSString *const OWSMessageContentJobFinderExtensionGroup = @"OWSBatchMessageProc self.tag, jobs.count, [OWSMessageContentJob numberOfKeysInCollection]); - [self drainQueueWorkStep]; + + // Wait a bit in hopes of increasing the batch size. + // This delay won't affect the first message to arrive when this queue is idle, + // so by definition we're receiving more than one message and can benefit from + // batching. + dispatch_after( + dispatch_time(DISPATCH_TIME_NOW, (int64_t)0.1f * NSEC_PER_SEC), dispatch_get_main_queue(), ^{ + [self drainQueueWorkStep]; + }); }); }]; } @@ -429,6 +439,7 @@ NSString *const OWSMessageContentJobFinderExtensionGroup = @"OWSBatchMessageProc { OWSAssert(envelopeData); + // We need to persist the decrypted envelope data ASAP to prevent data loss. [self.processingQueue enqueueEnvelopeData:envelopeData plaintextData:plaintextData]; [self.processingQueue drainQueue]; } diff --git a/SignalServiceKit/src/Messages/OWSMessageReceiver.m b/SignalServiceKit/src/Messages/OWSMessageReceiver.m index 5e7e7feb3..a3e7ce5ea 100644 --- a/SignalServiceKit/src/Messages/OWSMessageReceiver.m +++ b/SignalServiceKit/src/Messages/OWSMessageReceiver.m @@ -105,27 +105,16 @@ NSString *const OWSMessageDecryptJobFinderExtensionGroup = @"OWSMessageProcessin return self; } -- (NSArray *)nextJobsForBatchSize:(NSUInteger)maxBatchSize +- (OWSMessageDecryptJob *_Nullable)nextJob { - NSMutableArray *jobs = [NSMutableArray new]; + __block OWSMessageDecryptJob *_Nullable job = nil; [self.dbConnection readWithBlock:^(YapDatabaseReadTransaction *_Nonnull transaction) { YapDatabaseViewTransaction *viewTransaction = [transaction ext:OWSMessageDecryptJobFinderExtensionName]; OWSAssert(viewTransaction != nil); - [viewTransaction enumerateKeysAndObjectsInGroup:OWSMessageDecryptJobFinderExtensionGroup - usingBlock:^(NSString *_Nonnull collection, - NSString *_Nonnull key, - id _Nonnull object, - NSUInteger index, - BOOL *_Nonnull stop) { - OWSMessageDecryptJob *job = object; - [jobs addObject:job]; - if (jobs.count >= maxBatchSize) { - *stop = YES; - } - }]; + job = [viewTransaction firstObjectInGroup:OWSMessageDecryptJobFinderExtensionGroup]; }]; - return [jobs copy]; + return job; } - (void)addJobForEnvelope:(OWSSignalServiceProtosEnvelope *)envelope @@ -135,10 +124,10 @@ NSString *const OWSMessageDecryptJobFinderExtensionGroup = @"OWSMessageProcessin }]; } -- (void)removeJobsWithIds:(NSArray *)uniqueIds +- (void)removeJobWithId:(NSString *)uniqueId { [self.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *_Nonnull transaction) { - [transaction removeObjectsForKeys:uniqueIds inCollection:[OWSMessageDecryptJob collection]]; + [transaction removeObjectForKey:uniqueId inCollection:[OWSMessageDecryptJob collection]]; }]; } @@ -287,71 +276,45 @@ NSString *const OWSMessageDecryptJobFinderExtensionGroup = @"OWSMessageProcessin { AssertIsOnMainThread(); - NSArray *jobs = [self.finder nextJobsForBatchSize:kIncomingMessageBatchSize]; - OWSAssert(jobs); - if (jobs.count < 1) { + OWSMessageDecryptJob *_Nullable job = [self.finder nextJob]; + if (!job) { self.isDrainingQueue = NO; DDLogVerbose(@"%@ Queue is drained.", self.tag); return; } - [self processJobs:jobs - completion:^{ - dispatch_async(dispatch_get_main_queue(), ^{ - [self.finder removeJobsWithIds:jobs.uniqueIds]; - DDLogVerbose(@"%@ completed %zd jobs. %zd jobs left.", - self.tag, - jobs.count, - [OWSMessageDecryptJob numberOfKeysInCollection]); - [self drainQueueWorkStep]; - }); - }]; + [self processJob:job + completion:^(BOOL success) { + [self.finder removeJobWithId:job.uniqueId]; + DDLogVerbose(@"%@ %@ job. %lu jobs left.", + self.tag, + success ? @"decrypted" : @"failed to decrypt", + (unsigned long)[OWSMessageDecryptJob numberOfKeysInCollection]); + [self drainQueueWorkStep]; + }]; } -- (void)processJobs:(NSArray *)jobs completion:(void (^)())completion +- (void)processJob:(OWSMessageDecryptJob *)job completion:(void (^)(BOOL))completion { - [self processJobs:jobs - unprocessedJobs:[jobs mutableCopy] - plaintextDataMap:[NSMutableDictionary new] - completion:completion]; -} - -- (void)processJobs:(NSArray *)jobs - unprocessedJobs:(NSMutableArray *)unprocessedJobs - plaintextDataMap:(NSMutableDictionary *)plaintextDataMap - completion:(void (^)())completion -{ - OWSAssert(jobs.count > 0); - OWSAssert(unprocessedJobs.count <= jobs.count); - - if (unprocessedJobs.count < 1) { - for (OWSMessageDecryptJob *job in jobs) { - NSData *_Nullable plaintextData = plaintextDataMap[job.uniqueId]; - [self.batchMessageProcessor enqueueEnvelopeData:job.envelopeData plaintextData:plaintextData]; - } - completion(); - return; - } + OWSAssert(job); + OWSSignalServiceProtosEnvelope *envelope = job.envelopeProto; dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{ - OWSAssert(unprocessedJobs.count > 0); - OWSMessageDecryptJob *job = unprocessedJobs.firstObject; - [unprocessedJobs removeObjectAtIndex:0]; - [self.messageDecrypter decryptEnvelope:job.envelopeProto + [self.messageDecrypter decryptEnvelope:envelope successBlock:^(NSData *_Nullable plaintextData) { - if (plaintextData) { - plaintextDataMap[job.uniqueId] = plaintextData; - } - [self processJobs:jobs - unprocessedJobs:unprocessedJobs - plaintextDataMap:plaintextDataMap - completion:completion]; + + // We can't decrypt the same message twice, so we need to persist + // the decrypted envelope data ASAP to prevent data loss. + [self.batchMessageProcessor enqueueEnvelopeData:job.envelopeData plaintextData:plaintextData]; + + dispatch_async(dispatch_get_main_queue(), ^{ + completion(YES); + }); } failureBlock:^{ - [self processJobs:jobs - unprocessedJobs:unprocessedJobs - plaintextDataMap:plaintextDataMap - completion:completion]; + dispatch_async(dispatch_get_main_queue(), ^{ + completion(NO); + }); }]; }); }