diff --git a/Signal/src/AppDelegate.m b/Signal/src/AppDelegate.m index 899f00896..c2ef3c3f5 100644 --- a/Signal/src/AppDelegate.m +++ b/Signal/src/AppDelegate.m @@ -1108,8 +1108,7 @@ static NSTimeInterval launchStartedAt; [SSKEnvironment.shared.messageReceiver handleAnyUnprocessedEnvelopesAsync]; [SSKEnvironment.shared.batchMessageProcessor handleAnyUnprocessedEnvelopesAsync]; - // TODO - // - incorporate reachability check + [SSKEnvironment.shared.reachabilityManager setup]; [SSKEnvironment.shared.messageSenderJobQueue setup]; [AppEnvironment.shared.sessionResetJobQueue setup]; diff --git a/Signal/src/Jobs/SessionResetJob.swift b/Signal/src/Jobs/SessionResetJob.swift index 9db7dfbf1..7f68eaf5c 100644 --- a/Signal/src/Jobs/SessionResetJob.swift +++ b/Signal/src/Jobs/SessionResetJob.swift @@ -20,21 +20,15 @@ public class SessionResetJobQueue: NSObject, JobQueue { public typealias DurableOperationType = SessionResetOperation public let jobRecordLabel: String = "SessionReset" public static let maxRetries: UInt = 10 + public let requiresInternet: Bool = true + public var runningOperations: [SessionResetOperation] = [] @objc public func setup() { defaultSetup() } - public var isReady: Bool = false { - didSet { - if isReady { - DispatchQueue.global().async { - self.workStep() - } - } - } - } + public var isSetup: Bool = false public func didMarkAsReady(oldJobRecord: JobRecordType, transaction: YapDatabaseReadWriteTransaction) { // no special handling @@ -68,7 +62,7 @@ public class SessionResetOperation: OWSOperation, DurableOperation { weak public var durableOperationDelegate: SessionResetJobQueue? - public var operation: Operation { + public var operation: OWSOperation { return self } @@ -151,7 +145,7 @@ public class SessionResetOperation: OWSOperation, DurableOperation { } } - override public func retryDelay() -> dispatch_time_t { + override public func retryInterval() -> TimeInterval { // Arbitrary backoff factor... // With backOffFactor of 1.9 // try 1 delay: 0.00s @@ -164,7 +158,8 @@ public class SessionResetOperation: OWSOperation, DurableOperation { let maxBackoff = kHourInterval let seconds = 0.1 * min(maxBackoff, pow(backoffFactor, Double(self.jobRecord.failureCount))) - return UInt64(seconds) * NSEC_PER_SEC + + return seconds } override public func didFail(error: Error) { diff --git a/SignalMessaging/environment/AppSetup.m b/SignalMessaging/environment/AppSetup.m index 1ee77ad7b..0ee536ec8 100644 --- a/SignalMessaging/environment/AppSetup.m +++ b/SignalMessaging/environment/AppSetup.m @@ -85,6 +85,7 @@ NS_ASSUME_NONNULL_BEGIN [[OWSOutgoingReceiptManager alloc] initWithPrimaryStorage:primaryStorage]; OWSSyncManager *syncManager = [[OWSSyncManager alloc] initDefault]; + id reachabilityManager = [SSKReachabilityManagerImpl new]; OWSSounds *sounds = [[OWSSounds alloc] initWithPrimaryStorage:primaryStorage]; LockInteractionController *lockInteractionController = [[LockInteractionController alloc] initDefault]; OWSWindowManager *windowManager = [[OWSWindowManager alloc] initDefault]; @@ -115,6 +116,7 @@ NS_ASSUME_NONNULL_BEGIN contactDiscoveryService:contactDiscoveryService readReceiptManager:readReceiptManager outgoingReceiptManager:outgoingReceiptManager + reachabilityManager:reachabilityManager syncManager:syncManager]]; appSpecificSingletonBlock(); diff --git a/SignalServiceKit/src/Network/MessageSenderJobQueue.swift b/SignalServiceKit/src/Network/MessageSenderJobQueue.swift index 9b17aeb1b..7976a3a0f 100644 --- a/SignalServiceKit/src/Network/MessageSenderJobQueue.swift +++ b/SignalServiceKit/src/Network/MessageSenderJobQueue.swift @@ -66,6 +66,8 @@ public class MessageSenderJobQueue: NSObject, JobQueue { public typealias DurableOperationType = MessageSenderOperation public static let jobRecordLabel: String = "MessageSender" public static let maxRetries: UInt = 10 + public let requiresInternet: Bool = true + public var runningOperations: [MessageSenderOperation] = [] public var jobRecordLabel: String { return type(of: self).jobRecordLabel @@ -76,16 +78,7 @@ public class MessageSenderJobQueue: NSObject, JobQueue { defaultSetup() } - @objc - public var isReady: Bool = false { - didSet { - if isReady { - DispatchQueue.global().async { - self.workStep() - } - } - } - } + public var isSetup: Bool = false public func didMarkAsReady(oldJobRecord: SSKMessageSenderJobRecord, transaction: YapDatabaseReadWriteTransaction) { if let messageId = oldJobRecord.messageId, let message = TSOutgoingMessage.fetch(uniqueId: messageId, transaction: transaction) { @@ -145,7 +138,7 @@ public class MessageSenderOperation: OWSOperation, DurableOperation { weak public var durableOperationDelegate: MessageSenderJobQueue? - public var operation: Operation { + public var operation: OWSOperation { return self } @@ -192,7 +185,7 @@ public class MessageSenderOperation: OWSOperation, DurableOperation { } } - override public func retryDelay() -> dispatch_time_t { + override public func retryInterval() -> TimeInterval { guard !CurrentAppContext().isRunningTests else { return 0 } @@ -209,7 +202,7 @@ public class MessageSenderOperation: OWSOperation, DurableOperation { let maxBackoff = kHourInterval let seconds = 0.1 * min(maxBackoff, pow(backoffFactor, Double(self.jobRecord.failureCount))) - return UInt64(seconds) * NSEC_PER_SEC + return seconds } override public func didFail(error: Error) { diff --git a/SignalServiceKit/src/Network/ReachabilityManager.swift b/SignalServiceKit/src/Network/ReachabilityManager.swift new file mode 100644 index 000000000..2d02f4570 --- /dev/null +++ b/SignalServiceKit/src/Network/ReachabilityManager.swift @@ -0,0 +1,57 @@ +// +// Copyright (c) 2018 Open Whisper Systems. All rights reserved. +// + +import Foundation + +@objc(SSKReachabilityType) +public enum ReachabilityType: Int { + case any, wifi, cellular +} + +@objc +public protocol SSKReachabilityManager { + var observationContext: AnyObject { get } + func setup() + + var isReachable: Bool { get } + func isReachable(via reachabilityType: ReachabilityType) -> Bool +} + +@objc +public class SSKReachabilityManagerImpl: NSObject, SSKReachabilityManager { + + public let reachability: Reachability + public var observationContext: AnyObject { + return self.reachability + } + + public var isReachable: Bool { + return isReachable(via: .any) + } + + public func isReachable(via reachabilityType: ReachabilityType) -> Bool { + switch reachabilityType { + case .any: + return reachability.isReachable() + case .wifi: + return reachability.isReachableViaWiFi() + case .cellular: + return reachability.isReachableViaWWAN() + } + } + + @objc + override public init() { + self.reachability = Reachability.forInternetConnection() + } + + @objc + public func setup() { + guard reachability.startNotifier() else { + owsFailDebug("failed to start notifier") + return + } + Logger.debug("started notifier") + } +} diff --git a/SignalServiceKit/src/SSKEnvironment.h b/SignalServiceKit/src/SSKEnvironment.h index 7e76a4302..ada5b1431 100644 --- a/SignalServiceKit/src/SSKEnvironment.h +++ b/SignalServiceKit/src/SSKEnvironment.h @@ -31,6 +31,7 @@ NS_ASSUME_NONNULL_BEGIN @protocol OWSCallMessageHandler; @protocol ProfileManagerProtocol; @protocol OWSUDManager; +@protocol SSKReachabilityManager; @protocol OWSSyncManagerProtocol; @interface SSKEnvironment : NSObject @@ -56,6 +57,7 @@ NS_ASSUME_NONNULL_BEGIN contactDiscoveryService:(ContactDiscoveryService *)contactDiscoveryService readReceiptManager:(OWSReadReceiptManager *)readReceiptManager outgoingReceiptManager:(OWSOutgoingReceiptManager *)outgoingReceiptManager + reachabilityManager:(id)reachabilityManager syncManager:(id)syncManager NS_DESIGNATED_INITIALIZER; - (instancetype)init NS_UNAVAILABLE; @@ -91,6 +93,7 @@ NS_ASSUME_NONNULL_BEGIN @property (nonatomic, readonly) OWSReadReceiptManager *readReceiptManager; @property (nonatomic, readonly) OWSOutgoingReceiptManager *outgoingReceiptManager; @property (nonatomic, readonly) id syncManager; +@property (nonatomic, readonly) id reachabilityManager; // This property is configured after Environment is created. @property (atomic, nullable) id callMessageHandler; diff --git a/SignalServiceKit/src/SSKEnvironment.m b/SignalServiceKit/src/SSKEnvironment.m index 07355297e..006e09f1e 100644 --- a/SignalServiceKit/src/SSKEnvironment.m +++ b/SignalServiceKit/src/SSKEnvironment.m @@ -33,6 +33,7 @@ static SSKEnvironment *sharedSSKEnvironment; @property (nonatomic) OWSReadReceiptManager *readReceiptManager; @property (nonatomic) OWSOutgoingReceiptManager *outgoingReceiptManager; @property (nonatomic) id syncManager; +@property (nonatomic) id reachabilityManager; @end @@ -68,6 +69,7 @@ static SSKEnvironment *sharedSSKEnvironment; contactDiscoveryService:(ContactDiscoveryService *)contactDiscoveryService readReceiptManager:(OWSReadReceiptManager *)readReceiptManager outgoingReceiptManager:(OWSOutgoingReceiptManager *)outgoingReceiptManager + reachabilityManager:(id)reachabilityManager syncManager:(id)syncManager { self = [super init]; @@ -97,6 +99,7 @@ static SSKEnvironment *sharedSSKEnvironment; OWSAssertDebug(readReceiptManager); OWSAssertDebug(outgoingReceiptManager); OWSAssertDebug(syncManager); + OWSAssertDebug(reachabilityManager); _contactsManager = contactsManager; _messageSender = messageSender; @@ -120,6 +123,7 @@ static SSKEnvironment *sharedSSKEnvironment; _readReceiptManager = readReceiptManager; _outgoingReceiptManager = outgoingReceiptManager; _syncManager = syncManager; + _reachabilityManager = reachabilityManager; return self; } diff --git a/SignalServiceKit/src/SignalServiceKit.h b/SignalServiceKit/src/SignalServiceKit.h index eb4382c5f..cd8c4bcd1 100644 --- a/SignalServiceKit/src/SignalServiceKit.h +++ b/SignalServiceKit/src/SignalServiceKit.h @@ -3,6 +3,7 @@ // // Anything used by Swift outside of the framework must be imported. +#import #import #import #import diff --git a/SignalServiceKit/src/TestUtils/MockSSKEnvironment.m b/SignalServiceKit/src/TestUtils/MockSSKEnvironment.m index 65966b2f1..69eb3eafa 100644 --- a/SignalServiceKit/src/TestUtils/MockSSKEnvironment.m +++ b/SignalServiceKit/src/TestUtils/MockSSKEnvironment.m @@ -73,6 +73,7 @@ NS_ASSUME_NONNULL_BEGIN OWSReadReceiptManager *readReceiptManager = [[OWSReadReceiptManager alloc] initWithPrimaryStorage:primaryStorage]; OWSOutgoingReceiptManager *outgoingReceiptManager = [[OWSOutgoingReceiptManager alloc] initWithPrimaryStorage:primaryStorage]; + id reachabilityManager = [SSKReachabilityManagerImpl new]; id syncManager = [[OWSMockSyncManager alloc] init]; self = [super initWithContactsManager:contactsManager @@ -96,6 +97,7 @@ NS_ASSUME_NONNULL_BEGIN contactDiscoveryService:contactDiscoveryService readReceiptManager:readReceiptManager outgoingReceiptManager:outgoingReceiptManager + reachabilityManager:reachabilityManager syncManager:syncManager]; if (!self) { return nil; diff --git a/SignalServiceKit/src/Util/JobQueue.swift b/SignalServiceKit/src/Util/JobQueue.swift index 607278b58..95cd6655d 100644 --- a/SignalServiceKit/src/Util/JobQueue.swift +++ b/SignalServiceKit/src/Util/JobQueue.swift @@ -37,7 +37,7 @@ public protocol DurableOperation: class { var jobRecord: JobRecordType { get } var durableOperationDelegate: DurableOperationDelegateType? { get set } - var operation: Operation { get } + var operation: OWSOperation { get } var remainingRetries: UInt { get set } } @@ -67,15 +67,22 @@ public protocol JobQueue: DurableOperationDelegate { // MARK: Required + var runningOperations: [DurableOperationType] { get set } var jobRecordLabel: String { get } - var isReady: Bool { get set } + var isSetup: Bool { get set } func setup() func didMarkAsReady(oldJobRecord: JobRecordType, transaction: YapDatabaseReadWriteTransaction) func operationQueue(jobRecord: JobRecordType) -> OperationQueue func buildOperation(jobRecord: JobRecordType, transaction: YapDatabaseReadTransaction) throws -> DurableOperationType + /// When `requiresInternet` is true, we immediately run any jobs which are waiting for retry upon detecting Reachability. + /// + /// Because `Reachability` isn't 100% reliable, the jobs will be attempted regardless of what we think our current Reachability is. + /// However, because these jobs will likely fail many times in succession, their `retryInterval` could be quite long by the time we + /// are back online. + var requiresInternet: Bool { get } static var maxRetries: UInt { get } } @@ -91,6 +98,10 @@ public extension JobQueue { return JobRecordFinder() } + var reachabilityManager: SSKReachabilityManager { + return SSKEnvironment.shared.reachabilityManager + } + // MARK: func add(jobRecord: JobRecordType, transaction: YapDatabaseReadWriteTransaction) { @@ -105,12 +116,11 @@ public extension JobQueue { func workStep() { Logger.debug("") - guard isReady else { + guard isSetup else { if !CurrentAppContext().isRunningTests { - owsFailDebug("not ready") + owsFailDebug("not setup") } - Logger.error("not ready") return } @@ -132,6 +142,8 @@ public extension JobQueue { let remainingRetries = self.remainingRetries(durableOperation: durableOperation) durableOperation.remainingRetries = remainingRetries + self.runningOperations.append(durableOperation) + Logger.debug("adding operation: \(durableOperation) with remainingRetries: \(remainingRetries)") operationQueue.addOperation(durableOperation.operation) } catch JobError.assertionFailure(let description) { @@ -181,13 +193,31 @@ public extension JobQueue { /// `setup` is called from objc, and default implementations from a protocol /// cannot be marked as @objc. func defaultSetup() { - guard !isReady else { + guard !isSetup else { owsFailDebug("already ready already") return } self.restartOldJobs() - self.isReady = true + if self.requiresInternet { + NotificationCenter.default.addObserver(forName: .reachabilityChanged, + object: self.reachabilityManager.observationContext, + queue: nil) { _ in + + if self.reachabilityManager.isReachable { + Logger.verbose("isReachable: true") + self.becameReachable() + } else { + Logger.verbose("isReachable: false") + } + } + } + + self.isSetup = true + + DispatchQueue.global().async { + self.workStep() + } } func remainingRetries(durableOperation: DurableOperationType) -> UInt { @@ -201,9 +231,18 @@ public extension JobQueue { return maxRetries - failureCount } + func becameReachable() { + guard requiresInternet else { + return + } + + self.runningOperations.first?.operation.runAnyQueuedRetry() + } + // MARK: DurableOperationDelegate func durableOperationDidSucceed(_ operation: DurableOperationType, transaction: YapDatabaseReadWriteTransaction) { + self.runningOperations = self.runningOperations.filter { $0 !== operation } operation.jobRecord.remove(with: transaction) } @@ -217,6 +256,7 @@ public extension JobQueue { } func durableOperation(_ operation: DurableOperationType, didFailWithError error: Error, transaction: YapDatabaseReadWriteTransaction) { + self.runningOperations = self.runningOperations.filter { $0 !== operation } operation.jobRecord.saveAsPermanentlyFailed(transaction: transaction) } } diff --git a/SignalServiceKit/src/Util/OWSOperation.h b/SignalServiceKit/src/Util/OWSOperation.h index 463a7e3cb..ebeeaa53f 100644 --- a/SignalServiceKit/src/Util/OWSOperation.h +++ b/SignalServiceKit/src/Util/OWSOperation.h @@ -52,10 +52,14 @@ typedef NS_ENUM(NSInteger, OWSOperationState) { - (void)didFailWithError:(NSError *)error NS_SWIFT_NAME(didFail(error:)); // How long to wait before retry, if possible -- (dispatch_time_t)retryDelay; +- (NSTimeInterval)retryInterval; #pragma mark - Success/Error - Do Not Override +// Runs now if a retry timer has been set by a previous failure, +// otherwise assumes we're currently running and does nothing. +- (void)runAnyQueuedRetry; + // Report that the operation completed successfully. // // Each invocation of `run` must make exactly one call to one of: `reportSuccess`, `reportCancelled`, or `reportError:` diff --git a/SignalServiceKit/src/Util/OWSOperation.m b/SignalServiceKit/src/Util/OWSOperation.m index 6e3da8a92..b04da4498 100644 --- a/SignalServiceKit/src/Util/OWSOperation.m +++ b/SignalServiceKit/src/Util/OWSOperation.m @@ -4,6 +4,7 @@ #import "OWSOperation.h" #import "NSError+MessageSending.h" +#import "NSTimer+OWS.h" #import "OWSBackgroundTask.h" #import "OWSError.h" @@ -17,6 +18,8 @@ NSString *const OWSOperationKeyIsFinished = @"isFinished"; @property (nullable) NSError *failingError; @property (atomic) OWSOperationState operationState; @property (nonatomic) OWSBackgroundTask *backgroundTask; +@property (nonatomic) NSTimer *_Nullable retryTimer; +@property (nonatomic, readonly) dispatch_queue_t retryTimerSerialQueue; @end @@ -31,7 +34,8 @@ NSString *const OWSOperationKeyIsFinished = @"isFinished"; _operationState = OWSOperationStateNew; _backgroundTask = [OWSBackgroundTask backgroundTaskWithLabel:self.logTag]; - + _retryTimerSerialQueue = dispatch_queue_create("SignalServiceKit.OWSOperation.retryTimer", DISPATCH_QUEUE_SERIAL); + // Operations are not retryable by default. _remainingRetries = 0; @@ -125,6 +129,22 @@ NSString *const OWSOperationKeyIsFinished = @"isFinished"; [self run]; } +- (void)runAnyQueuedRetry +{ + __block NSTimer *_Nullable retryTimer; + dispatch_sync(self.retryTimerSerialQueue, ^{ + retryTimer = self.retryTimer; + self.retryTimer = nil; + [retryTimer invalidate]; + }); + + if (retryTimer != nil) { + [self run]; + } else { + OWSLogVerbose(@"not re-running since operation is already running."); + } +} + #pragma mark - Public Methods // These methods are not intended to be subclassed @@ -170,15 +190,20 @@ NSString *const OWSOperationKeyIsFinished = @"isFinished"; self.remainingRetries--; - dispatch_after(dispatch_time(DISPATCH_TIME_NOW, self.retryDelay), dispatch_get_main_queue(), ^{ - [self run]; + dispatch_sync(self.retryTimerSerialQueue, ^{ + [self.retryTimer invalidate]; + self.retryTimer = [NSTimer weakScheduledTimerWithTimeInterval:self.retryInterval + target:self + selector:@selector(runAnyQueuedRetry) + userInfo:nil + repeats:NO]; }); } // Override in subclass if you want something more sophisticated, e.g. exponential backoff -- (dispatch_time_t)retryDelay +- (NSTimeInterval)retryInterval { - return (0.1 * NSEC_PER_SEC); + return 0.1; } #pragma mark - Life Cycle diff --git a/SignalServiceKit/tests/Network/MessageSendJobQueueTest.swift b/SignalServiceKit/tests/Network/MessageSendJobQueueTest.swift index 93e7ac429..d5ecbc6bc 100644 --- a/SignalServiceKit/tests/Network/MessageSendJobQueueTest.swift +++ b/SignalServiceKit/tests/Network/MessageSendJobQueueTest.swift @@ -127,7 +127,7 @@ class MessageSenderJobQueueTest: SSKBaseTestSwift { error.isRetryable = true self.messageSender.stubbedFailingError = error let expectation = sentExpectation(message: message) { - jobQueue.isReady = false + jobQueue.isSetup = false } jobQueue.setup() @@ -188,7 +188,7 @@ class MessageSenderJobQueueTest: SSKBaseTestSwift { error.isRetryable = false self.messageSender.stubbedFailingError = error let expectation = sentExpectation(message: message) { - jobQueue.isReady = false + jobQueue.isSetup = false } jobQueue.setup() self.wait(for: [expectation], timeout: 0.1) diff --git a/SignalServiceKit/tests/Util/JobQueueTest.swift b/SignalServiceKit/tests/Util/JobQueueTest.swift index 5765b6dbe..72023d6f1 100644 --- a/SignalServiceKit/tests/Util/JobQueueTest.swift +++ b/SignalServiceKit/tests/Util/JobQueueTest.swift @@ -47,13 +47,7 @@ class TestJobQueue: JobQueue { // no special handling } - var isReady: Bool = false { - didSet { - DispatchQueue.global().async { - self.workStep() - } - } - } + var isSetup: Bool = false let operationQueue = OperationQueue() @@ -161,8 +155,7 @@ class JobQueueTest: SSKBaseTestSwift { } // Verify re-queue - - jobQueue.isReady = false + jobQueue.isSetup = false jobQueue.setup() self.readWrite { transaction in @@ -181,7 +174,7 @@ class JobQueueTest: SSKBaseTestSwift { rerunGroup.leave() } - jobQueue.isReady = true + jobQueue.isSetup = true switch rerunGroup.wait(timeout: .now() + 1.0) { case .timedOut: