Use transactions in the jobs.

// FREEBIE
pull/1/head
Matthew Chen 8 years ago
parent 96dc0e4fdb
commit bbc7c44c93

@ -157,6 +157,8 @@ typedef NS_ENUM(NSInteger, TSGroupMetaMessage) {
// This isn't a perfect arrangement, but in practice this will prevent // This isn't a perfect arrangement, but in practice this will prevent
// data loss and will resolve all known issues. // data loss and will resolve all known issues.
- (void)updateWithMessageState:(TSOutgoingMessageState)messageState; - (void)updateWithMessageState:(TSOutgoingMessageState)messageState;
- (void)updateWithMessageState:(TSOutgoingMessageState)messageState
transaction:(YapDatabaseReadWriteTransaction *)transaction;
- (void)updateWithSendingError:(NSError *)error; - (void)updateWithSendingError:(NSError *)error;
- (void)updateWithHasSyncedTranscript:(BOOL)hasSyncedTranscript; - (void)updateWithHasSyncedTranscript:(BOOL)hasSyncedTranscript;
- (void)updateWithCustomMessage:(NSString *)customMessage transaction:(YapDatabaseReadWriteTransaction *)transaction; - (void)updateWithCustomMessage:(NSString *)customMessage transaction:(YapDatabaseReadWriteTransaction *)transaction;

@ -257,11 +257,19 @@ NSString *const kTSOutgoingMessageSentRecipientAll = @"kTSOutgoingMessageSentRec
- (void)updateWithMessageState:(TSOutgoingMessageState)messageState - (void)updateWithMessageState:(TSOutgoingMessageState)messageState
{ {
[self.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) { [self.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) {
[self updateWithMessageState:messageState transaction:transaction];
}];
}
- (void)updateWithMessageState:(TSOutgoingMessageState)messageState
transaction:(YapDatabaseReadWriteTransaction *)transaction
{
OWSAssert(transaction);
[self applyChangeToSelfAndLatestOutgoingMessage:transaction [self applyChangeToSelfAndLatestOutgoingMessage:transaction
changeBlock:^(TSOutgoingMessage *message) { changeBlock:^(TSOutgoingMessage *message) {
[message setMessageState:messageState]; [message setMessageState:messageState];
}]; }];
}];
} }
- (void)updateWithHasSyncedTranscript:(BOOL)hasSyncedTranscript - (void)updateWithHasSyncedTranscript:(BOOL)hasSyncedTranscript

@ -1,38 +1,39 @@
// Created by Michael Kirk on 9/23/16. //
// Copyright © 2016 Open Whisper Systems. All rights reserved. // Copyright (c) 2017 Open Whisper Systems. All rights reserved.
//
NS_ASSUME_NONNULL_BEGIN NS_ASSUME_NONNULL_BEGIN
@class TSStorageManager; @class TSStorageManager;
@class TSMessage; @class TSMessage;
@class TSThread; @class TSThread;
@class YapDatabaseReadTransaction;
@class YapDatabaseReadWriteTransaction;
@interface OWSDisappearingMessagesFinder : NSObject @interface OWSDisappearingMessagesFinder : NSObject
- (instancetype)init NS_UNAVAILABLE; - (void)enumerateExpiredMessagesWithBlock:(void (^_Nonnull)(TSMessage *message))block
- (instancetype)initWithStorageManager:(TSStorageManager *)storageManager NS_DESIGNATED_INITIALIZER; transaction:(YapDatabaseReadWriteTransaction *_Nonnull)transaction;
- (void)enumerateUnstartedExpiringMessagesInThread:(TSThread *)thread
+ (instancetype)defaultInstance; block:(void (^_Nonnull)(TSMessage *message))block
transaction:(YapDatabaseReadWriteTransaction *_Nonnull)transaction;
- (void)enumerateExpiredMessagesWithBlock:(void (^_Nonnull)(TSMessage *message))block;
- (void)enumerateUnstartedExpiringMessagesInThread:(TSThread *)thread block:(void (^_Nonnull)(TSMessage *message))block;
/** /**
* @return * @return
* uint64_t millisecond timestamp wrapped in a number. Retrieve with `unsignedLongLongvalue`. * uint64_t millisecond timestamp wrapped in a number. Retrieve with `unsignedLongLongvalue`.
* or nil if there are no upcoming expired messages * or nil if there are no upcoming expired messages
*/ */
- (nullable NSNumber *)nextExpirationTimestamp; - (nullable NSNumber *)nextExpirationTimestampWithTransaction:(YapDatabaseReadTransaction *_Nonnull)transaction;
/** /**
* Database extensions required for class to work. * Database extensions required for class to work.
*/ */
- (void)asyncRegisterDatabaseExtensions; + (void)asyncRegisterDatabaseExtensions:(TSStorageManager *)storageManager;
/** /**
* Only use the sync version for testing, generally we'll want to register extensions async * Only use the sync version for testing, generally we'll want to register extensions async
*/ */
- (void)blockingRegisterDatabaseExtensions; + (void)blockingRegisterDatabaseExtensions:(TSStorageManager *)storageManager;
@end @end

@ -1,5 +1,6 @@
// Created by Michael Kirk on 9/23/16. //
// Copyright © 2016 Open Whisper Systems. All rights reserved. // Copyright (c) 2017 Open Whisper Systems. All rights reserved.
//
#import "OWSDisappearingMessagesFinder.h" #import "OWSDisappearingMessagesFinder.h"
#import "NSDate+millisecondTimeStamp.h" #import "NSDate+millisecondTimeStamp.h"
@ -17,40 +18,13 @@ static NSString *const OWSDisappearingMessageFinderThreadIdColumn = @"thread_id"
static NSString *const OWSDisappearingMessageFinderExpiresAtColumn = @"expires_at"; static NSString *const OWSDisappearingMessageFinderExpiresAtColumn = @"expires_at";
static NSString *const OWSDisappearingMessageFinderExpiresAtIndex = @"index_messages_on_expires_at_and_thread_id_v2"; static NSString *const OWSDisappearingMessageFinderExpiresAtIndex = @"index_messages_on_expires_at_and_thread_id_v2";
@interface OWSDisappearingMessagesFinder ()
@property (nonatomic, readonly) TSStorageManager *storageManager;
@property (nonatomic, readonly) YapDatabaseConnection *dbConnection;
@end
@implementation OWSDisappearingMessagesFinder @implementation OWSDisappearingMessagesFinder
- (instancetype)initWithStorageManager:(TSStorageManager *)storageManager
{
self = [super init];
if (!self) {
return self;
}
_storageManager = storageManager;
_dbConnection = [storageManager newDatabaseConnection];
return self;
}
+ (instancetype)defaultInstance
{
static OWSDisappearingMessagesFinder *defaultInstance = nil;
static dispatch_once_t onceToken;
dispatch_once(&onceToken, ^{
defaultInstance = [[self alloc] initWithStorageManager:[TSStorageManager sharedManager]];
});
return defaultInstance;
}
- (NSArray<NSString *> *)fetchUnstartedExpiringMessageIdsInThread:(TSThread *)thread - (NSArray<NSString *> *)fetchUnstartedExpiringMessageIdsInThread:(TSThread *)thread
transaction:(YapDatabaseReadWriteTransaction *_Nonnull)transaction
{ {
OWSAssert(transaction);
NSMutableArray<NSString *> *messageIds = [NSMutableArray new]; NSMutableArray<NSString *> *messageIds = [NSMutableArray new];
NSString *formattedString = [NSString stringWithFormat:@"WHERE %@ = 0 AND %@ = \"%@\"", NSString *formattedString = [NSString stringWithFormat:@"WHERE %@ = 0 AND %@ = \"%@\"",
OWSDisappearingMessageFinderExpiresAtColumn, OWSDisappearingMessageFinderExpiresAtColumn,
@ -58,19 +32,19 @@ static NSString *const OWSDisappearingMessageFinderExpiresAtIndex = @"index_mess
thread.uniqueId]; thread.uniqueId];
YapDatabaseQuery *query = [YapDatabaseQuery queryWithFormat:formattedString]; YapDatabaseQuery *query = [YapDatabaseQuery queryWithFormat:formattedString];
[self.dbConnection readWithBlock:^(YapDatabaseReadTransaction *_Nonnull transaction) {
[[transaction ext:OWSDisappearingMessageFinderExpiresAtIndex] [[transaction ext:OWSDisappearingMessageFinderExpiresAtIndex]
enumerateKeysMatchingQuery:query enumerateKeysMatchingQuery:query
usingBlock:^void(NSString *collection, NSString *key, BOOL *stop) { usingBlock:^void(NSString *collection, NSString *key, BOOL *stop) {
[messageIds addObject:key]; [messageIds addObject:key];
}]; }];
}];
return [messageIds copy]; return [messageIds copy];
} }
- (NSArray<NSString *> *)fetchExpiredMessageIds - (NSArray<NSString *> *)fetchExpiredMessageIdsWithTransaction:(YapDatabaseReadWriteTransaction *_Nonnull)transaction
{ {
OWSAssert(transaction);
NSMutableArray<NSString *> *messageIds = [NSMutableArray new]; NSMutableArray<NSString *> *messageIds = [NSMutableArray new];
uint64_t now = [NSDate ows_millisecondTimeStamp]; uint64_t now = [NSDate ows_millisecondTimeStamp];
@ -80,33 +54,31 @@ static NSString *const OWSDisappearingMessageFinderExpiresAtIndex = @"index_mess
OWSDisappearingMessageFinderExpiresAtColumn, OWSDisappearingMessageFinderExpiresAtColumn,
now]; now];
YapDatabaseQuery *query = [YapDatabaseQuery queryWithFormat:formattedString]; YapDatabaseQuery *query = [YapDatabaseQuery queryWithFormat:formattedString];
[self.dbConnection readWithBlock:^(YapDatabaseReadTransaction *_Nonnull transaction) {
[[transaction ext:OWSDisappearingMessageFinderExpiresAtIndex] [[transaction ext:OWSDisappearingMessageFinderExpiresAtIndex]
enumerateKeysMatchingQuery:query enumerateKeysMatchingQuery:query
usingBlock:^void(NSString *collection, NSString *key, BOOL *stop) { usingBlock:^void(NSString *collection, NSString *key, BOOL *stop) {
[messageIds addObject:key]; [messageIds addObject:key];
}]; }];
}];
return [messageIds copy]; return [messageIds copy];
} }
- (nullable NSNumber *)nextExpirationTimestamp - (nullable NSNumber *)nextExpirationTimestampWithTransaction:(YapDatabaseReadTransaction *_Nonnull)transaction
{ {
OWSAssert(transaction);
NSString *formattedString = [NSString stringWithFormat:@"WHERE %@ > 0 ORDER BY %@ ASC", NSString *formattedString = [NSString stringWithFormat:@"WHERE %@ > 0 ORDER BY %@ ASC",
OWSDisappearingMessageFinderExpiresAtColumn, OWSDisappearingMessageFinderExpiresAtColumn,
OWSDisappearingMessageFinderExpiresAtColumn]; OWSDisappearingMessageFinderExpiresAtColumn];
YapDatabaseQuery *query = [YapDatabaseQuery queryWithFormat:formattedString]; YapDatabaseQuery *query = [YapDatabaseQuery queryWithFormat:formattedString];
__block TSMessage *firstMessage; __block TSMessage *firstMessage;
[self.dbConnection readWithBlock:^(YapDatabaseReadTransaction *_Nonnull transaction) {
[[transaction ext:OWSDisappearingMessageFinderExpiresAtIndex] [[transaction ext:OWSDisappearingMessageFinderExpiresAtIndex]
enumerateKeysAndObjectsMatchingQuery:query enumerateKeysAndObjectsMatchingQuery:query
usingBlock:^void(NSString *collection, NSString *key, id object, BOOL *stop) { usingBlock:^void(NSString *collection, NSString *key, id object, BOOL *stop) {
firstMessage = (TSMessage *)object; firstMessage = (TSMessage *)object;
*stop = YES; *stop = YES;
}]; }];
}];
if (firstMessage && firstMessage.expiresAt > 0) { if (firstMessage && firstMessage.expiresAt > 0) {
return [NSNumber numberWithUnsignedLongLong:firstMessage.expiresAt]; return [NSNumber numberWithUnsignedLongLong:firstMessage.expiresAt];
@ -115,10 +87,15 @@ static NSString *const OWSDisappearingMessageFinderExpiresAtIndex = @"index_mess
return nil; return nil;
} }
- (void)enumerateUnstartedExpiringMessagesInThread:(TSThread *)thread block:(void (^_Nonnull)(TSMessage *message))block - (void)enumerateUnstartedExpiringMessagesInThread:(TSThread *)thread
block:(void (^_Nonnull)(TSMessage *message))block
transaction:(YapDatabaseReadWriteTransaction *_Nonnull)transaction
{ {
for (NSString *expiringMessageId in [self fetchUnstartedExpiringMessageIdsInThread:thread]) { OWSAssert(transaction);
TSMessage *_Nullable message = [TSMessage fetchObjectWithUniqueID:expiringMessageId];
for (NSString *expiringMessageId in
[self fetchUnstartedExpiringMessageIdsInThread:thread transaction:transaction]) {
TSMessage *_Nullable message = [TSMessage fetchObjectWithUniqueID:expiringMessageId transaction:transaction];
if ([message isKindOfClass:[TSMessage class]]) { if ([message isKindOfClass:[TSMessage class]]) {
block(message); block(message);
} else { } else {
@ -132,23 +109,30 @@ static NSString *const OWSDisappearingMessageFinderExpiresAtIndex = @"index_mess
* We don't want to instantiate potentially many messages at once. * We don't want to instantiate potentially many messages at once.
*/ */
- (NSArray<TSMessage *> *)fetchUnstartedExpiringMessagesInThread:(TSThread *)thread - (NSArray<TSMessage *> *)fetchUnstartedExpiringMessagesInThread:(TSThread *)thread
transaction:(YapDatabaseReadWriteTransaction *_Nonnull)transaction
{ {
OWSAssert(transaction);
NSMutableArray<TSMessage *> *messages = [NSMutableArray new]; NSMutableArray<TSMessage *> *messages = [NSMutableArray new];
[self enumerateUnstartedExpiringMessagesInThread:thread [self enumerateUnstartedExpiringMessagesInThread:thread
block:^(TSMessage *_Nonnull message) { block:^(TSMessage *_Nonnull message) {
[messages addObject:message]; [messages addObject:message];
}]; }
transaction:transaction];
return [messages copy]; return [messages copy];
} }
- (void)enumerateExpiredMessagesWithBlock:(void (^_Nonnull)(TSMessage *message))block - (void)enumerateExpiredMessagesWithBlock:(void (^_Nonnull)(TSMessage *message))block
transaction:(YapDatabaseReadWriteTransaction *_Nonnull)transaction
{ {
OWSAssert(transaction);
// Since we can't directly mutate the enumerated expired messages, we store only their ids in hopes of saving a // Since we can't directly mutate the enumerated expired messages, we store only their ids in hopes of saving a
// little memory and then enumerate the (larger) TSMessage objects one at a time. // little memory and then enumerate the (larger) TSMessage objects one at a time.
for (NSString *expiredMessageId in [self fetchExpiredMessageIds]) { for (NSString *expiredMessageId in [self fetchExpiredMessageIdsWithTransaction:transaction]) {
TSMessage *_Nullable message = [TSMessage fetchObjectWithUniqueID:expiredMessageId]; TSMessage *_Nullable message = [TSMessage fetchObjectWithUniqueID:expiredMessageId transaction:transaction];
if ([message isKindOfClass:[TSMessage class]]) { if ([message isKindOfClass:[TSMessage class]]) {
block(message); block(message);
} else { } else {
@ -161,19 +145,22 @@ static NSString *const OWSDisappearingMessageFinderExpiresAtIndex = @"index_mess
* Don't use this in production. Useful for testing. * Don't use this in production. Useful for testing.
* We don't want to instantiate potentially many messages at once. * We don't want to instantiate potentially many messages at once.
*/ */
- (NSArray<TSMessage *> *)fetchExpiredMessages - (NSArray<TSMessage *> *)fetchExpiredMessagesWithTransaction:(YapDatabaseReadWriteTransaction *_Nonnull)transaction
{ {
OWSAssert(transaction);
NSMutableArray<TSMessage *> *messages = [NSMutableArray new]; NSMutableArray<TSMessage *> *messages = [NSMutableArray new];
[self enumerateExpiredMessagesWithBlock:^(TSMessage *_Nonnull message) { [self enumerateExpiredMessagesWithBlock:^(TSMessage *_Nonnull message) {
[messages addObject:message]; [messages addObject:message];
}]; }
transaction:transaction];
return [messages copy]; return [messages copy];
} }
#pragma mark - YapDatabaseExtension #pragma mark - YapDatabaseExtension
- (YapDatabaseSecondaryIndex *)indexDatabaseExtension + (YapDatabaseSecondaryIndex *)indexDatabaseExtension
{ {
YapDatabaseSecondaryIndexSetup *setup = [YapDatabaseSecondaryIndexSetup new]; YapDatabaseSecondaryIndexSetup *setup = [YapDatabaseSecondaryIndexSetup new];
[setup addColumn:OWSDisappearingMessageFinderExpiresAtColumn withType:YapDatabaseSecondaryIndexTypeInteger]; [setup addColumn:OWSDisappearingMessageFinderExpiresAtColumn withType:YapDatabaseSecondaryIndexTypeInteger];
@ -202,15 +189,15 @@ static NSString *const OWSDisappearingMessageFinderExpiresAtIndex = @"index_mess
} }
// Useful for tests, don't use in app startup path because it's slow. // Useful for tests, don't use in app startup path because it's slow.
- (void)blockingRegisterDatabaseExtensions + (void)blockingRegisterDatabaseExtensions:(TSStorageManager *)storageManager
{ {
[self.storageManager.database registerExtension:[self indexDatabaseExtension] [storageManager.database registerExtension:[self indexDatabaseExtension]
withName:OWSDisappearingMessageFinderExpiresAtIndex]; withName:OWSDisappearingMessageFinderExpiresAtIndex];
} }
- (void)asyncRegisterDatabaseExtensions + (void)asyncRegisterDatabaseExtensions:(TSStorageManager *)storageManager
{ {
[self.storageManager.database asyncRegisterExtension:[self indexDatabaseExtension] [storageManager.database asyncRegisterExtension:[self indexDatabaseExtension]
withName:OWSDisappearingMessageFinderExpiresAtIndex withName:OWSDisappearingMessageFinderExpiresAtIndex
completionBlock:^(BOOL ready) { completionBlock:^(BOOL ready) {
if (ready) { if (ready) {

@ -17,6 +17,9 @@ NS_ASSUME_NONNULL_BEGIN
@interface OWSDisappearingMessagesJob () @interface OWSDisappearingMessagesJob ()
// This property should only be accessed on the serialQueue.
@property (nonatomic, readonly) TSStorageManager *storageManager;
// This property should only be accessed on the serialQueue. // This property should only be accessed on the serialQueue.
@property (nonatomic, readonly) OWSDisappearingMessagesFinder *disappearingMessagesFinder; @property (nonatomic, readonly) OWSDisappearingMessagesFinder *disappearingMessagesFinder;
@ -48,7 +51,8 @@ NS_ASSUME_NONNULL_BEGIN
return self; return self;
} }
_disappearingMessagesFinder = [[OWSDisappearingMessagesFinder alloc] initWithStorageManager:storageManager]; _storageManager = storageManager;
_disappearingMessagesFinder = [OWSDisappearingMessagesFinder new];
OWSSingletonAssert(); OWSSingletonAssert();
@ -84,16 +88,20 @@ NS_ASSUME_NONNULL_BEGIN
uint64_t now = [NSDate ows_millisecondTimeStamp]; uint64_t now = [NSDate ows_millisecondTimeStamp];
__block uint expirationCount = 0; __block uint expirationCount = 0;
[self.storageManager.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *_Nonnull transaction) {
[self.disappearingMessagesFinder enumerateExpiredMessagesWithBlock:^(TSMessage *message) { [self.disappearingMessagesFinder enumerateExpiredMessagesWithBlock:^(TSMessage *message) {
// sanity check // sanity check
if (message.expiresAt > now) { if (message.expiresAt > now) {
DDLogError(@"%@ Refusing to remove message which doesn't expire until: %lld", self.tag, message.expiresAt); DDLogError(
@"%@ Refusing to remove message which doesn't expire until: %lld", self.tag, message.expiresAt);
return; return;
} }
DDLogDebug(@"%@ Removing message which expired at: %lld", self.tag, message.expiresAt); DDLogDebug(@"%@ Removing message which expired at: %lld", self.tag, message.expiresAt);
[message remove]; [message removeWithTransaction:transaction];
expirationCount++; expirationCount++;
}
transaction:transaction];
}]; }];
DDLogDebug(@"%@ Removed %u expired messages", self.tag, expirationCount); DDLogDebug(@"%@ Removed %u expired messages", self.tag, expirationCount);
@ -106,7 +114,11 @@ NS_ASSUME_NONNULL_BEGIN
[self run]; [self run];
uint64_t now = [NSDate ows_millisecondTimeStamp]; uint64_t now = [NSDate ows_millisecondTimeStamp];
NSNumber *nextExpirationTimestampNumber = [self.disappearingMessagesFinder nextExpirationTimestamp]; __block NSNumber *nextExpirationTimestampNumber;
[self.storageManager.dbConnection readWithBlock:^(YapDatabaseReadTransaction *_Nonnull transaction) {
nextExpirationTimestampNumber =
[self.disappearingMessagesFinder nextExpirationTimestampWithTransaction:transaction];
}];
if (!nextExpirationTimestampNumber) { if (!nextExpirationTimestampNumber) {
// In theory we could kill the loop here. It should resume when the next expiring message is saved, // In theory we could kill the loop here. It should resume when the next expiring message is saved,
// But this is a safeguard for any race conditions that exist while running the job as a new message is saved. // But this is a safeguard for any race conditions that exist while running the job as a new message is saved.
@ -142,15 +154,19 @@ NS_ASSUME_NONNULL_BEGIN
[self setExpirationForMessage:message expirationStartedAt:[NSDate ows_millisecondTimeStamp]]; [self setExpirationForMessage:message expirationStartedAt:[NSDate ows_millisecondTimeStamp]];
} }
+ (void)setExpirationForMessage:(TSMessage *)message expirationStartedAt:(uint64_t)expirationStartedAt - (void)setExpirationForMessage:(TSMessage *)message expirationStartedAt:(uint64_t)expirationStartedAt
{ {
dispatch_async(self.serialQueue, ^{ [self.storageManager.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *_Nonnull transaction) {
[[self sharedJob] setExpirationForMessage:message expirationStartedAt:expirationStartedAt]; [self setExpirationForMessage:message expirationStartedAt:expirationStartedAt transaction:transaction];
}); }];
} }
- (void)setExpirationForMessage:(TSMessage *)message expirationStartedAt:(uint64_t)expirationStartedAt - (void)setExpirationForMessage:(TSMessage *)message
expirationStartedAt:(uint64_t)expirationStartedAt
transaction:(YapDatabaseReadWriteTransaction *_Nonnull)transaction
{ {
OWSAssert(transaction);
if (!message.isExpiringMessage) { if (!message.isExpiringMessage) {
return; return;
} }
@ -161,7 +177,7 @@ NS_ASSUME_NONNULL_BEGIN
// Don't clobber if multiple actions simultaneously triggered expiration. // Don't clobber if multiple actions simultaneously triggered expiration.
if (message.expireStartedAt == 0 || message.expireStartedAt > expirationStartedAt) { if (message.expireStartedAt == 0 || message.expireStartedAt > expirationStartedAt) {
message.expireStartedAt = expirationStartedAt; message.expireStartedAt = expirationStartedAt;
[message save]; [message saveWithTransaction:transaction];
} }
// Necessary that the async expiration run happens *after* the message is saved with expiration configuration. // Necessary that the async expiration run happens *after* the message is saved with expiration configuration.
@ -178,15 +194,21 @@ NS_ASSUME_NONNULL_BEGIN
- (void)setExpirationsForThread:(TSThread *)thread - (void)setExpirationsForThread:(TSThread *)thread
{ {
uint64_t now = [NSDate ows_millisecondTimeStamp]; uint64_t now = [NSDate ows_millisecondTimeStamp];
[self.storageManager.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *_Nonnull transaction) {
[self.disappearingMessagesFinder [self.disappearingMessagesFinder
enumerateUnstartedExpiringMessagesInThread:thread enumerateUnstartedExpiringMessagesInThread:thread
block:^(TSMessage *_Nonnull message) { block:^(TSMessage *_Nonnull message) {
DDLogWarn(@"%@ Starting expiring message which should have already " DDLogWarn(
@"%@ Starting expiring message which should have already "
@"been started.", @"been started.",
self.tag); self.tag);
// specify "now" in case D.M. have since been disabled, but we have // specify "now" in case D.M. have since been disabled, but we have
// existing unstarted expiring messages that still need to expire. // existing unstarted expiring messages that still need to expire.
[self setExpirationForMessage:message expirationStartedAt:now]; [self setExpirationForMessage:message
expirationStartedAt:now
transaction:transaction];
}
transaction:transaction];
}]; }];
} }

@ -36,33 +36,36 @@ static NSString *const OWSFailedAttachmentDownloadsJobAttachmentStateIndex = @"i
return self; return self;
} }
- (NSArray<NSString *> *)fetchAttemptingOutAttachmentIds:(YapDatabaseConnection *)dbConnection - (NSArray<NSString *> *)fetchAttemptingOutAttachmentIdsWithTransaction:
(YapDatabaseReadWriteTransaction *_Nonnull)transaction
{ {
OWSAssert(transaction);
NSMutableArray<NSString *> *attachmentIds = [NSMutableArray new]; NSMutableArray<NSString *> *attachmentIds = [NSMutableArray new];
NSString *formattedString = [NSString stringWithFormat:@"WHERE %@ != %d", NSString *formattedString = [NSString stringWithFormat:@"WHERE %@ != %d",
OWSFailedAttachmentDownloadsJobAttachmentStateColumn, OWSFailedAttachmentDownloadsJobAttachmentStateColumn,
(int)TSAttachmentPointerStateFailed]; (int)TSAttachmentPointerStateFailed];
YapDatabaseQuery *query = [YapDatabaseQuery queryWithFormat:formattedString]; YapDatabaseQuery *query = [YapDatabaseQuery queryWithFormat:formattedString];
[dbConnection readWithBlock:^(YapDatabaseReadTransaction *_Nonnull transaction) {
[[transaction ext:OWSFailedAttachmentDownloadsJobAttachmentStateIndex] [[transaction ext:OWSFailedAttachmentDownloadsJobAttachmentStateIndex]
enumerateKeysMatchingQuery:query enumerateKeysMatchingQuery:query
usingBlock:^void(NSString *collection, NSString *key, BOOL *stop) { usingBlock:^void(NSString *collection, NSString *key, BOOL *stop) {
[attachmentIds addObject:key]; [attachmentIds addObject:key];
}]; }];
}];
return [attachmentIds copy]; return [attachmentIds copy];
} }
- (void)enumerateAttemptingOutAttachmentsWithBlock:(void (^_Nonnull)(TSAttachmentPointer *attachment))block - (void)enumerateAttemptingOutAttachmentsWithBlock:(void (^_Nonnull)(TSAttachmentPointer *attachment))block
transaction:(YapDatabaseReadWriteTransaction *_Nonnull)transaction
{ {
YapDatabaseConnection *dbConnection = [self.storageManager newDatabaseConnection]; OWSAssert(transaction);
// Since we can't directly mutate the enumerated attachments, we store only their ids in hopes // Since we can't directly mutate the enumerated attachments, we store only their ids in hopes
// of saving a little memory and then enumerate the (larger) TSAttachment objects one at a time. // of saving a little memory and then enumerate the (larger) TSAttachment objects one at a time.
for (NSString *attachmentId in [self fetchAttemptingOutAttachmentIds:dbConnection]) { for (NSString *attachmentId in [self fetchAttemptingOutAttachmentIdsWithTransaction:transaction]) {
TSAttachmentPointer *_Nullable attachment = [TSAttachmentPointer fetchObjectWithUniqueID:attachmentId]; TSAttachmentPointer *_Nullable attachment =
[TSAttachmentPointer fetchObjectWithUniqueID:attachmentId transaction:transaction];
if ([attachment isKindOfClass:[TSAttachmentPointer class]]) { if ([attachment isKindOfClass:[TSAttachmentPointer class]]) {
block(attachment); block(attachment);
} else { } else {
@ -74,14 +77,18 @@ static NSString *const OWSFailedAttachmentDownloadsJobAttachmentStateIndex = @"i
- (void)run - (void)run
{ {
__block uint count = 0; __block uint count = 0;
[[self.storageManager newDatabaseConnection]
readWriteWithBlock:^(YapDatabaseReadWriteTransaction *_Nonnull transaction) {
[self enumerateAttemptingOutAttachmentsWithBlock:^(TSAttachmentPointer *attachment) { [self enumerateAttemptingOutAttachmentsWithBlock:^(TSAttachmentPointer *attachment) {
// sanity check // sanity check
if (attachment.state != TSAttachmentPointerStateFailed) { if (attachment.state != TSAttachmentPointerStateFailed) {
DDLogDebug(@"%@ marking attachment as failed", self.tag); DDLogDebug(@"%@ marking attachment as failed", self.tag);
attachment.state = TSAttachmentPointerStateFailed; attachment.state = TSAttachmentPointerStateFailed;
[attachment save]; [attachment saveWithTransaction:transaction];
count++; count++;
} }
}
transaction:transaction];
}]; }];
DDLogDebug(@"%@ Marked %u attachments as unsent", self.tag, count); DDLogDebug(@"%@ Marked %u attachments as unsent", self.tag, count);

@ -37,33 +37,36 @@ static NSString *const OWSFailedMessagesJobMessageStateIndex = @"index_outoing_m
return self; return self;
} }
- (NSArray<NSString *> *)fetchAttemptingOutMessageIds:(YapDatabaseConnection *)dbConnection - (NSArray<NSString *> *)fetchAttemptingOutMessageIdsWithTransaction:
(YapDatabaseReadWriteTransaction *_Nonnull)transaction
{ {
OWSAssert(transaction);
NSMutableArray<NSString *> *messageIds = [NSMutableArray new]; NSMutableArray<NSString *> *messageIds = [NSMutableArray new];
NSString *formattedString = [NSString stringWithFormat:@"WHERE %@ == %d", NSString *formattedString = [NSString stringWithFormat:@"WHERE %@ == %d",
OWSFailedMessagesJobMessageStateColumn, OWSFailedMessagesJobMessageStateColumn,
(int)TSOutgoingMessageStateAttemptingOut]; (int)TSOutgoingMessageStateAttemptingOut];
YapDatabaseQuery *query = [YapDatabaseQuery queryWithFormat:formattedString]; YapDatabaseQuery *query = [YapDatabaseQuery queryWithFormat:formattedString];
[dbConnection readWithBlock:^(YapDatabaseReadTransaction *_Nonnull transaction) {
[[transaction ext:OWSFailedMessagesJobMessageStateIndex] [[transaction ext:OWSFailedMessagesJobMessageStateIndex]
enumerateKeysMatchingQuery:query enumerateKeysMatchingQuery:query
usingBlock:^void(NSString *collection, NSString *key, BOOL *stop) { usingBlock:^void(NSString *collection, NSString *key, BOOL *stop) {
[messageIds addObject:key]; [messageIds addObject:key];
}]; }];
}];
return [messageIds copy]; return [messageIds copy];
} }
- (void)enumerateAttemptingOutMessagesWithBlock:(void (^_Nonnull)(TSOutgoingMessage *message))block - (void)enumerateAttemptingOutMessagesWithBlock:(void (^_Nonnull)(TSOutgoingMessage *message))block
transaction:(YapDatabaseReadWriteTransaction *_Nonnull)transaction
{ {
YapDatabaseConnection *dbConnection = [self.storageManager newDatabaseConnection]; OWSAssert(transaction);
// Since we can't directly mutate the enumerated "attempting out" expired messages, we store only their ids in hopes // Since we can't directly mutate the enumerated "attempting out" expired messages, we store only their ids in hopes
// of saving a little memory and then enumerate the (larger) TSMessage objects one at a time. // of saving a little memory and then enumerate the (larger) TSMessage objects one at a time.
for (NSString *expiredMessageId in [self fetchAttemptingOutMessageIds:dbConnection]) { for (NSString *expiredMessageId in [self fetchAttemptingOutMessageIdsWithTransaction:transaction]) {
TSOutgoingMessage *_Nullable message = [TSOutgoingMessage fetchObjectWithUniqueID:expiredMessageId]; TSOutgoingMessage *_Nullable message =
[TSOutgoingMessage fetchObjectWithUniqueID:expiredMessageId transaction:transaction];
if ([message isKindOfClass:[TSOutgoingMessage class]]) { if ([message isKindOfClass:[TSOutgoingMessage class]]) {
block(message); block(message);
} else { } else {
@ -75,17 +78,25 @@ static NSString *const OWSFailedMessagesJobMessageStateIndex = @"index_outoing_m
- (void)run - (void)run
{ {
__block uint count = 0; __block uint count = 0;
[[self.storageManager newDatabaseConnection]
readWriteWithBlock:^(YapDatabaseReadWriteTransaction *_Nonnull transaction) {
[self enumerateAttemptingOutMessagesWithBlock:^(TSOutgoingMessage *message) { [self enumerateAttemptingOutMessagesWithBlock:^(TSOutgoingMessage *message) {
// sanity check // sanity check
OWSAssert(message.messageState == TSOutgoingMessageStateAttemptingOut); OWSAssert(message.messageState == TSOutgoingMessageStateAttemptingOut);
if (message.messageState != TSOutgoingMessageStateAttemptingOut) { if (message.messageState != TSOutgoingMessageStateAttemptingOut) {
DDLogError(@"%@ Refusing to mark as unsent message with state: %d", self.tag, (int)message.messageState); DDLogError(
@"%@ Refusing to mark as unsent message with state: %d", self.tag, (int)message.messageState);
return; return;
} }
DDLogDebug(@"%@ marking message as unsent", self.tag); DDLogDebug(@"%@ marking message as unsent: %@", self.tag, message.uniqueId);
[message updateWithMessageState:TSOutgoingMessageStateUnsent]; [message updateWithMessageState:TSOutgoingMessageStateUnsent transaction:transaction];
OWSAssert(message.messageState == TSOutgoingMessageStateUnsent);
count++; count++;
}
transaction:transaction];
}]; }];
DDLogDebug(@"%@ Marked %u messages as unsent", self.tag, count); DDLogDebug(@"%@ Marked %u messages as unsent", self.tag, count);

@ -206,8 +206,7 @@ static NSString *keychainDBPassAccount = @"TSDatabasePass";
[[OWSIncomingMessageFinder new] asyncRegisterExtension]; [[OWSIncomingMessageFinder new] asyncRegisterExtension];
[TSDatabaseView asyncRegisterSecondaryDevicesDatabaseView]; [TSDatabaseView asyncRegisterSecondaryDevicesDatabaseView];
[OWSReadReceipt asyncRegisterIndexOnSenderIdAndTimestampWithDatabase:self.database]; [OWSReadReceipt asyncRegisterIndexOnSenderIdAndTimestampWithDatabase:self.database];
OWSDisappearingMessagesFinder *finder = [[OWSDisappearingMessagesFinder alloc] initWithStorageManager:self]; [OWSDisappearingMessagesFinder asyncRegisterDatabaseExtensions:self];
[finder asyncRegisterDatabaseExtensions];
OWSFailedMessagesJob *failedMessagesJob = [[OWSFailedMessagesJob alloc] initWithStorageManager:self]; OWSFailedMessagesJob *failedMessagesJob = [[OWSFailedMessagesJob alloc] initWithStorageManager:self];
[failedMessagesJob asyncRegisterDatabaseExtensions]; [failedMessagesJob asyncRegisterDatabaseExtensions];
OWSFailedAttachmentDownloadsJob *failedAttachmentDownloadsMessagesJob = OWSFailedAttachmentDownloadsJob *failedAttachmentDownloadsMessagesJob =

Loading…
Cancel
Save