Merge branch 'mkirk/durable-queue-reachability' into master

pull/1/head
Michael Kirk 7 years ago
commit 9ad77399cb

@ -1108,8 +1108,7 @@ static NSTimeInterval launchStartedAt;
[SSKEnvironment.shared.messageReceiver handleAnyUnprocessedEnvelopesAsync];
[SSKEnvironment.shared.batchMessageProcessor handleAnyUnprocessedEnvelopesAsync];
// TODO
// - incorporate reachability check
[SSKEnvironment.shared.reachabilityManager setup];
[SSKEnvironment.shared.messageSenderJobQueue setup];
[AppEnvironment.shared.sessionResetJobQueue setup];

@ -20,21 +20,15 @@ public class SessionResetJobQueue: NSObject, JobQueue {
public typealias DurableOperationType = SessionResetOperation
public let jobRecordLabel: String = "SessionReset"
public static let maxRetries: UInt = 10
public let requiresInternet: Bool = true
public var runningOperations: [SessionResetOperation] = []
@objc
public func setup() {
defaultSetup()
}
public var isReady: Bool = false {
didSet {
if isReady {
DispatchQueue.global().async {
self.workStep()
}
}
}
}
public var isSetup: Bool = false
public func didMarkAsReady(oldJobRecord: JobRecordType, transaction: YapDatabaseReadWriteTransaction) {
// no special handling
@ -68,7 +62,7 @@ public class SessionResetOperation: OWSOperation, DurableOperation {
weak public var durableOperationDelegate: SessionResetJobQueue?
public var operation: Operation {
public var operation: OWSOperation {
return self
}
@ -151,7 +145,7 @@ public class SessionResetOperation: OWSOperation, DurableOperation {
}
}
override public func retryDelay() -> dispatch_time_t {
override public func retryInterval() -> TimeInterval {
// Arbitrary backoff factor...
// With backOffFactor of 1.9
// try 1 delay: 0.00s
@ -164,7 +158,8 @@ public class SessionResetOperation: OWSOperation, DurableOperation {
let maxBackoff = kHourInterval
let seconds = 0.1 * min(maxBackoff, pow(backoffFactor, Double(self.jobRecord.failureCount)))
return UInt64(seconds) * NSEC_PER_SEC
return seconds
}
override public func didFail(error: Error) {

@ -85,6 +85,7 @@ NS_ASSUME_NONNULL_BEGIN
[[OWSOutgoingReceiptManager alloc] initWithPrimaryStorage:primaryStorage];
OWSSyncManager *syncManager = [[OWSSyncManager alloc] initDefault];
id<SSKReachabilityManager> reachabilityManager = [SSKReachabilityManagerImpl new];
OWSSounds *sounds = [[OWSSounds alloc] initWithPrimaryStorage:primaryStorage];
LockInteractionController *lockInteractionController = [[LockInteractionController alloc] initDefault];
OWSWindowManager *windowManager = [[OWSWindowManager alloc] initDefault];
@ -115,6 +116,7 @@ NS_ASSUME_NONNULL_BEGIN
contactDiscoveryService:contactDiscoveryService
readReceiptManager:readReceiptManager
outgoingReceiptManager:outgoingReceiptManager
reachabilityManager:reachabilityManager
syncManager:syncManager]];
appSpecificSingletonBlock();

@ -66,6 +66,8 @@ public class MessageSenderJobQueue: NSObject, JobQueue {
public typealias DurableOperationType = MessageSenderOperation
public static let jobRecordLabel: String = "MessageSender"
public static let maxRetries: UInt = 10
public let requiresInternet: Bool = true
public var runningOperations: [MessageSenderOperation] = []
public var jobRecordLabel: String {
return type(of: self).jobRecordLabel
@ -76,16 +78,7 @@ public class MessageSenderJobQueue: NSObject, JobQueue {
defaultSetup()
}
@objc
public var isReady: Bool = false {
didSet {
if isReady {
DispatchQueue.global().async {
self.workStep()
}
}
}
}
public var isSetup: Bool = false
public func didMarkAsReady(oldJobRecord: SSKMessageSenderJobRecord, transaction: YapDatabaseReadWriteTransaction) {
if let messageId = oldJobRecord.messageId, let message = TSOutgoingMessage.fetch(uniqueId: messageId, transaction: transaction) {
@ -145,7 +138,7 @@ public class MessageSenderOperation: OWSOperation, DurableOperation {
weak public var durableOperationDelegate: MessageSenderJobQueue?
public var operation: Operation {
public var operation: OWSOperation {
return self
}
@ -192,7 +185,7 @@ public class MessageSenderOperation: OWSOperation, DurableOperation {
}
}
override public func retryDelay() -> dispatch_time_t {
override public func retryInterval() -> TimeInterval {
guard !CurrentAppContext().isRunningTests else {
return 0
}
@ -209,7 +202,7 @@ public class MessageSenderOperation: OWSOperation, DurableOperation {
let maxBackoff = kHourInterval
let seconds = 0.1 * min(maxBackoff, pow(backoffFactor, Double(self.jobRecord.failureCount)))
return UInt64(seconds) * NSEC_PER_SEC
return seconds
}
override public func didFail(error: Error) {

@ -0,0 +1,57 @@
//
// Copyright (c) 2018 Open Whisper Systems. All rights reserved.
//
import Foundation
@objc(SSKReachabilityType)
public enum ReachabilityType: Int {
case any, wifi, cellular
}
@objc
public protocol SSKReachabilityManager {
var observationContext: AnyObject { get }
func setup()
var isReachable: Bool { get }
func isReachable(via reachabilityType: ReachabilityType) -> Bool
}
@objc
public class SSKReachabilityManagerImpl: NSObject, SSKReachabilityManager {
public let reachability: Reachability
public var observationContext: AnyObject {
return self.reachability
}
public var isReachable: Bool {
return isReachable(via: .any)
}
public func isReachable(via reachabilityType: ReachabilityType) -> Bool {
switch reachabilityType {
case .any:
return reachability.isReachable()
case .wifi:
return reachability.isReachableViaWiFi()
case .cellular:
return reachability.isReachableViaWWAN()
}
}
@objc
override public init() {
self.reachability = Reachability.forInternetConnection()
}
@objc
public func setup() {
guard reachability.startNotifier() else {
owsFailDebug("failed to start notifier")
return
}
Logger.debug("started notifier")
}
}

@ -31,6 +31,7 @@ NS_ASSUME_NONNULL_BEGIN
@protocol OWSCallMessageHandler;
@protocol ProfileManagerProtocol;
@protocol OWSUDManager;
@protocol SSKReachabilityManager;
@protocol OWSSyncManagerProtocol;
@interface SSKEnvironment : NSObject
@ -56,6 +57,7 @@ NS_ASSUME_NONNULL_BEGIN
contactDiscoveryService:(ContactDiscoveryService *)contactDiscoveryService
readReceiptManager:(OWSReadReceiptManager *)readReceiptManager
outgoingReceiptManager:(OWSOutgoingReceiptManager *)outgoingReceiptManager
reachabilityManager:(id<SSKReachabilityManager>)reachabilityManager
syncManager:(id<OWSSyncManagerProtocol>)syncManager NS_DESIGNATED_INITIALIZER;
- (instancetype)init NS_UNAVAILABLE;
@ -91,6 +93,7 @@ NS_ASSUME_NONNULL_BEGIN
@property (nonatomic, readonly) OWSReadReceiptManager *readReceiptManager;
@property (nonatomic, readonly) OWSOutgoingReceiptManager *outgoingReceiptManager;
@property (nonatomic, readonly) id<OWSSyncManagerProtocol> syncManager;
@property (nonatomic, readonly) id<SSKReachabilityManager> reachabilityManager;
// This property is configured after Environment is created.
@property (atomic, nullable) id<OWSCallMessageHandler> callMessageHandler;

@ -33,6 +33,7 @@ static SSKEnvironment *sharedSSKEnvironment;
@property (nonatomic) OWSReadReceiptManager *readReceiptManager;
@property (nonatomic) OWSOutgoingReceiptManager *outgoingReceiptManager;
@property (nonatomic) id<OWSSyncManagerProtocol> syncManager;
@property (nonatomic) id<SSKReachabilityManager> reachabilityManager;
@end
@ -68,6 +69,7 @@ static SSKEnvironment *sharedSSKEnvironment;
contactDiscoveryService:(ContactDiscoveryService *)contactDiscoveryService
readReceiptManager:(OWSReadReceiptManager *)readReceiptManager
outgoingReceiptManager:(OWSOutgoingReceiptManager *)outgoingReceiptManager
reachabilityManager:(id<SSKReachabilityManager>)reachabilityManager
syncManager:(id<OWSSyncManagerProtocol>)syncManager
{
self = [super init];
@ -97,6 +99,7 @@ static SSKEnvironment *sharedSSKEnvironment;
OWSAssertDebug(readReceiptManager);
OWSAssertDebug(outgoingReceiptManager);
OWSAssertDebug(syncManager);
OWSAssertDebug(reachabilityManager);
_contactsManager = contactsManager;
_messageSender = messageSender;
@ -120,6 +123,7 @@ static SSKEnvironment *sharedSSKEnvironment;
_readReceiptManager = readReceiptManager;
_outgoingReceiptManager = outgoingReceiptManager;
_syncManager = syncManager;
_reachabilityManager = reachabilityManager;
return self;
}

@ -3,6 +3,7 @@
//
// Anything used by Swift outside of the framework must be imported.
#import <Reachability/Reachability.h>
#import <SignalServiceKit/OWSFileSystem.h>
#import <SignalServiceKit/OWSOperation.h>
#import <SignalServiceKit/OWSSyncManagerProtocol.h>

@ -73,6 +73,7 @@ NS_ASSUME_NONNULL_BEGIN
OWSReadReceiptManager *readReceiptManager = [[OWSReadReceiptManager alloc] initWithPrimaryStorage:primaryStorage];
OWSOutgoingReceiptManager *outgoingReceiptManager =
[[OWSOutgoingReceiptManager alloc] initWithPrimaryStorage:primaryStorage];
id<SSKReachabilityManager> reachabilityManager = [SSKReachabilityManagerImpl new];
id<OWSSyncManagerProtocol> syncManager = [[OWSMockSyncManager alloc] init];
self = [super initWithContactsManager:contactsManager
@ -96,6 +97,7 @@ NS_ASSUME_NONNULL_BEGIN
contactDiscoveryService:contactDiscoveryService
readReceiptManager:readReceiptManager
outgoingReceiptManager:outgoingReceiptManager
reachabilityManager:reachabilityManager
syncManager:syncManager];
if (!self) {
return nil;

@ -37,7 +37,7 @@ public protocol DurableOperation: class {
var jobRecord: JobRecordType { get }
var durableOperationDelegate: DurableOperationDelegateType? { get set }
var operation: Operation { get }
var operation: OWSOperation { get }
var remainingRetries: UInt { get set }
}
@ -67,15 +67,22 @@ public protocol JobQueue: DurableOperationDelegate {
// MARK: Required
var runningOperations: [DurableOperationType] { get set }
var jobRecordLabel: String { get }
var isReady: Bool { get set }
var isSetup: Bool { get set }
func setup()
func didMarkAsReady(oldJobRecord: JobRecordType, transaction: YapDatabaseReadWriteTransaction)
func operationQueue(jobRecord: JobRecordType) -> OperationQueue
func buildOperation(jobRecord: JobRecordType, transaction: YapDatabaseReadTransaction) throws -> DurableOperationType
/// When `requiresInternet` is true, we immediately run any jobs which are waiting for retry upon detecting Reachability.
///
/// Because `Reachability` isn't 100% reliable, the jobs will be attempted regardless of what we think our current Reachability is.
/// However, because these jobs will likely fail many times in succession, their `retryInterval` could be quite long by the time we
/// are back online.
var requiresInternet: Bool { get }
static var maxRetries: UInt { get }
}
@ -91,6 +98,10 @@ public extension JobQueue {
return JobRecordFinder()
}
var reachabilityManager: SSKReachabilityManager {
return SSKEnvironment.shared.reachabilityManager
}
// MARK:
func add(jobRecord: JobRecordType, transaction: YapDatabaseReadWriteTransaction) {
@ -105,12 +116,11 @@ public extension JobQueue {
func workStep() {
Logger.debug("")
guard isReady else {
guard isSetup else {
if !CurrentAppContext().isRunningTests {
owsFailDebug("not ready")
owsFailDebug("not setup")
}
Logger.error("not ready")
return
}
@ -132,6 +142,8 @@ public extension JobQueue {
let remainingRetries = self.remainingRetries(durableOperation: durableOperation)
durableOperation.remainingRetries = remainingRetries
self.runningOperations.append(durableOperation)
Logger.debug("adding operation: \(durableOperation) with remainingRetries: \(remainingRetries)")
operationQueue.addOperation(durableOperation.operation)
} catch JobError.assertionFailure(let description) {
@ -181,13 +193,31 @@ public extension JobQueue {
/// `setup` is called from objc, and default implementations from a protocol
/// cannot be marked as @objc.
func defaultSetup() {
guard !isReady else {
guard !isSetup else {
owsFailDebug("already ready already")
return
}
self.restartOldJobs()
self.isReady = true
if self.requiresInternet {
NotificationCenter.default.addObserver(forName: .reachabilityChanged,
object: self.reachabilityManager.observationContext,
queue: nil) { _ in
if self.reachabilityManager.isReachable {
Logger.verbose("isReachable: true")
self.becameReachable()
} else {
Logger.verbose("isReachable: false")
}
}
}
self.isSetup = true
DispatchQueue.global().async {
self.workStep()
}
}
func remainingRetries(durableOperation: DurableOperationType) -> UInt {
@ -201,9 +231,18 @@ public extension JobQueue {
return maxRetries - failureCount
}
func becameReachable() {
guard requiresInternet else {
return
}
self.runningOperations.first?.operation.runAnyQueuedRetry()
}
// MARK: DurableOperationDelegate
func durableOperationDidSucceed(_ operation: DurableOperationType, transaction: YapDatabaseReadWriteTransaction) {
self.runningOperations = self.runningOperations.filter { $0 !== operation }
operation.jobRecord.remove(with: transaction)
}
@ -217,6 +256,7 @@ public extension JobQueue {
}
func durableOperation(_ operation: DurableOperationType, didFailWithError error: Error, transaction: YapDatabaseReadWriteTransaction) {
self.runningOperations = self.runningOperations.filter { $0 !== operation }
operation.jobRecord.saveAsPermanentlyFailed(transaction: transaction)
}
}

@ -52,10 +52,14 @@ typedef NS_ENUM(NSInteger, OWSOperationState) {
- (void)didFailWithError:(NSError *)error NS_SWIFT_NAME(didFail(error:));
// How long to wait before retry, if possible
- (dispatch_time_t)retryDelay;
- (NSTimeInterval)retryInterval;
#pragma mark - Success/Error - Do Not Override
// Runs now if a retry timer has been set by a previous failure,
// otherwise assumes we're currently running and does nothing.
- (void)runAnyQueuedRetry;
// Report that the operation completed successfully.
//
// Each invocation of `run` must make exactly one call to one of: `reportSuccess`, `reportCancelled`, or `reportError:`

@ -4,6 +4,7 @@
#import "OWSOperation.h"
#import "NSError+MessageSending.h"
#import "NSTimer+OWS.h"
#import "OWSBackgroundTask.h"
#import "OWSError.h"
@ -17,6 +18,8 @@ NSString *const OWSOperationKeyIsFinished = @"isFinished";
@property (nullable) NSError *failingError;
@property (atomic) OWSOperationState operationState;
@property (nonatomic) OWSBackgroundTask *backgroundTask;
@property (nonatomic) NSTimer *_Nullable retryTimer;
@property (nonatomic, readonly) dispatch_queue_t retryTimerSerialQueue;
@end
@ -31,6 +34,7 @@ NSString *const OWSOperationKeyIsFinished = @"isFinished";
_operationState = OWSOperationStateNew;
_backgroundTask = [OWSBackgroundTask backgroundTaskWithLabel:self.logTag];
_retryTimerSerialQueue = dispatch_queue_create("SignalServiceKit.OWSOperation.retryTimer", DISPATCH_QUEUE_SERIAL);
// Operations are not retryable by default.
_remainingRetries = 0;
@ -125,6 +129,22 @@ NSString *const OWSOperationKeyIsFinished = @"isFinished";
[self run];
}
- (void)runAnyQueuedRetry
{
__block NSTimer *_Nullable retryTimer;
dispatch_sync(self.retryTimerSerialQueue, ^{
retryTimer = self.retryTimer;
self.retryTimer = nil;
[retryTimer invalidate];
});
if (retryTimer != nil) {
[self run];
} else {
OWSLogVerbose(@"not re-running since operation is already running.");
}
}
#pragma mark - Public Methods
// These methods are not intended to be subclassed
@ -170,15 +190,20 @@ NSString *const OWSOperationKeyIsFinished = @"isFinished";
self.remainingRetries--;
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, self.retryDelay), dispatch_get_main_queue(), ^{
[self run];
dispatch_sync(self.retryTimerSerialQueue, ^{
[self.retryTimer invalidate];
self.retryTimer = [NSTimer weakScheduledTimerWithTimeInterval:self.retryInterval
target:self
selector:@selector(runAnyQueuedRetry)
userInfo:nil
repeats:NO];
});
}
// Override in subclass if you want something more sophisticated, e.g. exponential backoff
- (dispatch_time_t)retryDelay
- (NSTimeInterval)retryInterval
{
return (0.1 * NSEC_PER_SEC);
return 0.1;
}
#pragma mark - Life Cycle

@ -127,7 +127,7 @@ class MessageSenderJobQueueTest: SSKBaseTestSwift {
error.isRetryable = true
self.messageSender.stubbedFailingError = error
let expectation = sentExpectation(message: message) {
jobQueue.isReady = false
jobQueue.isSetup = false
}
jobQueue.setup()
@ -188,7 +188,7 @@ class MessageSenderJobQueueTest: SSKBaseTestSwift {
error.isRetryable = false
self.messageSender.stubbedFailingError = error
let expectation = sentExpectation(message: message) {
jobQueue.isReady = false
jobQueue.isSetup = false
}
jobQueue.setup()
self.wait(for: [expectation], timeout: 0.1)

@ -47,13 +47,7 @@ class TestJobQueue: JobQueue {
// no special handling
}
var isReady: Bool = false {
didSet {
DispatchQueue.global().async {
self.workStep()
}
}
}
var isSetup: Bool = false
let operationQueue = OperationQueue()
@ -161,8 +155,7 @@ class JobQueueTest: SSKBaseTestSwift {
}
// Verify re-queue
jobQueue.isReady = false
jobQueue.isSetup = false
jobQueue.setup()
self.readWrite { transaction in
@ -181,7 +174,7 @@ class JobQueueTest: SSKBaseTestSwift {
rerunGroup.leave()
}
jobQueue.isReady = true
jobQueue.isSetup = true
switch rerunGroup.wait(timeout: .now() + 1.0) {
case .timedOut:

Loading…
Cancel
Save