From 1066089980fadf885cf854f3eae84521355af4ae Mon Sep 17 00:00:00 2001 From: Michael Kirk Date: Fri, 28 Jul 2017 15:57:06 -0400 Subject: [PATCH 1/4] Fix thread explosion Without this, when the user has a large message queue to process, things slow to a crawl as we spew more and more threads. Since it's on a serial queue anyway, there's no need to have multiple threads executing this code. // FREEBIE --- .../src/Messages/OWSMessageReceiver.m | 34 +++++++++++++++---- 1 file changed, 27 insertions(+), 7 deletions(-) diff --git a/SignalServiceKit/src/Messages/OWSMessageReceiver.m b/SignalServiceKit/src/Messages/OWSMessageReceiver.m index d144fb981..1a4e21e0e 100644 --- a/SignalServiceKit/src/Messages/OWSMessageReceiver.m +++ b/SignalServiceKit/src/Messages/OWSMessageReceiver.m @@ -186,6 +186,7 @@ NSString *const OWSMessageProcessingJobFinderExtensionGroup = @"OWSMessageProces @property (nonatomic, readonly) TSMessagesManager *messagesManager; @property (nonatomic, readonly) OWSMessageProcessingJobFinder *finder; +@property (nonatomic) BOOL isDrainingQueue; - (instancetype)initWithMessagesManager:(TSMessagesManager *)messagesManager finder:(OWSMessageProcessingJobFinder *)finder NS_DESIGNATED_INITIALIZER; @@ -207,6 +208,7 @@ NSString *const OWSMessageProcessingJobFinderExtensionGroup = @"OWSMessageProces _messagesManager = messagesManager; _finder = finder; + _isDrainingQueue = NO; return self; } @@ -220,16 +222,34 @@ NSString *const OWSMessageProcessingJobFinderExtensionGroup = @"OWSMessageProces - (void)drainQueue { - dispatch_async(self.class.serialGCDQueue, ^{ - OWSMessageProcessingJob *_Nullable job = [self.finder nextJob]; - if (job == nil) { - DDLogVerbose(@"%@ Queue is drained", self.tag); - return; - } + AssertIsOnMainThread(); + + if (self.isDrainingQueue) { + return; + } + self.isDrainingQueue = YES; + + [self drainQueueWorkStep]; +} +- (void)drainQueueWorkStep +{ + AssertIsOnMainThread(); + + OWSMessageProcessingJob *_Nullable job = [self.finder nextJob]; + if (job == nil) { + self.isDrainingQueue = NO; + DDLogVerbose(@"%@ Queue is drained", self.tag); + return; + } + + dispatch_async(self.class.serialGCDQueue, ^{ [self processJob:job completion:^{ - [self drainQueue]; + DDLogVerbose(@"%@ completed job. %lu jobs left.", + self.tag, + (unsigned long)[OWSMessageProcessingJob numberOfKeysInCollection]); + [self drainQueueWorkStep]; }]; }); } From 6a5c6a9fc9ad67e562fb565791eb8832d649578f Mon Sep 17 00:00:00 2001 From: Michael Kirk Date: Fri, 28 Jul 2017 16:05:48 -0400 Subject: [PATCH 2/4] didBecomeActive kicks the processing queue // FREEBIE --- Signal/src/AppDelegate.m | 3 +++ SignalServiceKit/src/Messages/OWSMessageReceiver.h | 1 + SignalServiceKit/src/Messages/OWSMessageReceiver.m | 5 +++++ 3 files changed, 9 insertions(+) diff --git a/Signal/src/AppDelegate.m b/Signal/src/AppDelegate.m index a50a7dd78..a9238ca88 100644 --- a/Signal/src/AppDelegate.m +++ b/Signal/src/AppDelegate.m @@ -517,6 +517,9 @@ static NSString *const kURLHostVerifyPrefix = @"verify"; // This will fetch new messages, if we're using domain // fronting. [[PushManager sharedManager] applicationDidBecomeActive]; + + // If there were any messages in our local queue which we hadn't yet processed. + [[OWSMessageReceiver sharedInstance] handleAnyUnprocessedEnvelopes]; }]; DDLogInfo(@"%@ applicationDidBecomeActive completed.", self.tag); diff --git a/SignalServiceKit/src/Messages/OWSMessageReceiver.h b/SignalServiceKit/src/Messages/OWSMessageReceiver.h index 6418f7b3c..c4a7b367e 100644 --- a/SignalServiceKit/src/Messages/OWSMessageReceiver.h +++ b/SignalServiceKit/src/Messages/OWSMessageReceiver.h @@ -13,6 +13,7 @@ NS_ASSUME_NONNULL_BEGIN + (void)syncRegisterDatabaseExtension:(YapDatabase *)database; - (void)handleReceivedEnvelope:(OWSSignalServiceProtosEnvelope *)envelope; +- (void)handleAnyUnprocessedEnvelopes; @end diff --git a/SignalServiceKit/src/Messages/OWSMessageReceiver.m b/SignalServiceKit/src/Messages/OWSMessageReceiver.m index 1a4e21e0e..021c49357 100644 --- a/SignalServiceKit/src/Messages/OWSMessageReceiver.m +++ b/SignalServiceKit/src/Messages/OWSMessageReceiver.m @@ -351,6 +351,11 @@ NSString *const OWSMessageProcessingJobFinderExtensionGroup = @"OWSMessageProces #pragma mark - instance methods +- (void)handleAnyUnprocessedEnvelopes +{ + [self.processingQueue drainQueue]; +} + - (void)handleReceivedEnvelope:(OWSSignalServiceProtosEnvelope *)envelope { // Drop any too-large messages on the floor. Well behaving clients should never send them. From 0b38b46683f7568d3f181f39992305d8391106f6 Mon Sep 17 00:00:00 2001 From: Michael Kirk Date: Fri, 28 Jul 2017 16:46:34 -0400 Subject: [PATCH 3/4] remove unnecessary dispatch // FREEBIE --- .../src/Messages/OWSMessageReceiver.m | 44 +++++++------------ 1 file changed, 15 insertions(+), 29 deletions(-) diff --git a/SignalServiceKit/src/Messages/OWSMessageReceiver.m b/SignalServiceKit/src/Messages/OWSMessageReceiver.m index 021c49357..d22ec1c83 100644 --- a/SignalServiceKit/src/Messages/OWSMessageReceiver.m +++ b/SignalServiceKit/src/Messages/OWSMessageReceiver.m @@ -243,38 +243,22 @@ NSString *const OWSMessageProcessingJobFinderExtensionGroup = @"OWSMessageProces return; } - dispatch_async(self.class.serialGCDQueue, ^{ - [self processJob:job - completion:^{ - DDLogVerbose(@"%@ completed job. %lu jobs left.", - self.tag, - (unsigned long)[OWSMessageProcessingJob numberOfKeysInCollection]); - [self drainQueueWorkStep]; - }]; - }); + [self processJob:job + completion:^{ + DDLogVerbose(@"%@ completed job. %lu jobs left.", + self.tag, + (unsigned long)[OWSMessageProcessingJob numberOfKeysInCollection]); + [self drainQueueWorkStep]; + }]; } - (void)processJob:(OWSMessageProcessingJob *)job completion:(void (^)())completion { - dispatch_async(dispatch_get_main_queue(), ^{ - [self.messagesManager processEnvelope:job.envelopeProto - completion:^{ - [self.finder removeJobWithId:job.uniqueId]; - completion(); - }]; - }); -} - -#pragma mark Helpers - -+ (dispatch_queue_t)serialGCDQueue -{ - static dispatch_once_t onceToken; - static dispatch_queue_t queue; - dispatch_once(&onceToken, ^{ - queue = dispatch_queue_create("org.whispersystems.signal.messageProcessingQueue", NULL); - }); - return queue; + [self.messagesManager processEnvelope:job.envelopeProto + completion:^{ + [self.finder removeJobWithId:job.uniqueId]; + completion(); + }]; } #pragma mark Logging @@ -353,7 +337,9 @@ NSString *const OWSMessageProcessingJobFinderExtensionGroup = @"OWSMessageProces - (void)handleAnyUnprocessedEnvelopes { - [self.processingQueue drainQueue]; + dispatch_async(dispatch_get_main_queue(), ^{ + [self.processingQueue drainQueue]; + }); } - (void)handleReceivedEnvelope:(OWSSignalServiceProtosEnvelope *)envelope From a1966934255249bdda8f182fa09e4e3b81dc12e9 Mon Sep 17 00:00:00 2001 From: Michael Kirk Date: Fri, 28 Jul 2017 17:04:52 -0400 Subject: [PATCH 4/4] Make sure DB views are ready before kicking processing job // FREEBIE --- Signal/src/AppDelegate.m | 6 +++--- SignalServiceKit/src/Messages/OWSMessageReceiver.h | 2 +- SignalServiceKit/src/Messages/OWSMessageReceiver.m | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Signal/src/AppDelegate.m b/Signal/src/AppDelegate.m index a9238ca88..6748639f3 100644 --- a/Signal/src/AppDelegate.m +++ b/Signal/src/AppDelegate.m @@ -517,9 +517,6 @@ static NSString *const kURLHostVerifyPrefix = @"verify"; // This will fetch new messages, if we're using domain // fronting. [[PushManager sharedManager] applicationDidBecomeActive]; - - // If there were any messages in our local queue which we hadn't yet processed. - [[OWSMessageReceiver sharedInstance] handleAnyUnprocessedEnvelopes]; }]; DDLogInfo(@"%@ applicationDidBecomeActive completed.", self.tag); @@ -796,6 +793,9 @@ static NSString *const kURLHostVerifyPrefix = @"verify"; [AppVersion.instance appLaunchDidComplete]; [self ensureRootViewController]; + + // If there were any messages in our local queue which we hadn't yet processed. + [[OWSMessageReceiver sharedInstance] handleAnyUnprocessedEnvelopesAsync]; } - (void)ensureRootViewController diff --git a/SignalServiceKit/src/Messages/OWSMessageReceiver.h b/SignalServiceKit/src/Messages/OWSMessageReceiver.h index c4a7b367e..5af585219 100644 --- a/SignalServiceKit/src/Messages/OWSMessageReceiver.h +++ b/SignalServiceKit/src/Messages/OWSMessageReceiver.h @@ -13,7 +13,7 @@ NS_ASSUME_NONNULL_BEGIN + (void)syncRegisterDatabaseExtension:(YapDatabase *)database; - (void)handleReceivedEnvelope:(OWSSignalServiceProtosEnvelope *)envelope; -- (void)handleAnyUnprocessedEnvelopes; +- (void)handleAnyUnprocessedEnvelopesAsync; @end diff --git a/SignalServiceKit/src/Messages/OWSMessageReceiver.m b/SignalServiceKit/src/Messages/OWSMessageReceiver.m index d22ec1c83..91e979ae8 100644 --- a/SignalServiceKit/src/Messages/OWSMessageReceiver.m +++ b/SignalServiceKit/src/Messages/OWSMessageReceiver.m @@ -335,7 +335,7 @@ NSString *const OWSMessageProcessingJobFinderExtensionGroup = @"OWSMessageProces #pragma mark - instance methods -- (void)handleAnyUnprocessedEnvelopes +- (void)handleAnyUnprocessedEnvelopesAsync { dispatch_async(dispatch_get_main_queue(), ^{ [self.processingQueue drainQueue];