// Copyright © 2022 Rangeproof Pty Ltd. All rights reserved.
import Foundation
import Combine
import GRDB
import SignalCoreKit
import SessionUtilitiesKit
import SessionSnodeKit
/// Job executor which sends the `Message` described by a job's encoded `Details` to its destination.
///
/// For `VisibleMessage`s which are not reactions the job first inspects the attachments linked to the
/// job's interaction: if any still need uploading it inserts `AttachmentUploadJob`s ahead of itself,
/// records the dependency and defers, so the send itself only happens on a later run.
public enum MessageSendJob: JobExecutor {
    public static var maxFailureCount: Int = 10
    public static var requiresThreadId: Bool = true
    public static let requiresInteractionId: Bool = false   // Not every message has an interaction

    /// Runs the job and reports the outcome through one of the provided callbacks.
    ///
    /// - Parameters:
    ///   - job: The job to run; its `details` must decode into `Details`.
    ///   - queue: Queue the send pipeline subscribes on, receives on and schedules its timeout on.
    ///   - success: Called (with `false` as its second argument) once the send pipeline finishes.
    ///   - failure: Called with the error and a flag which is `true` when the failure is permanent.
    ///   - deferred: Called when attachment uploads are still pending and the job has to wait.
    ///   - dependencies: Container the storage, job runner and network caches are resolved from.
    public static func run(
        _ job: Job,
        queue: DispatchQueue,
        success: @escaping (Job, Bool, Dependencies) -> (),
        failure: @escaping (Job, Error?, Bool, Dependencies) -> (),
        deferred: @escaping (Job, Dependencies) -> (),
        using dependencies: Dependencies
    ) {
        // Without decodable details there is nothing to send, so fail permanently
        guard
            let encodedDetails: Data = job.details,
            let details: Details = try? JSONDecoder(using: dependencies).decode(Details.self, from: encodedDetails)
        else {
            SNLog(" [MessageSendJob] Failing due to missing details ")
            return failure(job, JobRunnerError.missingRequiredDetails, true, dependencies)
        }

        // Messages with attachments sent to Open Groups need to include their 'fileIds', these get
        // collected from the associated attachments below
        var messageFileIds: [String] = []

        /// Make sure any associated attachments have been uploaded before the message gets sent
        ///
        /// **Note:** A reaction references its original message, so reaction messages skip this logic to
        /// avoid incorrectly re-uploading incoming attachments the user reacted to
        // NOTE(review): the previous wording also mentioned excluding "sync" messages, but no such
        // check is visible in this condition - confirm whether that is handled elsewhere
        if let visibleMessage = details.message as? VisibleMessage, visibleMessage.reaction == nil {
            guard
                let jobId: Int64 = job.id,
                let interactionId: Int64 = job.interactionId
            else {
                SNLog(" [MessageSendJob] Failing due to missing details ")
                return failure(job, JobRunnerError.missingRequiredDetails, true, dependencies)
            }

            // Snapshot of the attachment state for the interaction
            typealias AttachmentState = (error: Error?, pendingUploadAttachmentIds: [String], preparedFileIds: [String])

            let attachmentState: AttachmentState = dependencies[singleton: .storage]
                .read { db in
                    // The interaction being gone means the message was deleted before it was sent, so
                    // there is no point in sending it
                    guard try Interaction.exists(db, id: interactionId) else {
                        SNLog(" [MessageSendJob] Failing due to missing interaction ")
                        return (StorageError.objectNotFound, [], [])
                    }

                    // Load the state of every attachment linked to the interaction
                    let stateInfos: [Attachment.StateInfo] = try Attachment
                        .stateInfo(interactionId: interactionId)
                        .fetchAll(db)

                    // File ids in album order (attachments without a resolvable file id are dropped)
                    let fileIds: [String] = stateInfos
                        .sorted { $0.albumIndex < $1.albumIndex }
                        .compactMap { Attachment.fileId(for: $0.downloadUrl) }

                    // A message can't be sent while one of its attachments is in a failed state
                    if stateInfos.contains(where: { $0.state == .failedDownload }) {
                        SNLog(" [MessageSendJob] Failing due to failed attachment upload ")
                        return (AttachmentError.notUploaded, [], fileIds)
                    }

                    /// Collect the ids of the attachments which still need to be uploaded
                    ///
                    /// **Note:** 'downloaded' attachments count as well, they exist on the current users
                    /// device but not on the recipients device (both `LinkPreview` and `Quote` can end
                    /// up in this situation)
                    let idsToUpload: [String] = stateInfos
                        .filter { info -> Bool in
                            // Non-media quotes have no thumbnail so there is nothing to upload
                            if info.downloadUrl == Attachment.nonMediaQuoteFileId { return false }

                            switch info.state {
                                case .uploading, .pendingDownload, .downloading, .failedUpload, .downloaded:
                                    return true

                                // An 'uploaded' attachment without a 'downloadUrl' is invalid and has to
                                // be uploaded again
                                case .uploaded: return (info.downloadUrl == nil)

                                default: return false
                            }
                        }
                        .map { $0.attachmentId }

                    return (nil, idsToUpload, fileIds)
                }
                .defaulting(to: (MessageSenderError.invalidMessage, [], []))

            /// An error while retrieving the attachment state means the job itself is invalid, so it
            /// fails permanently
            if let stateError: Error = attachmentState.error {
                SNLog(" [MessageSendJob] Failed due to invalid attachment state ")
                return failure(job, stateError, true, dependencies)
            }

            /// With pending (or failed) uploads we create upload jobs, insert them into the queue before
            /// this job and defer it (so this job runs again once the inserted jobs have completed)
            if !attachmentState.pendingUploadAttachmentIds.isEmpty {
                dependencies[singleton: .storage].write { db in
                    try attachmentState.pendingUploadAttachmentIds
                        .filter { attachmentId in
                            // Skip attachments which already have an upload job in the queue
                            !dependencies[singleton: .jobRunner].hasJob(
                                of: .attachmentUpload,
                                with: AttachmentUploadJob.Details(
                                    messageSendJobId: jobId,
                                    attachmentId: attachmentId
                                )
                            )
                        }
                        .compactMap { attachmentId -> (jobId: Int64, job: Job)? in
                            dependencies[singleton: .jobRunner]
                                .insert(
                                    db,
                                    job: Job(
                                        variant: .attachmentUpload,
                                        behaviour: .runOnce,
                                        threadId: job.threadId,
                                        interactionId: interactionId,
                                        details: AttachmentUploadJob.Details(
                                            messageSendJobId: jobId,
                                            attachmentId: attachmentId
                                        )
                                    ),
                                    before: job
                                )
                        }
                        .forEach { uploadJobId, _ in
                            // Link this job to the upload job it has to wait for
                            try JobDependencies(
                                jobId: jobId,
                                dependantId: uploadJobId
                            )
                            .insert(db)
                        }
                }

                SNLog(" [MessageSendJob] Deferring due to pending attachment uploads ")
                return deferred(job, dependencies)
            }

            // Keep the fileIds so they can be sent along with the open group message content
            messageFileIds = attachmentState.preparedFileIds
        }

        // Remember the sentTimestamp of the message in case sending fails with a clockOutOfSync error
        let originalSentTimestamp: UInt64? = details.message.sentTimestamp

        /// Do the actual sending - this times out when the whole process takes longer than
        /// `HTTP.defaultTimeout * 2`, which can happen when a new onion path has to be built (that has
        /// no limit of its own, so in rare cases it can take forever)
        ///
        /// **Note:** Attachments are not uploaded here, the logic above splits that into its own job so
        /// we shouldn't reach this point until the attachments have been uploaded
        dependencies[singleton: .storage]
            .writePublisher { db -> HTTP.PreparedRequest<Void> in
                try MessageSender.preparedSend(
                    db,
                    message: details.message,
                    to: details.destination,
                    namespace: details.destination.defaultNamespace,
                    interactionId: job.interactionId,
                    fileIds: messageFileIds,
                    using: dependencies
                )
            }
            .flatMap { $0.send(using: dependencies) }
            .subscribe(on: queue, using: dependencies)
            .receive(on: queue, using: dependencies)
            .timeout(.milliseconds(Int(HTTP.defaultTimeout * 2 * 1000)), scheduler: queue, customError: {
                MessageSenderError.sendJobTimeout
            })
            .sinkUntilComplete(
                receiveCompletion: { completion in
                    switch completion {
                        case .finished: success(job, false, dependencies)

                        case .failure(let error):
                            // Logging (plus the status update which a timeout requires)
                            if case MessageSenderError.sendJobTimeout = error {
                                SNLog(" [MessageSendJob] Couldn't send message due to error: \(error) (paths: \(dependencies[cache: .onionRequestAPI].paths.prettifiedDescription) ). ")

                                // The `MessageSender` process gets cancelled in this case, so
                                // `handleFailedMessageSend` has to be called from here to get the
                                // statuses updated correctly
                                dependencies[singleton: .storage].read(using: dependencies) { db in
                                    MessageSender.handleFailedMessageSend(
                                        db,
                                        message: details.message,
                                        destination: details.destination,
                                        error: .sendJobTimeout,
                                        interactionId: job.interactionId,
                                        using: dependencies
                                    )
                                }
                            }
                            else {
                                SNLog(" [MessageSendJob] Couldn't send message due to error: \(error) ")
                            }

                            // Decide whether the failure is permanent or the job may be retried
                            switch error {
                                case let senderError as MessageSenderError where !senderError.isRetryable:
                                    failure(job, error, true, dependencies)

                                case OnionRequestAPIError.httpRequestFailedAtDestination(let statusCode, _, _) where statusCode == 429: // Rate limited
                                    failure(job, error, true, dependencies)

                                case SnodeAPIError.clockOutOfSync:
                                    SNLog(" [MessageSendJob] \(originalSentTimestamp != nil ? " Permanently Failing " : " Failing ") to send \(type(of: details.message)) due to clock out of sync issue. ")
                                    failure(job, error, (originalSentTimestamp != nil), dependencies)

                                default:
                                    if details.message is VisibleMessage {
                                        guard
                                            let interactionId: Int64 = job.interactionId,
                                            dependencies[singleton: .storage].read({ db in try Interaction.exists(db, id: interactionId) }) == true
                                        else {
                                            // The message was deleted, so there is no point in retrying
                                            return failure(job, error, true, dependencies)
                                        }
                                    }

                                    failure(job, error, false, dependencies)
                            }
                    }
                }
            )
    }
}
// MARK: - MessageSendJob.Details
extension MessageSendJob {
    /// The data stored with a `MessageSendJob`: the message to send and where it should be sent.
    ///
    /// The message is encoded alongside its `Message.Variant`, which is what decoding uses to decode the
    /// stored message again.
    public struct Details: Codable {
        private enum CodingKeys: String, CodingKey {
            case destination
            case message
            @available(*, deprecated, message: " replaced by 'Message.Destination.syncMessage' ") case isSyncMessage
            case variant
        }

        public let destination: Message.Destination
        public let message: Message
        /// Result of `Message.Variant(from:)` for `message` (`nil` when that initializer yields no variant)
        public let variant: Message.Variant?

        // MARK: - Initialization

        /// Creates the details for sending `message` to `destination`, deriving `variant` from the message.
        public init(
            destination: Message.Destination,
            message: Message
        ) {
            self.destination = destination
            self.message = message
            self.variant = Message.Variant(from: message)
        }

        // MARK: - Codable

        /// Decodes the details, converting jobs stored with the legacy 'isSyncMessage' flag so they use a
        /// `.syncMessage` destination.
        ///
        /// - Throws: `StorageError.decodingFailed` when the variant can't be decoded or a legacy sync
        ///   message is in a state which can't be converted, plus any error thrown while decoding the
        ///   message or the destination.
        public init(from decoder: Decoder) throws {
            let container: KeyedDecodingContainer<CodingKeys> = try decoder.container(keyedBy: CodingKeys.self)

            // The variant is needed in order to decode the message
            guard let storedVariant: Message.Variant = try? container.decode(Message.Variant.self, forKey: .variant) else {
                SNLog(" Unable to decode messageSend job due to missing variant ")
                throw StorageError.decodingFailed
            }

            let storedMessage: Message = try storedVariant.decode(from: container, forKey: .message)
            let storedDestination: Message.Destination = try container.decode(Message.Destination.self, forKey: .destination)

            /// Support for the legacy 'isSyncMessage' flag - the flag was deprecated in `2.5.2` (April 2024)
            /// and can be removed in a subsequent release after May 2024
            let hasLegacySyncFlag: Bool = ((try? container.decode(Bool.self, forKey: .isSyncMessage)) ?? false)

            guard hasLegacySyncFlag else {
                self.init(destination: storedDestination, message: storedMessage)
                return
            }

            let syncDestination: Message.Destination

            switch (storedDestination, storedMessage) {
                case (.contact, let visibleMessage as VisibleMessage):
                    syncDestination = try Details.legacySyncDestination(forSyncTarget: visibleMessage.syncTarget)

                case (.contact, let timerUpdate as ExpirationTimerUpdate):
                    syncDestination = try Details.legacySyncDestination(forSyncTarget: timerUpdate.syncTarget)

                case (.contact(let publicKey), _):
                    SNLog(" Sync message in messageSend job was missing explicit syncTarget (falling back to specified value) ")
                    syncDestination = .syncMessage(originalRecipientPublicKey: publicKey)

                default:
                    SNLog(" Unable to decode messageSend job due to invalid sync message state ")
                    throw StorageError.decodingFailed
            }

            self.init(destination: syncDestination, message: storedMessage)
        }

        /// Encodes the destination, the message and the variant derived from the message.
        ///
        /// - Throws: `StorageError.objectNotFound` when no `Message.Variant` can be created from the
        ///   message, plus any error thrown by the encoding container.
        public func encode(to encoder: Encoder) throws {
            var container: KeyedEncodingContainer<CodingKeys> = encoder.container(keyedBy: CodingKeys.self)

            guard let messageVariant: Message.Variant = Message.Variant(from: message) else {
                SNLog(" Unable to encode messageSend job due to unsupported variant ")
                throw StorageError.objectNotFound
            }

            try container.encode(destination, forKey: .destination)
            try container.encode(message, forKey: .message)
            try container.encode(messageVariant, forKey: .variant)
        }

        /// Builds the `.syncMessage` destination for a legacy sync message, failing when it has no
        /// `syncTarget`.
        private static func legacySyncDestination(forSyncTarget syncTarget: String?) throws -> Message.Destination {
            guard let targetPublicKey: String = syncTarget else {
                SNLog(" Unable to decode messageSend job due to missing syncTarget ")
                throw StorageError.decodingFailed
            }

            return .syncMessage(originalRecipientPublicKey: targetPublicKey)
        }
    }
}