// C o p y r i g h t © 2 0 2 2 R a n g e p r o o f P t y L t d . A l l r i g h t s r e s e r v e d .
import Foundation
import GRDB
import PromiseKit
import SessionSnodeKit
import SessionUtilitiesKit
public final class ClosedGroupPoller {
private var isPolling : Atomic < [ String : Bool ] > = Atomic ( [ : ] )
private var timers : [ String : Timer ] = [ : ]
// MARK: - S e t t i n g s
private static let minPollInterval : Double = 2
private static let maxPollInterval : Double = 30
// MARK: - E r r o r
private enum Error : LocalizedError {
case insufficientSnodes
case pollingCanceled
internal var errorDescription : String ? {
switch self {
case . insufficientSnodes : return " No snodes left to poll. "
case . pollingCanceled : return " Polling canceled. "
}
}
}
// MARK: - I n i t i a l i z a t i o n
public static let shared = ClosedGroupPoller ( )
// MARK: - P u b l i c A P I
@objc public func start ( ) {
// F e t c h a l l c l o s e d g r o u p s ( e x c l u d i n g a n y d o n ' t c o n t a i n t h e c u r r e n t u s e r a s a
// G r o u p M e m e b e r a s t h e u s e r i s n o l o n g e r a m e m b e r o f t h o s e )
Storage . shared
. read { db in
try ClosedGroup
. select ( . threadId )
. joining (
required : ClosedGroup . members
. filter ( GroupMember . Columns . profileId = = getUserHexEncodedPublicKey ( db ) )
)
. asRequest ( of : String . self )
. fetchAll ( db )
}
. defaulting ( to : [ ] )
. forEach { [ weak self ] groupPublicKey in
self ? . startPolling ( for : groupPublicKey )
}
}
public func startPolling ( for groupPublicKey : String ) {
guard isPolling . wrappedValue [ groupPublicKey ] != true else { return }
// M i g h t b e a r a c e c o n d i t i o n t h a t t h e s e t U p P o l l i n g f i n i s h e s t o o s o o n ,
// a n d t h e t i m e r i s n o t c r e a t e d , i f w e m a r k t h e g r o u p a s i s p o l l i n g
// a f t e r s e t U p P o l l i n g . S o t h e p o l l e r m a y n o t w o r k , t h u s m i s s e s m e s s a g e s .
isPolling . mutate { $0 [ groupPublicKey ] = true }
setUpPolling ( for : groupPublicKey )
}
@objc public func stop ( ) {
Storage . shared
. read { db in
try ClosedGroup
. select ( . threadId )
. asRequest ( of : String . self )
. fetchAll ( db )
}
. defaulting ( to : [ ] )
. forEach { [ weak self ] groupPublicKey in
self ? . stopPolling ( for : groupPublicKey )
}
}
public func stopPolling ( for groupPublicKey : String ) {
isPolling . mutate { $0 [ groupPublicKey ] = false }
timers [ groupPublicKey ] ? . invalidate ( )
}
// MARK: - P r i v a t e A P I
private func setUpPolling ( for groupPublicKey : String ) {
Threading . pollerQueue . async {
ClosedGroupPoller . poll ( groupPublicKey , poller : self )
. done ( on : Threading . pollerQueue ) { [ weak self ] _ in
self ? . pollRecursively ( groupPublicKey )
}
. catch ( on : Threading . pollerQueue ) { [ weak self ] error in
// T h e e r r o r i s l o g g e d i n p o l l ( _ : )
self ? . pollRecursively ( groupPublicKey )
}
}
}
private func pollRecursively ( _ groupPublicKey : String ) {
guard
isPolling . wrappedValue [ groupPublicKey ] = = true ,
let thread : SessionThread = Storage . shared . read ( { db in try SessionThread . fetchOne ( db , id : groupPublicKey ) } )
else { return }
// G e t t h e r e c e i v e d d a t e o f t h e l a s t m e s s a g e i n t h e t h r e a d . I f w e d o n ' t h a v e a n y m e s s a g e s y e t , p i c k s o m e
// r e a s o n a b l e f a k e t i m e i n t e r v a l t o u s e i n s t e a d
let lastMessageDate : Date = Storage . shared
. read { db in
try thread
. interactions
. select ( . receivedAtTimestampMs )
. order ( Interaction . Columns . timestampMs . desc )
. asRequest ( of : Int64 . self )
. fetchOne ( db )
}
. map { receivedAtTimestampMs -> Date ? in
guard receivedAtTimestampMs > 0 else { return nil }
return Date ( timeIntervalSince1970 : ( TimeInterval ( receivedAtTimestampMs ) / 1000 ) )
}
. defaulting ( to : Date ( ) . addingTimeInterval ( - 5 * 60 ) )
let timeSinceLastMessage : TimeInterval = Date ( ) . timeIntervalSince ( lastMessageDate )
let minPollInterval : Double = ClosedGroupPoller . minPollInterval
let limit : Double = ( 12 * 60 * 60 )
let a = ( ClosedGroupPoller . maxPollInterval - minPollInterval ) / limit
let nextPollInterval = a * min ( timeSinceLastMessage , limit ) + minPollInterval
SNLog ( " Next poll interval for closed group with public key: \( groupPublicKey ) is \( nextPollInterval ) s. " )
timers [ groupPublicKey ] = Timer . scheduledTimerOnMainThread ( withTimeInterval : nextPollInterval , repeats : false ) { [ weak self ] timer in
timer . invalidate ( )
Threading . pollerQueue . async {
ClosedGroupPoller . poll ( groupPublicKey , poller : self )
. done ( on : Threading . pollerQueue ) { _ in
self ? . pollRecursively ( groupPublicKey )
}
. catch ( on : Threading . pollerQueue ) { error in
// T h e e r r o r i s l o g g e d i n p o l l ( _ : )
self ? . pollRecursively ( groupPublicKey )
}
}
}
}
public static func poll (
_ groupPublicKey : String ,
on queue : DispatchQueue = SessionSnodeKit . Threading . workQueue ,
maxRetryCount : UInt = 0 ,
isBackgroundPoll : Bool = false ,
poller : ClosedGroupPoller ? = nil
) -> Promise < Void > {
let promise : Promise < Void > = SnodeAPI . getSwarm ( for : groupPublicKey )
. then ( on : queue ) { swarm -> Promise < Void > in
// r a n d o m E l e m e n t ( ) u s e s t h e s y s t e m ' s d e f a u l t r a n d o m g e n e r a t o r , w h i c h i s c r y p t o g r a p h i c a l l y s e c u r e
guard let snode = swarm . randomElement ( ) else { return Promise ( error : Error . insufficientSnodes ) }
return attempt ( maxRetryCount : maxRetryCount , recoveringOn : queue ) {
guard isBackgroundPoll || poller ? . isPolling . wrappedValue [ groupPublicKey ] = = true else {
return Promise ( error : Error . pollingCanceled )
}
let promises : [ Promise < [ SnodeReceivedMessage ] > ] = {
if SnodeAPI . hardfork >= 19 && SnodeAPI . softfork >= 1 {
return [ SnodeAPI . getMessages ( from : snode , associatedWith : groupPublicKey , authenticated : false ) ]
}
if SnodeAPI . hardfork >= 19 {
return [
SnodeAPI . getClosedGroupMessagesFromDefaultNamespace ( from : snode , associatedWith : groupPublicKey ) ,
SnodeAPI . getMessages ( from : snode , associatedWith : groupPublicKey , authenticated : false )
]
}
return [ SnodeAPI . getClosedGroupMessagesFromDefaultNamespace ( from : snode , associatedWith : groupPublicKey ) ]
} ( )
return when ( resolved : promises )
. then ( on : queue ) { messageResults -> Promise < Void > in
guard isBackgroundPoll || poller ? . isPolling . wrappedValue [ groupPublicKey ] = = true else { return Promise . value ( ( ) ) }
var promises : [ Promise < Void > ] = [ ]
let allMessages : [ SnodeReceivedMessage ] = messageResults
. reduce ( [ ] ) { result , next in
switch next {
case . fulfilled ( let messages ) : return result . appending ( contentsOf : messages )
default : return result
}
}
var messageCount : Int = 0
let totalMessagesCount : Int = allMessages . count
Storage . shared . write { db in
let processedMessages : [ ProcessedMessage ] = allMessages
. compactMap { message -> ProcessedMessage ? in
do {
return try Message . processRawReceivedMessage ( db , rawMessage : message )
}
catch {
switch error {
// I g n o r e d u p l i c a t e & s e l f S e n d m e s s a g e e r r o r s ( a n d d o n ' t b o t h e r l o g g i n g
// t h e m a s t h e r e w i l l b e a l o t s i n c e w e e a c h s e r v i c e n o d e d u p l i c a t e s m e s s a g e s )
case DatabaseError . SQLITE_CONSTRAINT_UNIQUE ,
MessageReceiverError . duplicateMessage ,
MessageReceiverError . duplicateControlMessage ,
MessageReceiverError . selfSend :
break
default : SNLog ( " Failed to deserialize envelope due to error: \( error ) . " )
}
return nil
}
}
messageCount = processedMessages . count
let jobToRun : Job ? = Job (
variant : . messageReceive ,
behaviour : . runOnce ,
threadId : groupPublicKey ,
details : MessageReceiveJob . Details (
messages : processedMessages . map { $0 . messageInfo } ,
isBackgroundPoll : isBackgroundPoll
)
)
// I f w e a r e f o r c e - p o l l i n g t h e n a d d t o t h e J o b R u n n e r s o t h e y a r e p e r s i s t e n t a n d w i l l r e t r y o n
// t h e n e x t a p p r u n i f t h e y f a i l b u t d o n ' t l e t t h e m a u t o - s t a r t
JobRunner . add ( db , job : jobToRun , canStartJob : ! isBackgroundPoll )
// W e w a n t t o t r y t o h a n d l e t h e r e c e i v e j o b s i m m e d i a t e l y i n t h e b a c k g r o u n d
if isBackgroundPoll {
promises = promises . appending (
jobToRun . map { job -> Promise < Void > in
let ( promise , seal ) = Promise < Void > . pending ( )
// N o t e : I n t h e b a c k g r o u n d w e j u s t w a n t j o b s t o f a i l s i l e n t l y
MessageReceiveJob . run (
job ,
queue : queue ,
success : { _ , _ in seal . fulfill ( ( ) ) } ,
failure : { _ , _ , _ in seal . fulfill ( ( ) ) } ,
deferred : { _ in seal . fulfill ( ( ) ) }
)
return promise
}
)
}
}
if ! isBackgroundPoll {
if totalMessagesCount > 0 {
SNLog ( " Received \( messageCount ) new message \( messageCount = = 1 ? " " : " s " ) in closed group with public key: \( groupPublicKey ) (duplicates: \( totalMessagesCount - messageCount ) ) " )
}
else {
SNLog ( " Received no new messages in closed group with public key: \( groupPublicKey ) " )
}
}
return when ( fulfilled : promises )
}
}
}
if ! isBackgroundPoll {
promise . catch2 { error in
SNLog ( " Polling failed for closed group with public key: \( groupPublicKey ) due to error: \( error ) . " )
}
}
return promise
}
}