Implement message syncing

pull/333/head
Niels Andriesse 4 years ago
parent 74fd3eb812
commit eeef067f57

@ -27,16 +27,21 @@ extension Storage {
/// Returns the ID of the `TSIncomingMessage` that was constructed.
public func persist(_ message: VisibleMessage, quotedMessage: TSQuotedMessage?, linkPreview: OWSLinkPreview?, groupPublicKey: String?, openGroupID: String?, using transaction: Any) -> String? {
let transaction = transaction as! YapDatabaseReadWriteTransaction
guard let threadID = getOrCreateThread(for: message.sender!, groupPublicKey: groupPublicKey, openGroupID: openGroupID, using: transaction),
guard let threadID = getOrCreateThread(for: message.syncTarget ?? message.sender!, groupPublicKey: groupPublicKey, openGroupID: openGroupID, using: transaction),
let thread = TSThread.fetch(uniqueId: threadID, transaction: transaction) else { return nil }
let message = TSIncomingMessage.from(message, quotedMessage: quotedMessage, linkPreview: linkPreview, associatedWith: thread)
message.save(with: transaction)
message.attachments(with: transaction).forEach { attachment in
attachment.albumMessageId = message.uniqueId!
let tsMessage: TSMessage
if message.sender == getUserPublicKey() {
tsMessage = TSOutgoingMessage.from(message, associatedWith: thread, using: transaction)
} else {
tsMessage = TSIncomingMessage.from(message, quotedMessage: quotedMessage, linkPreview: linkPreview, associatedWith: thread)
}
tsMessage.save(with: transaction)
tsMessage.attachments(with: transaction).forEach { attachment in
attachment.albumMessageId = tsMessage.uniqueId!
attachment.save(with: transaction)
}
DispatchQueue.main.async { message.touch() } // FIXME: Hack for a thread updating issue
return message.uniqueId!
DispatchQueue.main.async { tsMessage.touch() } // FIXME: Hack for a thread updating issue
return tsMessage.uniqueId!
}
/// Returns the IDs of the saved attachments.
@ -49,22 +54,22 @@ extension Storage {
}
/// Also touches the associated message.
public func setAttachmentState(to state: TSAttachmentPointerState, for pointer: TSAttachmentPointer, associatedWith tsIncomingMessageID: String, using transaction: Any) {
public func setAttachmentState(to state: TSAttachmentPointerState, for pointer: TSAttachmentPointer, associatedWith tsMessageID: String, using transaction: Any) {
let transaction = transaction as! YapDatabaseReadWriteTransaction
// Workaround for some YapDatabase funkiness where pointer at this point can actually be a TSAttachmentStream
guard pointer.responds(to: #selector(setter: TSAttachmentPointer.state)) else { return }
pointer.state = state
pointer.save(with: transaction)
guard let tsIncomingMessage = TSIncomingMessage.fetch(uniqueId: tsIncomingMessageID, transaction: transaction) else { return }
tsIncomingMessage.touch(with: transaction)
guard let tsMessage = TSMessage.fetch(uniqueId: tsMessageID, transaction: transaction) else { return }
tsMessage.touch(with: transaction)
}
/// Also touches the associated message.
public func persist(_ stream: TSAttachmentStream, associatedWith tsIncomingMessageID: String, using transaction: Any) {
public func persist(_ stream: TSAttachmentStream, associatedWith tsMessageID: String, using transaction: Any) {
let transaction = transaction as! YapDatabaseReadWriteTransaction
stream.save(with: transaction)
guard let tsIncomingMessage = TSIncomingMessage.fetch(uniqueId: tsIncomingMessageID, transaction: transaction) else { return }
tsIncomingMessage.touch(with: transaction)
guard let tsMessage = TSMessage.fetch(uniqueId: tsMessageID, transaction: transaction) else { return }
tsMessage.touch(with: transaction)
}
private static let receivedMessageTimestampsCollection = "ReceivedMessageTimestampsCollection"

@ -4,7 +4,7 @@ import SignalCoreKit
public final class AttachmentDownloadJob : NSObject, Job, NSCoding { // NSObject/NSCoding conformance is needed for YapDatabase compatibility
public let attachmentID: String
public let tsIncomingMessageID: String
public let tsMessageID: String
public var delegate: JobDelegate?
public var id: String?
public var failureCount: UInt = 0
@ -24,25 +24,25 @@ public final class AttachmentDownloadJob : NSObject, Job, NSCoding { // NSObject
public static let maxFailureCount: UInt = 20
// MARK: Initialization
public init(attachmentID: String, tsIncomingMessageID: String) {
public init(attachmentID: String, tsMessageID: String) {
self.attachmentID = attachmentID
self.tsIncomingMessageID = tsIncomingMessageID
self.tsMessageID = tsMessageID
}
// MARK: Coding
public init?(coder: NSCoder) {
guard let attachmentID = coder.decodeObject(forKey: "attachmentID") as! String?,
let tsIncomingMessageID = coder.decodeObject(forKey: "tsIncomingMessageID") as! String?,
let tsMessageID = coder.decodeObject(forKey: "tsIncomingMessageID") as! String?,
let id = coder.decodeObject(forKey: "id") as! String? else { return nil }
self.attachmentID = attachmentID
self.tsIncomingMessageID = tsIncomingMessageID
self.tsMessageID = tsMessageID
self.id = id
self.failureCount = coder.decodeObject(forKey: "failureCount") as! UInt? ?? 0
}
public func encode(with coder: NSCoder) {
coder.encode(attachmentID, forKey: "attachmentID")
coder.encode(tsIncomingMessageID, forKey: "tsIncomingMessageID")
coder.encode(tsMessageID, forKey: "tsIncomingMessageID")
coder.encode(id, forKey: "id")
coder.encode(failureCount, forKey: "failureCount")
}
@ -54,21 +54,21 @@ public final class AttachmentDownloadJob : NSObject, Job, NSCoding { // NSObject
}
let storage = SNMessagingKitConfiguration.shared.storage
storage.write(with: { transaction in
storage.setAttachmentState(to: .downloading, for: pointer, associatedWith: self.tsIncomingMessageID, using: transaction)
storage.setAttachmentState(to: .downloading, for: pointer, associatedWith: self.tsMessageID, using: transaction)
}, completion: { })
let temporaryFilePath = URL(fileURLWithPath: OWSTemporaryDirectoryAccessibleAfterFirstAuth() + UUID().uuidString)
let handleFailure: (Swift.Error) -> Void = { error in // Intentionally capture self
OWSFileSystem.deleteFile(temporaryFilePath.absoluteString)
if let error = error as? Error, case .noAttachment = error {
storage.write(with: { transaction in
storage.setAttachmentState(to: .failed, for: pointer, associatedWith: self.tsIncomingMessageID, using: transaction)
storage.setAttachmentState(to: .failed, for: pointer, associatedWith: self.tsMessageID, using: transaction)
}, completion: { })
self.handlePermanentFailure(error: error)
} else if let error = error as? DotNetAPI.Error, case .parsingFailed = error {
// No need to retry if the response is invalid. Most likely this means we (incorrectly)
// got a "Cannot GET ..." error from the file server.
storage.write(with: { transaction in
storage.setAttachmentState(to: .failed, for: pointer, associatedWith: self.tsIncomingMessageID, using: transaction)
storage.setAttachmentState(to: .failed, for: pointer, associatedWith: self.tsMessageID, using: transaction)
}, completion: { })
self.handlePermanentFailure(error: error)
} else {
@ -99,7 +99,7 @@ public final class AttachmentDownloadJob : NSObject, Job, NSCoding { // NSObject
}
OWSFileSystem.deleteFile(temporaryFilePath.absoluteString)
storage.write(with: { transaction in
storage.persist(stream, associatedWith: self.tsIncomingMessageID, using: transaction)
storage.persist(stream, associatedWith: self.tsMessageID, using: transaction)
}, completion: { })
}.catch(on: DispatchQueue.global()) { error in
handleFailure(error)

@ -4,8 +4,18 @@ import SessionUtilitiesKit
@objc(from:associatedWith:)
static func from(_ visibleMessage: VisibleMessage, associatedWith thread: TSThread) -> TSOutgoingMessage {
return from(visibleMessage, associatedWith: thread, using: nil)
}
static func from(_ visibleMessage: VisibleMessage, associatedWith thread: TSThread, using transaction: YapDatabaseReadWriteTransaction? = nil) -> TSOutgoingMessage {
var expiration: UInt32 = 0
if let disappearingMessagesConfiguration = OWSDisappearingMessagesConfiguration.fetch(uniqueId: thread.uniqueId!) {
let disappearingMessagesConfigurationOrNil: OWSDisappearingMessagesConfiguration?
if let transaction = transaction {
disappearingMessagesConfigurationOrNil = OWSDisappearingMessagesConfiguration.fetch(uniqueId: thread.uniqueId!, transaction: transaction)
} else {
disappearingMessagesConfigurationOrNil = OWSDisappearingMessagesConfiguration.fetch(uniqueId: thread.uniqueId!)
}
if let disappearingMessagesConfiguration = disappearingMessagesConfigurationOrNil {
expiration = disappearingMessagesConfiguration.isEnabled ? disappearingMessagesConfiguration.durationSeconds : 0
}
return TSOutgoingMessage(

@ -2,6 +2,10 @@ import SessionUtilitiesKit
@objc(SNVisibleMessage)
public final class VisibleMessage : Message {
/// In the case of a sync message, the public key of the person the message was targeted at.
///
/// - Note: `nil` if this isn't a sync message.
public var syncTarget: String?
@objc public var text: String?
@objc public var attachmentIDs: [String] = []
@objc public var quote: Quote?
@ -9,6 +13,8 @@ public final class VisibleMessage : Message {
@objc public var contact: Contact?
@objc public var profile: Profile?
public override var isSelfSendValid: Bool { true }
// MARK: Initialization
public override init() { super.init() }
@ -23,6 +29,7 @@ public final class VisibleMessage : Message {
// MARK: Coding
public required init?(coder: NSCoder) {
super.init(coder: coder)
if let syncTarget = coder.decodeObject(forKey: "syncTarget") as! String? { self.syncTarget = syncTarget }
if let text = coder.decodeObject(forKey: "body") as! String? { self.text = text }
if let attachmentIDs = coder.decodeObject(forKey: "attachments") as! [String]? { self.attachmentIDs = attachmentIDs }
if let quote = coder.decodeObject(forKey: "quote") as! Quote? { self.quote = quote }
@ -33,6 +40,7 @@ public final class VisibleMessage : Message {
public override func encode(with coder: NSCoder) {
super.encode(with: coder)
coder.encode(syncTarget, forKey: "syncTarget")
coder.encode(text, forKey: "body")
coder.encode(attachmentIDs, forKey: "attachments")
coder.encode(quote, forKey: "quote")
@ -51,6 +59,7 @@ public final class VisibleMessage : Message {
if let linkPreviewProto = dataMessage.preview.first, let linkPreview = LinkPreview.fromProto(linkPreviewProto) { result.linkPreview = linkPreview }
// TODO: Contact
if let profile = Profile.fromProto(dataMessage) { result.profile = profile }
result.syncTarget = dataMessage.syncTarget
return result
}
@ -101,6 +110,10 @@ public final class VisibleMessage : Message {
SNLog("Couldn't construct visible message proto from: \(self).")
return nil
}
// Sync target
if let syncTarget = syncTarget {
dataMessage.setSyncTarget(syncTarget)
}
// Build
do {
proto.setDataMessage(try dataMessage.build())

@ -2695,6 +2695,9 @@ extension SNProtoDataMessageClosedGroupUpdateV2.SNProtoDataMessageClosedGroupUpd
if let _value = closedGroupUpdateV2 {
builder.setClosedGroupUpdateV2(_value)
}
if let _value = syncTarget {
builder.setSyncTarget(_value)
}
if let _value = publicChatInfo {
builder.setPublicChatInfo(_value)
}
@ -2773,6 +2776,10 @@ extension SNProtoDataMessageClosedGroupUpdateV2.SNProtoDataMessageClosedGroupUpd
proto.closedGroupUpdateV2 = valueParam.proto
}
@objc public func setSyncTarget(_ valueParam: String) {
proto.syncTarget = valueParam
}
@objc public func setPublicChatInfo(_ valueParam: SNProtoPublicChatInfo) {
proto.publicChatInfo = valueParam.proto
}
@ -2845,6 +2852,16 @@ extension SNProtoDataMessageClosedGroupUpdateV2.SNProtoDataMessageClosedGroupUpd
return proto.hasTimestamp
}
@objc public var syncTarget: String? {
guard proto.hasSyncTarget else {
return nil
}
return proto.syncTarget
}
@objc public var hasSyncTarget: Bool {
return proto.hasSyncTarget
}
private init(proto: SessionProtos_DataMessage,
attachments: [SNProtoAttachmentPointer],
group: SNProtoGroupContext?,

File diff suppressed because it is too large Load Diff

@ -1,10 +1,9 @@
// DO NOT EDIT.
// swift-format-ignore-file
//
// Generated by the Swift generator plugin for the protocol buffer compiler.
// Source: WebSocketResources.proto
//
// For information on using the generated types, please see the documentation:
// For information on using the generated types, please see the documenation:
// https://github.com/apple/swift-protobuf/
//*
@ -21,7 +20,7 @@ import SwiftProtobuf
// If the compiler emits an error on this type, it is because this file
// was generated by a version of the `protoc` Swift plug-in that is
// incompatible with the version of SwiftProtobuf to which you are linking.
// Please ensure that you are building against the same version of the API
// Please ensure that your are building against the same version of the API
// that was used to generate this file.
fileprivate struct _GeneratedWithProtocGenSwiftVersion: SwiftProtobuf.ProtobufAPIVersionCheck {
struct _2: SwiftProtobuf.ProtobufAPIVersion_2 {}
@ -146,31 +145,31 @@ struct WebSocketProtos_WebSocketMessage {
/// @required
var type: WebSocketProtos_WebSocketMessage.TypeEnum {
get {return _type ?? .unknown}
set {_type = newValue}
get {return _storage._type ?? .unknown}
set {_uniqueStorage()._type = newValue}
}
/// Returns true if `type` has been explicitly set.
var hasType: Bool {return self._type != nil}
var hasType: Bool {return _storage._type != nil}
/// Clears the value of `type`. Subsequent reads from it will return its default value.
mutating func clearType() {self._type = nil}
mutating func clearType() {_uniqueStorage()._type = nil}
var request: WebSocketProtos_WebSocketRequestMessage {
get {return _request ?? WebSocketProtos_WebSocketRequestMessage()}
set {_request = newValue}
get {return _storage._request ?? WebSocketProtos_WebSocketRequestMessage()}
set {_uniqueStorage()._request = newValue}
}
/// Returns true if `request` has been explicitly set.
var hasRequest: Bool {return self._request != nil}
var hasRequest: Bool {return _storage._request != nil}
/// Clears the value of `request`. Subsequent reads from it will return its default value.
mutating func clearRequest() {self._request = nil}
mutating func clearRequest() {_uniqueStorage()._request = nil}
var response: WebSocketProtos_WebSocketResponseMessage {
get {return _response ?? WebSocketProtos_WebSocketResponseMessage()}
set {_response = newValue}
get {return _storage._response ?? WebSocketProtos_WebSocketResponseMessage()}
set {_uniqueStorage()._response = newValue}
}
/// Returns true if `response` has been explicitly set.
var hasResponse: Bool {return self._response != nil}
var hasResponse: Bool {return _storage._response != nil}
/// Clears the value of `response`. Subsequent reads from it will return its default value.
mutating func clearResponse() {self._response = nil}
mutating func clearResponse() {_uniqueStorage()._response = nil}
var unknownFields = SwiftProtobuf.UnknownStorage()
@ -205,9 +204,7 @@ struct WebSocketProtos_WebSocketMessage {
init() {}
fileprivate var _type: WebSocketProtos_WebSocketMessage.TypeEnum? = nil
fileprivate var _request: WebSocketProtos_WebSocketRequestMessage? = nil
fileprivate var _response: WebSocketProtos_WebSocketResponseMessage? = nil
fileprivate var _storage = _StorageClass.defaultInstance
}
#if swift(>=4.2)
@ -336,34 +333,70 @@ extension WebSocketProtos_WebSocketMessage: SwiftProtobuf.Message, SwiftProtobuf
3: .same(proto: "response"),
]
fileprivate class _StorageClass {
var _type: WebSocketProtos_WebSocketMessage.TypeEnum? = nil
var _request: WebSocketProtos_WebSocketRequestMessage? = nil
var _response: WebSocketProtos_WebSocketResponseMessage? = nil
static let defaultInstance = _StorageClass()
private init() {}
init(copying source: _StorageClass) {
_type = source._type
_request = source._request
_response = source._response
}
}
fileprivate mutating func _uniqueStorage() -> _StorageClass {
if !isKnownUniquelyReferenced(&_storage) {
_storage = _StorageClass(copying: _storage)
}
return _storage
}
mutating func decodeMessage<D: SwiftProtobuf.Decoder>(decoder: inout D) throws {
while let fieldNumber = try decoder.nextFieldNumber() {
switch fieldNumber {
case 1: try decoder.decodeSingularEnumField(value: &self._type)
case 2: try decoder.decodeSingularMessageField(value: &self._request)
case 3: try decoder.decodeSingularMessageField(value: &self._response)
default: break
_ = _uniqueStorage()
try withExtendedLifetime(_storage) { (_storage: _StorageClass) in
while let fieldNumber = try decoder.nextFieldNumber() {
switch fieldNumber {
case 1: try decoder.decodeSingularEnumField(value: &_storage._type)
case 2: try decoder.decodeSingularMessageField(value: &_storage._request)
case 3: try decoder.decodeSingularMessageField(value: &_storage._response)
default: break
}
}
}
}
func traverse<V: SwiftProtobuf.Visitor>(visitor: inout V) throws {
if let v = self._type {
try visitor.visitSingularEnumField(value: v, fieldNumber: 1)
}
if let v = self._request {
try visitor.visitSingularMessageField(value: v, fieldNumber: 2)
}
if let v = self._response {
try visitor.visitSingularMessageField(value: v, fieldNumber: 3)
try withExtendedLifetime(_storage) { (_storage: _StorageClass) in
if let v = _storage._type {
try visitor.visitSingularEnumField(value: v, fieldNumber: 1)
}
if let v = _storage._request {
try visitor.visitSingularMessageField(value: v, fieldNumber: 2)
}
if let v = _storage._response {
try visitor.visitSingularMessageField(value: v, fieldNumber: 3)
}
}
try unknownFields.traverse(visitor: &visitor)
}
static func ==(lhs: WebSocketProtos_WebSocketMessage, rhs: WebSocketProtos_WebSocketMessage) -> Bool {
if lhs._type != rhs._type {return false}
if lhs._request != rhs._request {return false}
if lhs._response != rhs._response {return false}
if lhs._storage !== rhs._storage {
let storagesAreEqual: Bool = withExtendedLifetime((lhs._storage, rhs._storage)) { (_args: (_StorageClass, _StorageClass)) in
let _storage = _args.0
let rhs_storage = _args.1
if _storage._type != rhs_storage._type {return false}
if _storage._request != rhs_storage._request {return false}
if _storage._response != rhs_storage._response {return false}
return true
}
if !storagesAreEqual {return false}
}
if lhs.unknownFields != rhs.unknownFields {return false}
return true
}

@ -204,6 +204,7 @@ message DataMessage {
repeated Preview preview = 10;
optional LokiProfile profile = 101;
optional ClosedGroupUpdateV2 closedGroupUpdateV2 = 104;
optional string syncTarget = 105;
optional PublicChatInfo publicChatInfo = 999;
}

@ -192,7 +192,7 @@ extension MessageReceiver {
}
}
// Get or create thread
guard let threadID = storage.getOrCreateThread(for: message.sender!, groupPublicKey: message.groupPublicKey, openGroupID: openGroupID, using: transaction) else { throw Error.noThread }
guard let threadID = storage.getOrCreateThread(for: message.syncTarget ?? message.sender!, groupPublicKey: message.groupPublicKey, openGroupID: openGroupID, using: transaction) else { throw Error.noThread }
// Parse quote if needed
var tsQuotedMessage: TSQuotedMessage? = nil
if message.quote != nil && proto.dataMessage?.quote != nil, let thread = TSThread.fetch(uniqueId: threadID, transaction: transaction) {
@ -210,12 +210,12 @@ extension MessageReceiver {
}
}
// Persist the message
guard let tsIncomingMessageID = storage.persist(message, quotedMessage: tsQuotedMessage, linkPreview: owsLinkPreview,
guard let tsMessageID = storage.persist(message, quotedMessage: tsQuotedMessage, linkPreview: owsLinkPreview,
groupPublicKey: message.groupPublicKey, openGroupID: openGroupID, using: transaction) else { throw Error.noThread }
message.threadID = threadID
// Start attachment downloads if needed
attachmentsToDownload.forEach { attachmentID in
let downloadJob = AttachmentDownloadJob(attachmentID: attachmentID, tsIncomingMessageID: tsIncomingMessageID)
let downloadJob = AttachmentDownloadJob(attachmentID: attachmentID, tsMessageID: tsMessageID)
if isMainAppAndActive {
JobQueue.shared.add(downloadJob, using: transaction)
} else {
@ -227,10 +227,10 @@ extension MessageReceiver {
cancelTypingIndicatorsIfNeeded(for: message.sender!)
}
// Notify the user if needed
guard (isMainAppAndActive || isBackgroundPoll), let tsIncomingMessage = TSIncomingMessage.fetch(uniqueId: tsIncomingMessageID, transaction: transaction),
let thread = TSThread.fetch(uniqueId: threadID, transaction: transaction) else { return tsIncomingMessageID }
guard (isMainAppAndActive || isBackgroundPoll), let tsIncomingMessage = TSMessage.fetch(uniqueId: tsMessageID, transaction: transaction) as? TSIncomingMessage,
let thread = TSThread.fetch(uniqueId: threadID, transaction: transaction) else { return tsMessageID }
SSKEnvironment.shared.notificationsManager!.notifyUser(for: tsIncomingMessage, in: thread, transaction: transaction)
return tsIncomingMessageID
return tsMessageID
}
private static func handleClosedGroupUpdateV2(_ message: ClosedGroupUpdate, using transaction: Any) {

@ -105,7 +105,7 @@ public final class MessageSender : NSObject {
}
// MARK: One-on-One Chats & Closed Groups
internal static func sendToSnodeDestination(_ destination: Message.Destination, message: Message, using transaction: Any) -> Promise<Void> {
internal static func sendToSnodeDestination(_ destination: Message.Destination, message: Message, using transaction: Any, isSyncMessage: Bool = false) -> Promise<Void> {
let (promise, seal) = Promise<Void>.pending()
let storage = SNMessagingKitConfiguration.shared.storage
let transaction = transaction as! YapDatabaseReadWriteTransaction
@ -137,8 +137,8 @@ public final class MessageSender : NSObject {
}
// Validate the message
guard message.isValid else { handleFailure(with: Error.invalidMessage, using: transaction); return promise }
// Stop here if this is a self-send (unless it's a configuration message)
guard !isSelfSend || message is ConfigurationMessage else {
// Stop here if this is a self-send (unless it's a configuration message or a sync message)
guard !isSelfSend || message is ConfigurationMessage || isSyncMessage else {
storage.write(with: { transaction in
MessageSender.handleSuccessfulMessageSend(message, to: destination, using: transaction)
seal.fulfill(())
@ -352,6 +352,15 @@ public final class MessageSender : NSObject {
tsMessage.update(withSentRecipient: recipient, wasSentByUD: true, transaction: transaction as! YapDatabaseReadWriteTransaction)
}
OWSDisappearingMessagesJob.shared().startAnyExpiration(for: tsMessage, expirationStartedAt: NSDate.millisecondTimestamp(), transaction: transaction as! YapDatabaseReadWriteTransaction)
// Sync the message if:
// it wasn't a self-send
// it was a visible message
let userPublicKey = getUserHexEncodedPublicKey()
if case .contact(let publicKey) = destination, publicKey != userPublicKey, let message = message as? VisibleMessage {
message.syncTarget = publicKey
// FIXME: Make this a job
sendToSnodeDestination(.contact(publicKey: userPublicKey), message: message, using: transaction, isSyncMessage: true).retainUntilComplete()
}
}
public static func handleFailedMessageSend(_ message: Message, with error: Swift.Error, using transaction: Any) {

Loading…
Cancel
Save