// Copyright © 2022 Rangeproof Pty Ltd. All rights reserved.
import Foundation
import Combine
import GRDB
import SessionUtilitiesKit
import SessionSnodeKit
// MARK: - Log.Category
private extension Log.Category {
    /// Logging category used by every log statement in this file (created with an `.info` default level).
    static let cat = Log.Category.create("MessageSendJob", defaultLevel: .info)
}
// MARK: - MessageSendJob
/// `JobExecutor` which sends the `Message` stored in a job's `Details` to its `Message.Destination`.
///
/// For non-reaction `VisibleMessage`s the job first inspects the attachments linked to the interaction: when
/// any still need uploading it inserts `AttachmentUploadJob`s ahead of itself and defers, otherwise it
/// collects the attachment file ids and performs the send.
public enum MessageSendJob: JobExecutor {
    public static var maxFailureCount: Int = 10
    public static var requiresThreadId: Bool = true
    public static let requiresInteractionId: Bool = false   // Some messages don't have interactions

    /// Runs the send job.
    ///
    /// - Parameters:
    ///   - job: The job to run; `job.details` must contain JSON-encoded `Details`.
    ///   - queue: Queue the send pipeline subscribes on and delivers its completion on.
    ///   - success: Called with `(job, false)` once the send finishes without error.
    ///   - failure: Called with the job, the error and a flag which is `true` when the failure is permanent.
    ///   - deferred: Called when upload jobs had to be scheduled before this job can run.
    ///   - dependencies: Source of the storage, job runner, clock and network used by the job.
    public static func run(
        _ job: Job,
        queue: DispatchQueue,
        success: @escaping (Job, Bool) -> Void,
        failure: @escaping (Job, Error, Bool) -> Void,
        deferred: @escaping (Job) -> Void,
        using dependencies: Dependencies
    ) {
        // Without decodable details there is nothing to send so fail permanently
        guard
            let encodedDetails: Data = job.details,
            let details: Details = try? JSONDecoder(using: dependencies).decode(Details.self, from: encodedDetails)
        else { return failure(job, JobRunnerError.missingRequiredDetails, true) }

        /// We need to include `fileIds` when sending messages with attachments to Open Groups so they are
        /// extracted from any associated attachments below
        var messageFileIds: [String] = []

        // Human readable message type used by the log statements
        let messageType: String
        switch details.destination {
            case .syncMessage: messageType = "\(type(of: details.message)) (SyncMessage)"
            default: messageType = "\(type(of: details.message))"
        }

        /// Ensure any associated attachments have already been uploaded before sending the message
        ///
        /// **Note:** Reactions reference their original message so this logic is skipped for reaction messages
        /// to ensure we don't incorrectly re-upload incoming attachments that the user reacted to
        if
            let visibleMessage: VisibleMessage = details.message as? VisibleMessage,
            visibleMessage.reaction == nil
        {
            guard
                let jobId: Int64 = job.id,
                let interactionId: Int64 = job.interactionId
            else { return failure(job, JobRunnerError.missingRequiredDetails, true) }

            // Snapshot of the attachment state: an optional error, the ids of the attachments which still need
            // to be uploaded and the file ids of the attachments (ordered by album index)
            typealias AttachmentState = (error: Error?, pendingUploadIds: [String], fileIds: [String])

            let attachmentState: AttachmentState = dependencies[singleton: .storage]
                .read { db in
                    // If the original interaction no longer exists then don't bother sending the message
                    // (ie. the message was deleted before it even got sent)
                    if try !Interaction.exists(db, id: interactionId) {
                        Log.warn(.cat, "Failing (\(job.id ?? -1)) due to missing interaction")
                        return (StorageError.objectNotFound, [], [])
                    }

                    // Get the current state of the attachments
                    let stateInfos: [Attachment.StateInfo] = try Attachment
                        .stateInfo(interactionId: interactionId)
                        .fetchAll(db)
                    let orderedFileIds: [String] = stateInfos
                        .sorted { first, second in first.albumIndex < second.albumIndex }
                        .compactMap { info in Attachment.fileId(for: info.downloadUrl) }

                    // If any attachment is in the `failedDownload` state then this job should fail (can't
                    // send a message which has associated attachments that are not available)
                    if stateInfos.contains(where: { $0.state == .failedDownload }) {
                        Log.info(.cat, "Failing (\(job.id ?? -1)) due to failed attachment upload")
                        return (AttachmentError.notUploaded, [], orderedFileIds)
                    }

                    /// Find all attachmentIds for attachments which need to be uploaded
                    ///
                    /// **Note:** If there are any 'downloaded' attachments then they also need to be uploaded
                    /// (as a 'downloaded' attachment will be on the current users device but not on the message
                    /// recipients device - both `LinkPreview` and `Quote` can have this case)
                    let idsNeedingUpload: [String] = stateInfos
                        .filter { info -> Bool in
                            // Non-media quotes won't have thumbnails so don't try to upload them
                            if info.downloadUrl == Attachment.nonMediaQuoteFileId { return false }

                            switch info.state {
                                case .uploading, .pendingDownload, .downloading, .failedUpload, .downloaded:
                                    return true

                                // An attachment in an 'uploaded' state without a 'downloadUrl' is invalid
                                // and needs to be re-uploaded
                                case .uploaded: return (info.downloadUrl == nil)
                                default: return false
                            }
                        }
                        .map { info in info.attachmentId }

                    return (nil, idsNeedingUpload, orderedFileIds)
                }
                .defaulting(to: (MessageSenderError.invalidMessage, [], []))

            /// If we got an error when trying to retrieve the attachment state then this job is actually
            /// invalid so it should permanently fail
            if let stateError: Error = attachmentState.error {
                Log.error(.cat, "Failed due to invalid attachment state")
                return failure(job, stateError, true)
            }

            /// If we have any pending (or failed) attachment uploads then we should create jobs for them and
            /// insert them into the queue before the current job and defer it (this will mean the current job
            /// will re-run after these inserted jobs complete)
            if !attachmentState.pendingUploadIds.isEmpty {
                dependencies[singleton: .storage].write { db in
                    try attachmentState.pendingUploadIds
                        .filter { pendingId in
                            // Don't add a new job if there is one already in the queue
                            !dependencies[singleton: .jobRunner].hasJob(
                                of: .attachmentUpload,
                                with: AttachmentUploadJob.Details(
                                    messageSendJobId: jobId,
                                    attachmentId: pendingId
                                )
                            )
                        }
                        .compactMap { pendingId -> (jobId: Int64, job: Job)? in
                            dependencies[singleton: .jobRunner]
                                .insert(
                                    db,
                                    job: Job(
                                        variant: .attachmentUpload,
                                        behaviour: .runOnce,
                                        threadId: job.threadId,
                                        interactionId: interactionId,
                                        details: AttachmentUploadJob.Details(
                                            messageSendJobId: jobId,
                                            attachmentId: pendingId
                                        )
                                    ),
                                    before: job
                                )
                        }
                        .forEach { uploadJobId, _ in
                            // Create the dependency between the jobs
                            try JobDependencies(
                                jobId: jobId,
                                dependantId: uploadJobId
                            )
                            .insert(db)
                        }
                }

                Log.info(.cat, "Deferring (\(job.id ?? -1)) due to pending attachment uploads")
                return deferred(job)
            }

            // Store the fileIds so they can be sent with the open group message content
            messageFileIds = attachmentState.fileIds
        }

        // Store the sentTimestamp from the message in case it fails due to a clockOutOfSync error
        let originalSentTimestamp: UInt64? = details.message.sentTimestamp
        let startTime: TimeInterval = dependencies.dateNow.timeIntervalSince1970

        /// Perform the actual message sending - per the original author's note this will timeout if the entire
        /// process takes longer than `Network.defaultTimeout * 2`, which can occur if it needs to build a new
        /// onion path
        ///
        /// **Note:** No need to upload attachments as part of this process as the above logic splits that out
        /// into its own job so we shouldn't get here until attachments have already been uploaded
        dependencies[singleton: .storage]
            .writePublisher { db -> Network.PreparedRequest<Void> in
                try MessageSender.preparedSend(
                    db,
                    message: details.message,
                    to: details.destination,
                    namespace: details.destination.defaultNamespace,
                    interactionId: job.interactionId,
                    fileIds: messageFileIds,
                    using: dependencies
                )
            }
            .flatMap { preparedRequest in preparedRequest.send(using: dependencies) }
            .subscribe(on: queue, using: dependencies)
            .receive(on: queue, using: dependencies)
            .sinkUntilComplete(
                receiveCompletion: { completion in
                    switch completion {
                        case .failure(let error):
                            Log.info(.cat, "Failed to send \(messageType) (\(job.id ?? -1)) after \(.seconds(dependencies.dateNow.timeIntervalSince1970 - startTime), unit: .s) due to error: \(error).")

                            // Actual error handling (the final `failure` argument flags a permanent failure)
                            switch (error, details.message) {
                                case (let senderError as MessageSenderError, _) where !senderError.isRetryable:
                                    failure(job, error, true)

                                case (SnodeAPIError.rateLimited, _):
                                    failure(job, error, true)

                                case (SnodeAPIError.clockOutOfSync, _):
                                    Log.error(.cat, "\(originalSentTimestamp != nil ? "Permanently Failing" : "Failing") to send \(messageType) (\(job.id ?? -1)) due to clock out of sync issue.")
                                    failure(job, error, (originalSentTimestamp != nil))

                                // Don't bother retrying (it can just send a new one later but allowing retries
                                // can result in a large number of `MessageSendJobs` backing up)
                                case (_, is TypingIndicator):
                                    failure(job, error, true)

                                default:
                                    // A `VisibleMessage` whose interaction can no longer be found has been
                                    // deleted so the job permanently fails, anything else can be retried
                                    let isPermanentFailure: Bool = {
                                        guard details.message is VisibleMessage else { return false }
                                        guard let interactionId: Int64 = job.interactionId else { return true }

                                        return (dependencies[singleton: .storage].read({ db in try Interaction.exists(db, id: interactionId) }) != true)
                                    }()

                                    failure(job, error, isPermanentFailure)
                            }

                        case .finished:
                            Log.info(.cat, "Completed sending \(messageType) (\(job.id ?? -1)) after \(.seconds(dependencies.dateNow.timeIntervalSince1970 - startTime), unit: .s).")
                            success(job, false)
                    }
                }
            )
    }
}
// MARK: - MessageSendJob.Details
extension MessageSendJob {
    /// Payload stored with a `MessageSendJob`: the message to send, where to send it and the
    /// `Message.Variant` needed to decode the concrete message type again.
    public struct Details: Codable {
        private enum CodingKeys: String, CodingKey {
            case destination
            case message
            @available(*, deprecated, message: "replaced by 'Message.Destination.syncMessage'") case isSyncMessage
            case variant
        }

        public let destination: Message.Destination
        public let message: Message
        public let variant: Message.Variant?

        // MARK: - Initialization

        /// Creates the details for sending `message` to `destination`, deriving `variant` from the message.
        public init(
            destination: Message.Destination,
            message: Message
        ) {
            self.destination = destination
            self.message = message
            self.variant = Message.Variant(from: message)
        }

        // MARK: - Codable

        /// Decodes the details, using the stored variant to decode the concrete message.
        ///
        /// - Throws: `StorageError.decodingFailed` when the variant cannot be decoded, or any error thrown
        ///   while decoding the destination or the message.
        public init(from decoder: Decoder) throws {
            let values: KeyedDecodingContainer<CodingKeys> = try decoder.container(keyedBy: CodingKeys.self)

            // The variant is required to know which concrete message type to decode
            let storedVariant: Message.Variant
            do { storedVariant = try values.decode(Message.Variant.self, forKey: .variant) }
            catch {
                Log.error(.cat, "Unable to decode messageSend job due to missing variant")
                throw StorageError.decodingFailed
            }

            let decodedDestination: Message.Destination = try values.decode(Message.Destination.self, forKey: .destination)
            let decodedMessage: Message = try storedVariant.decode(from: values, forKey: .message)

            self.init(destination: decodedDestination, message: decodedMessage)
        }

        /// Encodes the destination, the message and the variant derived from the message.
        ///
        /// - Throws: `StorageError.objectNotFound` when no variant can be derived from the message, or any
        ///   error thrown while encoding the values.
        public func encode(to encoder: Encoder) throws {
            var values: KeyedEncodingContainer<CodingKeys> = encoder.container(keyedBy: CodingKeys.self)

            switch Message.Variant(from: message) {
                case .none:
                    Log.error(.cat, "Unable to encode messageSend job due to unsupported variant")
                    throw StorageError.objectNotFound

                case .some(let messageVariant):
                    try values.encode(destination, forKey: .destination)
                    try values.encode(message, forKey: .message)
                    try values.encode(messageVariant, forKey: .variant)
            }
        }
    }
}