// C o p y r i g h t © 2 0 2 2 R a n g e p r o o f P t y L t d . A l l r i g h t s r e s e r v e d .
import Foundation
import Combine
import GRDB
import Sodium
import SessionSnodeKit
import SessionUtilitiesKit
public class Poller {
private var cancellables : Atomic < [ String : AnyCancellable ] > = Atomic ( [ : ] )
internal var isPolling : Atomic < [ String : Bool ] > = Atomic ( [ : ] )
internal var pollCount : Atomic < [ String : Int ] > = Atomic ( [ : ] )
internal var failureCount : Atomic < [ String : Int ] > = Atomic ( [ : ] )
internal var targetSnode : Atomic < Snode ? > = Atomic ( nil )
private var usedSnodes : Atomic < Set < Snode > > = Atomic ( [ ] )
// MARK: - S e t t i n g s
// / T h e n a m e s p a c e s w h i c h t h i s p o l l e r q u e r i e s
internal var namespaces : [ SnodeAPI . Namespace ] {
preconditionFailure ( " abstract class - override in subclass " )
}
// / T h e n u m b e r o f t i m e s t h e p o l l e r c a n p o l l a s i n g l e s n o d e b e f o r e s w a p p i n g t o a n e w s n o d e
internal var maxNodePollCount : UInt {
preconditionFailure ( " abstract class - override in subclass " )
}
// MARK: - P u b l i c A P I
public init ( ) { }
public func stopAllPollers ( ) {
let pollers : [ String ] = Array ( isPolling . wrappedValue . keys )
pollers . forEach { groupPublicKey in
self . stopPolling ( for : groupPublicKey )
}
}
public func stopPolling ( for publicKey : String ) {
isPolling . mutate { $0 [ publicKey ] = false }
cancellables . mutate { $0 [ publicKey ] ? . cancel ( ) }
}
// MARK: - A b s t r a c t M e t h o d s
// / T h e n a m e f o r t h i s p o l l e r t o a p p e a r i n t h e l o g s
internal func pollerName ( for publicKey : String ) -> String {
preconditionFailure ( " abstract class - override in subclass " )
}
// / C a l c u l a t e t h e d e l a y w h i c h s h o u l d o c c u r b e f o r e t h e n e x t p o l l
internal func nextPollDelay ( for publicKey : String ) -> TimeInterval {
preconditionFailure ( " abstract class - override in subclass " )
}
// / P e r f o r m a n d l o g i c w h i c h s h o u l d o c c u r w h e n t h e p o l l e r r o r s , w i l l s t o p p o l l i n g i f ` f a l s e ` i s r e t u r n e d
internal func handlePollError ( _ error : Error , for publicKey : String , using dependencies : SMKDependencies ) -> Bool {
preconditionFailure ( " abstract class - override in subclass " )
}
// MARK: - P r i v a t e A P I
internal func startIfNeeded ( for publicKey : String ) {
// R u n o n t h e ' p o l l e r Q u e u e ' t o e n s u r e a n y ' A t o m i c ' a c c e s s d o e s n ' t b l o c k t h e m a i n t h r e a d
// o n s t a r t u p
Threading . pollerQueue . async { [ weak self ] in
guard self ? . isPolling . wrappedValue [ publicKey ] != true else { return }
// M i g h t b e a r a c e c o n d i t i o n t h a t t h e s e t U p P o l l i n g f i n i s h e s t o o s o o n ,
// a n d t h e t i m e r i s n o t c r e a t e d , i f w e m a r k t h e g r o u p a s i s p o l l i n g
// a f t e r s e t U p P o l l i n g . S o t h e p o l l e r m a y n o t w o r k , t h u s m i s s e s m e s s a g e s
self ? . isPolling . mutate { $0 [ publicKey ] = true }
self ? . pollRecursively ( for : publicKey )
}
}
internal func getSnodeForPolling (
for publicKey : String ,
using dependencies : SMKDependencies = SMKDependencies ( )
) -> AnyPublisher < Snode , Error > {
// I f w e d o n ' t w a n t t o p o l l a s n o d e m u l t i p l e t i m e s t h e n j u s t g r a b a r a n d o m o n e f r o m t h e s w a r m
guard maxNodePollCount > 0 else {
return SnodeAPI . getSwarm ( for : publicKey , using : dependencies )
. tryMap { swarm -> Snode in
try swarm . randomElement ( ) ? ? { throw OnionRequestAPIError . insufficientSnodes } ( )
}
. eraseToAnyPublisher ( )
}
// I f w e a l r e a d y h a v e a t a r g e t s n o d e t h e n u s e t h a t
if let targetSnode : Snode = self . targetSnode . wrappedValue {
return Just ( targetSnode )
. setFailureType ( to : Error . self )
. eraseToAnyPublisher ( )
}
// S e l e c t t h e n e x t u n u s e d s n o d e f r o m t h e s w a r m ( i f w e ' v e u s e d t h e m a l l t h e n c l e a r t h e u s e d l i s t a n d
// s t a r t c y c l i n g t h r o u g h t h e m a g a i n )
return SnodeAPI . getSwarm ( for : publicKey , using : dependencies )
. tryMap { [ usedSnodes = self . usedSnodes , targetSnode = self . targetSnode ] swarm -> Snode in
let unusedSnodes : Set < Snode > = swarm . subtracting ( usedSnodes . wrappedValue )
// I f w e ' v e u s e d a l l o f t h e S N o d e s t h e n c l e a r o u t t h e u s e d l i s t
if unusedSnodes . isEmpty {
usedSnodes . mutate { $0 . removeAll ( ) }
}
// S e l e c t t h e n e x t S N o d e
let nextSnode : Snode = try swarm . randomElement ( ) ? ? { throw OnionRequestAPIError . insufficientSnodes } ( )
targetSnode . mutate { $0 = nextSnode }
usedSnodes . mutate { $0 . insert ( nextSnode ) }
return nextSnode
}
. eraseToAnyPublisher ( )
}
internal func incrementPollCount ( publicKey : String ) {
guard maxNodePollCount > 0 else { return }
let pollCount : Int = ( self . pollCount . wrappedValue [ publicKey ] ? ? 0 )
self . pollCount . mutate { $0 [ publicKey ] = ( pollCount + 1 ) }
// C h e c k i f w e ' v e p o l l e d t h e s e r i c e n o d e t o o m a n y t i m e s
guard pollCount > maxNodePollCount else { return }
// I f w e h a v e p o l l e d t h i s s e r v i c e n o d e m o r e t h a n t h e m a x i m u m a l l o w e d t h e n c l e a r o u t
// t h e ' t a r g e t S e r v i c e N o d e ' v a l u e
self . targetSnode . mutate { $0 = nil }
}
private func pollRecursively (
for publicKey : String ,
using dependencies : SMKDependencies = SMKDependencies ( )
) {
guard isPolling . wrappedValue [ publicKey ] = = true else { return }
let namespaces : [ SnodeAPI . Namespace ] = self . namespaces
let lastPollStart : TimeInterval = Date ( ) . timeIntervalSince1970
let lastPollInterval : TimeInterval = nextPollDelay ( for : publicKey )
let getSnodePublisher : AnyPublisher < Snode , Error > = getSnodeForPolling ( for : publicKey )
// S t o r e t h e p u b l i s h e r i n t p t h e c a n c e l l a b l e s d i c t i o n a r y
cancellables . mutate { [ weak self ] cancellables in
cancellables [ publicKey ] = getSnodePublisher
. flatMap { snode -> AnyPublisher < [ Message ] , Error > in
Poller . poll (
namespaces : namespaces ,
from : snode ,
for : publicKey ,
poller : self ,
using : dependencies
)
}
. subscribe ( on : dependencies . subscribeQueue )
. receive ( on : dependencies . receiveQueue )
. sink (
receiveCompletion : { result in
switch result {
case . failure ( let error ) :
// D e t e r m i n e i f t h e e r r o r s h o u l d s t o p u s f r o m p o l l i n g a n y m o r e
guard self ? . handlePollError ( error , for : publicKey , using : dependencies ) = = true else {
return
}
case . finished : break
}
// I n c r e m e n t t h e p o l l c o u n t
self ? . incrementPollCount ( publicKey : publicKey )
// C a l c u l a t e t h e r e m a i n i n g p o l l d e l a y
let currentTime : TimeInterval = Date ( ) . timeIntervalSince1970
let nextPollInterval : TimeInterval = (
self ? . nextPollDelay ( for : publicKey ) ? ?
lastPollInterval
)
let remainingInterval : TimeInterval = max ( 0 , nextPollInterval - ( currentTime - lastPollStart ) )
// S c h e d u l e t h e n e x t p o l l
guard remainingInterval > 0 else {
return dependencies . subscribeQueue . async {
self ? . pollRecursively ( for : publicKey , using : dependencies )
}
}
dependencies . subscribeQueue . asyncAfter ( deadline : . now ( ) + . milliseconds ( Int ( remainingInterval * 1000 ) ) , qos : . default ) {
self ? . pollRecursively ( for : publicKey , using : dependencies )
}
} ,
receiveValue : { _ in }
)
}
}
// / P o l l s t h e s p e c i f i e d n a m e s p a c e s a n d p r o c e s s e s a n y m e s s a g e s , r e t u r n i n g a n a r r a y o f m e s s a g e s t h a t w e r e
// / s u c c e s s f u l l y p r o c e s s e d
// /
// / * * N o t e : * * T h e r e t u r n e d m e s s a g e s w i l l h a v e a l r e a d y b e e n p r o c e s s e d b y t h e ` P o l l e r ` , t h e y a r e o n l y r e t u r n e d
// / f o r c a s e s w h e r e w e n e e d e x p l i c i t / c u s t o m b e h a v i o u r s t o o c c u r ( e g . O n b o a r d i n g )
public static func poll (
namespaces : [ SnodeAPI . Namespace ] ,
from snode : Snode ,
for publicKey : String ,
calledFromBackgroundPoller : Bool = false ,
isBackgroundPollValid : @ escaping ( ( ) -> Bool ) = { true } ,
poller : Poller ? = nil ,
using dependencies : SMKDependencies = SMKDependencies (
subscribeQueue : Threading . pollerQueue ,
receiveQueue : Threading . pollerQueue
)
) -> AnyPublisher < [ Message ] , Error > {
// I f t h e p o l l i n g h a s b e e n c a n c e l l e d t h e n d o n ' t c o n t i n u e
guard
( calledFromBackgroundPoller && isBackgroundPollValid ( ) ) ||
poller ? . isPolling . wrappedValue [ publicKey ] = = true
else {
return Just ( [ ] )
. setFailureType ( to : Error . self )
. eraseToAnyPublisher ( )
}
let pollerName : String = (
poller ? . pollerName ( for : publicKey ) ? ?
" poller with public key \( publicKey ) "
)
let configHashes : [ String ] = SessionUtil . configHashes ( for : publicKey )
// F e t c h t h e m e s s a g e s
return SnodeAPI
. poll (
namespaces : namespaces ,
refreshingConfigHashes : configHashes ,
from : snode ,
associatedWith : publicKey ,
using : dependencies
)
. flatMap { namespacedResults -> AnyPublisher < [ Message ] , Error > in
guard
( calledFromBackgroundPoller && isBackgroundPollValid ( ) ) ||
poller ? . isPolling . wrappedValue [ publicKey ] = = true
else {
return Just ( [ ] )
. setFailureType ( to : Error . self )
. eraseToAnyPublisher ( )
}
let allMessages : [ SnodeReceivedMessage ] = namespacedResults
. compactMap { _ , result -> [ SnodeReceivedMessage ] ? in result . data ? . messages }
. flatMap { $0 }
// N o n e e d t o d o a n y t h i n g i f t h e r e a r e n o m e s s a g e s
guard ! allMessages . isEmpty else {
if ! calledFromBackgroundPoller { SNLog ( " Received no new messages in \( pollerName ) " ) }
return Just ( [ ] )
. setFailureType ( to : Error . self )
. eraseToAnyPublisher ( )
}
// O t h e r w i s e p r o c e s s t h e m e s s a g e s a n d a d d t h e m t o t h e q u e u e f o r h a n d l i n g
let lastHashes : [ String ] = namespacedResults
. compactMap { $0 . value . data ? . lastHash }
let otherKnownHashes : [ String ] = namespacedResults
. filter { $0 . key . shouldDedupeMessages }
. compactMap { $0 . value . data ? . messages . map { $0 . info . hash } }
. reduce ( [ ] , + )
var messageCount : Int = 0
var processedMessages : [ Message ] = [ ]
var hadValidHashUpdate : Bool = false
var configMessageJobsToRun : [ Job ] = [ ]
var standardMessageJobsToRun : [ Job ] = [ ]
var pollerLogOutput : String = " \( pollerName ) failed to process any messages "
Storage . shared . write { db in
let allProcessedMessages : [ ProcessedMessage ] = allMessages
. compactMap { message -> ProcessedMessage ? in
do {
return try Message . processRawReceivedMessage ( db , rawMessage : message )
}
catch {
switch error {
// I g n o r e d u p l i c a t e & s e l f S e n d m e s s a g e e r r o r s ( a n d d o n ' t b o t h e r l o g g i n g
// t h e m a s t h e r e w i l l b e a l o t s i n c e w e e a c h s e r v i c e n o d e d u p l i c a t e s m e s s a g e s )
case DatabaseError . SQLITE_CONSTRAINT_UNIQUE ,
MessageReceiverError . duplicateMessage ,
MessageReceiverError . duplicateControlMessage ,
MessageReceiverError . selfSend :
break
case MessageReceiverError . duplicateMessageNewSnode :
hadValidHashUpdate = true
break
case DatabaseError . SQLITE_ABORT :
// I n t h e b a c k g r o u n d i g n o r e ' S Q L I T E _ A B O R T ' ( i t g e n e r a l l y m e a n s
// t h e B a c k g r o u n d P o l l e r h a s t i m e d o u t
if ! calledFromBackgroundPoller {
SNLog ( " Failed to the database being suspended (running in background with no background task). " )
}
break
default : SNLog ( " Failed to deserialize envelope due to error: \( error ) . " )
}
return nil
}
}
// A d d a j o b t o p r o c e s s t h e c o n f i g m e s s a g e s f i r s t
let configJobIds : [ Int64 ] = allProcessedMessages
. filter { $0 . messageInfo . variant = = . sharedConfigMessage }
. grouped { threadId , _ , _ , _ in threadId }
. compactMap { threadId , threadMessages in
messageCount += threadMessages . count
processedMessages += threadMessages . map { $0 . messageInfo . message }
let jobToRun : Job ? = Job (
variant : . configMessageReceive ,
behaviour : . runOnce ,
threadId : threadId ,
details : ConfigMessageReceiveJob . Details (
messages : threadMessages . map { $0 . messageInfo } ,
calledFromBackgroundPoller : calledFromBackgroundPoller
)
)
configMessageJobsToRun = configMessageJobsToRun . appending ( jobToRun )
// I f w e a r e f o r c e - p o l l i n g t h e n a d d t o t h e J o b R u n n e r s o t h e y a r e
// p e r s i s t e n t a n d w i l l r e t r y o n t h e n e x t a p p r u n i f t h e y f a i l b u t
// d o n ' t l e t t h e m a u t o - s t a r t
let updatedJob : Job ? = dependencies . jobRunner
. add (
db ,
job : jobToRun ,
canStartJob : ! calledFromBackgroundPoller ,
dependencies : dependencies
)
return updatedJob ? . id
}
// A d d j o b s f o r p r o c e s s i n g n o n - c o n f i g m e s s a g e s w h i c h a r e d e p e n d a n t o n t h e c o n f i g m e s s a g e
// p r o c e s s i n g j o b s
allProcessedMessages
. filter { $0 . messageInfo . variant != . sharedConfigMessage }
. grouped { threadId , _ , _ , _ in threadId }
. forEach { threadId , threadMessages in
messageCount += threadMessages . count
processedMessages += threadMessages . map { $0 . messageInfo . message }
let jobToRun : Job ? = Job (
variant : . messageReceive ,
behaviour : . runOnce ,
threadId : threadId ,
details : MessageReceiveJob . Details (
messages : threadMessages . map { $0 . messageInfo } ,
calledFromBackgroundPoller : calledFromBackgroundPoller
)
)
standardMessageJobsToRun = standardMessageJobsToRun . appending ( jobToRun )
// I f w e a r e f o r c e - p o l l i n g t h e n a d d t o t h e J o b R u n n e r s o t h e y a r e
// p e r s i s t e n t a n d w i l l r e t r y o n t h e n e x t a p p r u n i f t h e y f a i l b u t
// d o n ' t l e t t h e m a u t o - s t a r t
let updatedJob : Job ? = dependencies . jobRunner
. add (
db ,
job : jobToRun ,
canStartJob : ! calledFromBackgroundPoller ,
dependencies : dependencies
)
// C r e a t e t h e d e p e n d e n c y b e t w e e n t h e j o b s
if let updatedJobId : Int64 = updatedJob ? . id {
do {
try configJobIds . forEach { configJobId in
try JobDependencies (
jobId : updatedJobId ,
dependantId : configJobId
)
. insert ( db )
}
}
catch {
SNLog ( " Failed to add dependency between config processing and non-config processing messageReceive jobs. " )
}
}
}
// S e t t h e o u t p u t f o r l o g g i n g
pollerLogOutput = " Received \( messageCount ) new message \( messageCount = = 1 ? " " : " s " ) in \( pollerName ) (duplicates: \( allMessages . count - messageCount ) ) "
// C l e a n u p m e s s a g e h a s h e s a n d a d d s o m e l o g s a b o u t t h e p o l l r e s u l t s
if allMessages . isEmpty && ! hadValidHashUpdate {
pollerLogOutput = " Received \( allMessages . count ) new message \( allMessages . count = = 1 ? " " : " s " ) in \( pollerName ) , all duplicates - marking the hash we polled with as invalid "
// U p d a t e t h e c a c h e d v a l i d i t y o f t h e m e s s a g e s
try SnodeReceivedMessageInfo . handlePotentialDeletedOrInvalidHash (
db ,
potentiallyInvalidHashes : lastHashes ,
otherKnownValidHashes : otherKnownHashes
)
}
}
// O n l y o u t p u t l o g s i f i t i s n ' t t h e b a c k g r o u n d p o l l e r
if ! calledFromBackgroundPoller {
SNLog ( pollerLogOutput )
}
// I f w e a r e n ' t r u n i n g i n a b a c k g r o u n d p o l l e r t h e n j u s t f i n i s h i m m e d i a t e l y
guard calledFromBackgroundPoller else {
return Just ( processedMessages )
. setFailureType ( to : Error . self )
. eraseToAnyPublisher ( )
}
// W e w a n t t o t r y t o h a n d l e t h e r e c e i v e j o b s i m m e d i a t e l y i n t h e b a c k g r o u n d
return Publishers
. MergeMany (
configMessageJobsToRun . map { job -> AnyPublisher < Void , Error > in
Deferred {
Future < Void , Error > { resolver in
// N o t e : I n t h e b a c k g r o u n d w e j u s t w a n t j o b s t o f a i l s i l e n t l y
ConfigMessageReceiveJob . run (
job ,
queue : dependencies . receiveQueue ,
success : { _ , _ , _ in resolver ( Result . success ( ( ) ) ) } ,
failure : { _ , _ , _ , _ in resolver ( Result . success ( ( ) ) ) } ,
deferred : { _ , _ in resolver ( Result . success ( ( ) ) ) }
)
}
}
. eraseToAnyPublisher ( )
}
)
. collect ( )
. flatMap { _ in
Publishers
. MergeMany (
standardMessageJobsToRun . map { job -> AnyPublisher < Void , Error > in
Deferred {
Future < Void , Error > { resolver in
// N o t e : I n t h e b a c k g r o u n d w e j u s t w a n t j o b s t o f a i l s i l e n t l y
MessageReceiveJob . run (
job ,
queue : dependencies . receiveQueue ,
success : { _ , _ , _ in resolver ( Result . success ( ( ) ) ) } ,
failure : { _ , _ , _ , _ in resolver ( Result . success ( ( ) ) ) } ,
deferred : { _ , _ in resolver ( Result . success ( ( ) ) ) }
)
}
}
. eraseToAnyPublisher ( )
}
)
. collect ( )
}
. map { _ in processedMessages }
. eraseToAnyPublisher ( )
}
. eraseToAnyPublisher ( )
}
}