mirror of https://github.com/oxen-io/session-ios
You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
262 lines
8.8 KiB
Objective-C
262 lines
8.8 KiB
Objective-C
//
|
|
// Copyright (c) 2018 Open Whisper Systems. All rights reserved.
|
|
//
|
|
|
|
#import "OWSOutgoingReceiptManager.h"
|
|
#import <SessionMessagingKit/SessionMessagingKit-Swift.h>
|
|
#import "SSKEnvironment.h"
|
|
#import "AppReadiness.h"
|
|
#import "OWSPrimaryStorage.h"
|
|
#import "TSContactThread.h"
|
|
#import "TSYapDatabaseObject.h"
|
|
#import <PromiseKit/PromiseKit.h>
|
|
#import <YapDatabase/YapDatabase.h>
|
|
#import <Reachability/Reachability.h>
|
|
#import <SessionUtilitiesKit/SessionUtilitiesKit.h>
|
|
|
|
NS_ASSUME_NONNULL_BEGIN
|
|
|
|
typedef NS_ENUM(NSUInteger, OWSReceiptType) {
|
|
OWSReceiptType_Delivery,
|
|
OWSReceiptType_Read,
|
|
};
|
|
|
|
NSString *const kOutgoingDeliveryReceiptManagerCollection = @"kOutgoingDeliveryReceiptManagerCollection";
|
|
NSString *const kOutgoingReadReceiptManagerCollection = @"kOutgoingReadReceiptManagerCollection";
|
|
|
|
@interface OWSOutgoingReceiptManager ()
|
|
|
|
@property (nonatomic, readonly) YapDatabaseConnection *dbConnection;
|
|
|
|
@property (nonatomic) Reachability *reachability;
|
|
|
|
// This property should only be accessed on the serialQueue.
|
|
@property (nonatomic) BOOL isProcessing;
|
|
|
|
@end
|
|
|
|
#pragma mark -
|
|
|
|
@implementation OWSOutgoingReceiptManager
|
|
|
|
+ (instancetype)sharedManager
|
|
{
|
|
return SSKEnvironment.shared.outgoingReceiptManager;
|
|
}
|
|
|
|
- (instancetype)initWithPrimaryStorage:(OWSPrimaryStorage *)primaryStorage
|
|
{
|
|
self = [super init];
|
|
|
|
if (!self) {
|
|
return self;
|
|
}
|
|
|
|
self.reachability = [Reachability reachabilityForInternetConnection];
|
|
|
|
_dbConnection = primaryStorage.newDatabaseConnection;
|
|
|
|
[[NSNotificationCenter defaultCenter] addObserver:self
|
|
selector:@selector(reachabilityChanged)
|
|
name:kReachabilityChangedNotification
|
|
object:nil];
|
|
|
|
// Start processing.
|
|
[AppReadiness runNowOrWhenAppDidBecomeReady:^{
|
|
[self process];
|
|
}];
|
|
|
|
return self;
|
|
}
|
|
|
|
- (void)dealloc
|
|
{
|
|
[[NSNotificationCenter defaultCenter] removeObserver:self];
|
|
}
|
|
|
|
#pragma mark -
|
|
|
|
- (dispatch_queue_t)serialQueue
|
|
{
|
|
static dispatch_queue_t _serialQueue;
|
|
static dispatch_once_t onceToken;
|
|
dispatch_once(&onceToken, ^{
|
|
_serialQueue = dispatch_queue_create("org.whispersystems.outgoingReceipts", DISPATCH_QUEUE_SERIAL);
|
|
});
|
|
|
|
return _serialQueue;
|
|
}
|
|
|
|
// Schedules a processing pass, unless one is already scheduled.
|
|
- (void)process {
|
|
dispatch_async(self.serialQueue, ^{
|
|
if (self.isProcessing) {
|
|
return;
|
|
}
|
|
|
|
self.isProcessing = YES;
|
|
|
|
if (!self.reachability.isReachable) {
|
|
// No network availability; abort.
|
|
self.isProcessing = NO;
|
|
return;
|
|
}
|
|
|
|
NSMutableArray<AnyPromise *> *sendPromises = [NSMutableArray array];
|
|
[sendPromises addObjectsFromArray:[self sendReceiptsForReceiptType:OWSReceiptType_Read]];
|
|
|
|
if (sendPromises.count < 1) {
|
|
// No work to do; abort.
|
|
self.isProcessing = NO;
|
|
return;
|
|
}
|
|
|
|
AnyPromise *completionPromise = PMKJoin(sendPromises);
|
|
completionPromise.ensure(^() {
|
|
// Wait N seconds before conducting another pass.
|
|
// This allows time for a batch to accumulate.
|
|
//
|
|
// We want a value high enough to allow us to effectively de-duplicate
|
|
// receipts without being so high that we incur so much latency that
|
|
// the user notices.
|
|
const CGFloat kProcessingFrequencySeconds = 3.f;
|
|
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(kProcessingFrequencySeconds * NSEC_PER_SEC)),
|
|
self.serialQueue,
|
|
^{
|
|
self.isProcessing = NO;
|
|
|
|
[self process];
|
|
});
|
|
});
|
|
[completionPromise retainUntilComplete];
|
|
});
|
|
}
|
|
|
|
- (NSArray<AnyPromise *> *)sendReceiptsForReceiptType:(OWSReceiptType)receiptType {
|
|
if (receiptType == OWSReceiptType_Delivery) { return @[]; } // Don't send delivery receipts
|
|
|
|
NSString *collection = [self collectionForReceiptType:receiptType];
|
|
|
|
NSMutableDictionary<NSString *, NSSet<NSNumber *> *> *queuedReceiptMap = [NSMutableDictionary new];
|
|
[self.dbConnection readWithBlock:^(YapDatabaseReadTransaction *transaction) {
|
|
[transaction enumerateKeysAndObjectsInCollection:collection
|
|
usingBlock:^(NSString *key, id object, BOOL *stop) {
|
|
NSString *recipientId = key;
|
|
NSSet<NSNumber *> *timestamps = object;
|
|
queuedReceiptMap[recipientId] = [timestamps copy];
|
|
}];
|
|
}];
|
|
|
|
NSMutableArray<AnyPromise *> *sendPromises = [NSMutableArray array];
|
|
|
|
for (NSString *recipientId in queuedReceiptMap) {
|
|
NSSet<NSNumber *> *timestampsAsSet = queuedReceiptMap[recipientId];
|
|
if (timestampsAsSet.count < 1) {
|
|
continue;
|
|
}
|
|
|
|
TSThread *thread = [TSContactThread getOrCreateThreadWithContactId:recipientId];
|
|
|
|
if (thread.isGroupThread) { // Don't send receipts in group threads
|
|
continue;
|
|
}
|
|
|
|
SNReadReceipt *readReceipt = [SNReadReceipt new];
|
|
NSMutableArray<NSNumber *> *timestamps = [NSMutableArray new];
|
|
for (NSNumber *timestamp in timestampsAsSet) {
|
|
[timestamps addObject:timestamp];
|
|
}
|
|
readReceipt.timestamps = timestamps;
|
|
[LKStorage writeWithBlock:^(YapDatabaseReadWriteTransaction *transaction) {
|
|
AnyPromise *promise = [SNMessageSender sendNonDurably:readReceipt inThread:thread usingTransaction:transaction]
|
|
.thenOn(self.serialQueue, ^(id object) {
|
|
[self dequeueReceiptsWithRecipientId:recipientId timestamps:timestampsAsSet receiptType:OWSReceiptType_Read];
|
|
});
|
|
[sendPromises addObject:promise];
|
|
}];
|
|
}
|
|
|
|
return [sendPromises copy];
|
|
}
|
|
|
|
- (void)enqueueDeliveryReceiptForEnvelope:(SNProtoEnvelope *)envelope
|
|
{
|
|
[self enqueueReceiptWithRecipientId:envelope.source
|
|
timestamp:envelope.timestamp
|
|
receiptType:OWSReceiptType_Delivery];
|
|
}
|
|
|
|
- (void)enqueueReadReceiptForEnvelope:(NSString *)messageAuthorId timestamp:(uint64_t)timestamp {
|
|
[self enqueueReceiptWithRecipientId:messageAuthorId timestamp:timestamp receiptType:OWSReceiptType_Read];
|
|
}
|
|
|
|
- (void)enqueueReceiptWithRecipientId:(NSString *)recipientId
|
|
timestamp:(uint64_t)timestamp
|
|
receiptType:(OWSReceiptType)receiptType {
|
|
NSString *collection = [self collectionForReceiptType:receiptType];
|
|
|
|
if (recipientId.length < 1) {
|
|
return;
|
|
}
|
|
if (timestamp < 1) {
|
|
return;
|
|
}
|
|
dispatch_async(self.serialQueue, ^{
|
|
[LKStorage writeSyncWithBlock:^(YapDatabaseReadWriteTransaction *transaction) {
|
|
NSSet<NSNumber *> *_Nullable oldTimestamps = [transaction objectForKey:recipientId inCollection:collection];
|
|
NSMutableSet<NSNumber *> *newTimestamps
|
|
= (oldTimestamps ? [oldTimestamps mutableCopy] : [NSMutableSet new]);
|
|
[newTimestamps addObject:@(timestamp)];
|
|
|
|
[transaction setObject:newTimestamps forKey:recipientId inCollection:collection];
|
|
}];
|
|
|
|
[self process];
|
|
});
|
|
}
|
|
|
|
- (void)dequeueReceiptsWithRecipientId:(NSString *)recipientId
|
|
timestamps:(NSSet<NSNumber *> *)timestamps
|
|
receiptType:(OWSReceiptType)receiptType {
|
|
NSString *collection = [self collectionForReceiptType:receiptType];
|
|
|
|
if (recipientId.length < 1) {
|
|
return;
|
|
}
|
|
if (timestamps.count < 1) {
|
|
return;
|
|
}
|
|
dispatch_async(self.serialQueue, ^{
|
|
[LKStorage writeSyncWithBlock:^(YapDatabaseReadWriteTransaction *transaction) {
|
|
NSSet<NSNumber *> *_Nullable oldTimestamps = [transaction objectForKey:recipientId inCollection:collection];
|
|
NSMutableSet<NSNumber *> *newTimestamps
|
|
= (oldTimestamps ? [oldTimestamps mutableCopy] : [NSMutableSet new]);
|
|
[newTimestamps minusSet:timestamps];
|
|
|
|
if (newTimestamps.count > 0) {
|
|
[transaction setObject:newTimestamps forKey:recipientId inCollection:collection];
|
|
} else {
|
|
[transaction removeObjectForKey:recipientId inCollection:collection];
|
|
}
|
|
}];
|
|
});
|
|
}
|
|
|
|
- (void)reachabilityChanged
|
|
{
|
|
[self process];
|
|
}
|
|
|
|
- (NSString *)collectionForReceiptType:(OWSReceiptType)receiptType {
|
|
switch (receiptType) {
|
|
case OWSReceiptType_Delivery:
|
|
return kOutgoingDeliveryReceiptManagerCollection;
|
|
case OWSReceiptType_Read:
|
|
return kOutgoingReadReceiptManagerCollection;
|
|
}
|
|
}
|
|
|
|
@end
|
|
|
|
NS_ASSUME_NONNULL_END
|