// Copyright © 2022 Rangeproof Pty Ltd. All rights reserved.
import Foundation
import Combine
import GRDB
import SessionSnodeKit
import SessionUtilitiesKit
extension OpenGroupAPI {
/// Minimal interface for a poller which can be started on demand and stopped again.
public protocol PollerType {
    /// Asks the poller to start; the "if needed" decision is left to the conforming type.
    ///
    /// - Parameter dependencies: Dependency container the poller should use while running.
    func startIfNeeded(using dependencies: Dependencies)

    /// Asks the poller to stop.
    func stop()
}
public final class Poller : PollerType {
/// Pairing of the response metadata with the decoded payload for each polled endpoint.
typealias PollResponse = (info: ResponseInfoType, data: [OpenGroupAPI.Endpoint: Decodable])

/// Server this poller instance is bound to (assigned once in `init`).
private let server: String

/// Token identifying the current recursive polling loop; it is replaced in `startIfNeeded` and `stop`
/// so that an iteration scheduled by an earlier loop can tell it has been superseded.
private var recursiveLoopId = UUID()

/// Set by `startIfNeeded` and cleared by `stop` (and by `poll` when a background poll is no longer valid).
private var hasStarted = false

/// `true` while a poll is in flight; `poll` returns immediately when this is set.
private var isPolling = false

// MARK: - Settings

/// Lower bound passed to `getInterval` when working out the delay between polls.
private static let minPollInterval: TimeInterval = 3

/// Upper bound (one hour) passed to `getInterval` when working out the delay between polls.
private static let maxPollInterval: TimeInterval = 60 * 60

/// Fourteen days expressed in seconds.
internal static let maxInactivityPeriod: TimeInterval = 14 * 24 * 60 * 60

/// If there are hidden rooms that we poll and they fail too many times we want to prune them (as it
/// likely means they no longer exist, and since they are already hidden it's unlikely that the user
/// will notice that we stopped polling for them)
internal static let maxHiddenRoomFailureCount: Int64 = 10

/// When doing a background poll we only want to fetch from rooms which are unlikely to time out; to
/// do this we exclude any rooms which have failed more than this threshold
public static let maxRoomFailureCountForBackgroundPoll: Int64 = 15
// MARK: - Lifecycle
/// Creates a poller bound to a single server.
///
/// - Parameter server: Stored on the instance and used by every poll it performs.
public init(for server: String) {
    self.server = server
}
/// Starts the recursive polling loop unless this poller has already been started.
///
/// - Parameter dependencies: Dependency container forwarded to `pollRecursively`.
public func startIfNeeded(using dependencies: Dependencies) {
    // Already running, nothing to do
    if hasStarted { return }

    // Mark as running and mint a fresh loop id before kicking off the first iteration
    hasStarted = true
    recursiveLoopId = UUID()
    pollRecursively(using: dependencies)
}
/// Stops the poller: clears the started flag and replaces the loop id so that any iteration which
/// was already scheduled sees a mismatching id and does not continue.
@objc public func stop() {
    hasStarted = false
    recursiveLoopId = UUID()
}
// MARK: - Polling
/// Runs one poll and, once it completes, schedules the next iteration of the loop.
///
/// The delay before the next iteration comes from `getInterval`, fed with the lowest
/// `pollFailureCount` stored for this server, minus the time the poll that just finished took.
/// An iteration only continues while `recursiveLoopId` still matches the id captured when this
/// iteration started.
///
/// - Parameter dependencies: Dependency container used for the clock, storage and scheduling.
private func pollRecursively(using dependencies: Dependencies) {
    guard hasStarted else { return }

    let server: String = self.server
    let loopIdAtStart: UUID = self.recursiveLoopId
    let pollStartedAt: TimeInterval = dependencies.dateNow.timeIntervalSince1970

    poll(using: dependencies)
        .subscribe(on: Threading.communityPollerQueue, using: dependencies)
        .receive(on: OpenGroupAPI.workQueue, using: dependencies)
        .sinkUntilComplete(
            receiveCompletion: { [weak self] _ in
                // Lowest failure count recorded for this server (0 when nothing could be read)
                let lowestFailureCount: Int64 = dependencies.storage
                    .read(using: dependencies) { db in
                        try OpenGroup
                            .filter(OpenGroup.Columns.server == server)
                            .select(min(OpenGroup.Columns.pollFailureCount))
                            .asRequest(of: Int64.self)
                            .fetchOne(db)
                    }
                    .defaulting(to: 0)

                // Work out how much of the target interval is left after the poll itself
                let now: TimeInterval = dependencies.dateNow.timeIntervalSince1970
                let targetInterval: TimeInterval = Poller.getInterval(
                    for: TimeInterval(lowestFailureCount),
                    minInterval: Poller.minPollInterval,
                    maxInterval: Poller.maxPollInterval
                )
                let elapsed: TimeInterval = (now - pollStartedAt)
                let remainingInterval: TimeInterval = max(0, targetInterval - elapsed)

                // Shared continuation: if a new recursive loop was started in the meantime we don't
                // want to double up, so this loop simply stops
                let continueLoop: () -> Void = {
                    guard loopIdAtStart == self?.recursiveLoopId else { return }

                    self?.pollRecursively(using: dependencies)
                }

                // Schedule the next poll (immediately when no time remains)
                if remainingInterval > 0 {
                    Threading.communityPollerQueue.asyncAfter(
                        deadline: .now() + .milliseconds(Int(remainingInterval * 1000)),
                        qos: .default,
                        using: dependencies
                    ) {
                        continueLoop()
                    }
                }
                else {
                    Threading.communityPollerQueue.async(using: dependencies) {
                        continueLoop()
                    }
                }
            }
        )
}
/// Performs a single poll against `server` and processes the result.
///
/// When a poll is already in flight, or the poller has not been started, this returns a publisher
/// that emits a single `Void` value without doing any work. Otherwise the poll request is prepared
/// from storage, sent, and its response passed to `handlePollResponse`.
///
/// If the request fails the error is first offered to `updateCapabilitiesAndRetryIfNeeded`. When that
/// reports the error as unhandled, `pollFailureCount` is increased for every `OpenGroup` on this
/// server and, once the previous count exceeds `maxHiddenRoomFailureCount`, rooms whose thread is
/// not visible are deleted.
///
/// - Parameters:
///   - calledFromBackgroundPoller: Whether this call comes from a background poll; when `true` the
///     outcome is only processed while `isBackgroundPollerValid` returns `true`.
///   - isBackgroundPollerValid: Queried after the request finishes to decide whether the outcome
///     should still be processed.
///   - isPostCapabilitiesRetry: `true` when this poll is the retry issued after a capabilities
///     refresh; forwarded so that the refresh is not attempted a second time.
///   - dependencies: Dependency container used for storage, caches, the clock and networking.
/// - Returns: A type-erased publisher with `Void` output; request failures are routed through the
///   `catch` stage described above rather than being delivered to the subscriber directly.
public func poll(
    calledFromBackgroundPoller: Bool = false,
    isBackgroundPollerValid: @escaping (() -> Bool) = { true },
    isPostCapabilitiesRetry: Bool = false,
    using dependencies: Dependencies = Dependencies()
) -> AnyPublisher<Void, Error> {
    guard !self.isPolling && self.hasStarted else {
        return Just(())
            .setFailureType(to: Error.self)
            .eraseToAnyPublisher()
    }

    self.isPolling = true
    let server: String = self.server
    let pollStartTime: TimeInterval = dependencies.dateNow.timeIntervalSince1970
    let hasPerformedInitialPoll: Bool = (dependencies.caches[.openGroupManager].hasPerformedInitialPoll[server] == true)
    let timeSinceLastPoll: TimeInterval = (
        dependencies.caches[.openGroupManager].timeSinceLastPoll[server] ??
        dependencies.caches.mutate(cache: .openGroupManager) { cache in
            cache.getTimeSinceLastOpen(using: dependencies)
        }
    )

    return dependencies.storage
        .readPublisher(using: dependencies) { db -> (Int64, Network.PreparedRequest<Network.BatchResponseMap<OpenGroupAPI.Endpoint>>) in
            // Highest failure count currently stored for this server (0 when it can't be read)
            let failureCount: Int64 = (try? OpenGroup
                .filter(OpenGroup.Columns.server == server)
                .select(max(OpenGroup.Columns.pollFailureCount))
                .asRequest(of: Int64.self)
                .fetchOne(db))
                .defaulting(to: 0)

            return (
                failureCount,
                try OpenGroupAPI
                    .preparedPoll(
                        db,
                        server: server,
                        hasPerformedInitialPoll: hasPerformedInitialPoll,
                        timeSinceLastPoll: timeSinceLastPoll,
                        using: dependencies
                    )
            )
        }
        .flatMap { failureCount, preparedRequest in
            preparedRequest.send(using: dependencies)
                .map { info, response in (failureCount, info, response) }
        }
        .handleEvents(
            receiveOutput: { [weak self] failureCount, info, response in
                guard !calledFromBackgroundPoller || isBackgroundPollerValid() else {
                    // If this was a background poll and the background poll is no longer valid
                    // then just stop
                    self?.isPolling = false
                    self?.hasStarted = false
                    return
                }

                self?.isPolling = false
                self?.handlePollResponse(
                    info: info,
                    response: response,
                    failureCount: failureCount,
                    using: dependencies
                )

                dependencies.caches.mutate(cache: .openGroupManager) { cache in
                    cache.hasPerformedInitialPoll[server] = true
                    cache.timeSinceLastPoll[server] = dependencies.dateNow.timeIntervalSince1970
                    dependencies.standardUserDefaults[.lastOpen] = dependencies.dateNow
                }

                let pollEndTime: TimeInterval = dependencies.dateNow.timeIntervalSince1970
                SNLog(" Open group polling finished for \( server ) in \( . seconds ( pollEndTime - pollStartTime ) , unit : . s ) . ")
            }
        )
        .map { _ in () }
        .catch { [weak self] error -> AnyPublisher<Void, Error> in
            guard
                let strongSelf = self,
                strongSelf.hasStarted,
                (!calledFromBackgroundPoller || isBackgroundPollerValid())
            else {
                // If this was a background poll and the background poll is no longer valid
                // then just stop
                self?.isPolling = false
                self?.hasStarted = false

                return Just(())
                    .setFailureType(to: Error.self)
                    .eraseToAnyPublisher()
            }

            // If we are retrying then the error is being handled so no need to continue (this
            // method will always resolve)
            return strongSelf
                .updateCapabilitiesAndRetryIfNeeded(
                    server: server,
                    calledFromBackgroundPoller: calledFromBackgroundPoller,
                    isBackgroundPollerValid: isBackgroundPollerValid,
                    isPostCapabilitiesRetry: isPostCapabilitiesRetry,
                    error: error,
                    using: dependencies
                )
                .handleEvents(
                    receiveOutput: { [weak self] didHandleError in
                        if !didHandleError && isBackgroundPollerValid() {
                            // Read the current failure count so it can be increased
                            let pollFailureCount: Int64 = dependencies.storage
                                .read(using: dependencies) { db in
                                    try OpenGroup
                                        .filter(OpenGroup.Columns.server == server)
                                        .select(max(OpenGroup.Columns.pollFailureCount))
                                        .asRequest(of: Int64.self)
                                        .fetchOne(db)
                                }
                                .defaulting(to: 0)

                            dependencies.storage.writeAsync(using: dependencies) { db in
                                struct Info: Decodable, FetchableRecord {
                                    let id: String
                                    let shouldBeVisible: Bool
                                }

                                let roomTokens: [String] = try OpenGroup
                                    .filter(
                                        OpenGroup.Columns.server == server &&
                                        OpenGroup.Columns.isActive == true
                                    )
                                    .select(.roomToken)
                                    .asRequest(of: String.self)
                                    .fetchAll(db)
                                let roomsAreVisible: [Info] = try SessionThread
                                    .select(.id, .shouldBeVisible)
                                    .filter(
                                        ids: roomTokens.map {
                                            OpenGroup.idFor(roomToken: $0, server: server)
                                        }
                                    )
                                    .asRequest(of: Info.self)
                                    .fetchAll(db)

                                // Increase the failure count
                                try OpenGroup
                                    .filter(OpenGroup.Columns.server == server)
                                    .updateAll(
                                        db,
                                        OpenGroup.Columns.pollFailureCount
                                            .set(to: (pollFailureCount + 1))
                                    )

                                /// If the polling has failed 10+ times then try to prune any invalid rooms that
                                /// aren't visible (they would have been added via config messages and will
                                /// likely always fail but the user has no way to delete them)
                                guard pollFailureCount > Poller.maxHiddenRoomFailureCount else { return }

                                let prunedIds: [String] = roomsAreVisible
                                    .filter { !$0.shouldBeVisible }
                                    .map { $0.id }

                                prunedIds.forEach { id in
                                    OpenGroupManager.shared.delete(
                                        db,
                                        openGroupId: id,
                                        /// **Note:** We pass `calledFromConfigHandling` as `true`
                                        /// here because we want to avoid syncing this deletion as the room might
                                        /// not be in an invalid state on other devices - one of the other devices
                                        /// will eventually trigger a new config update which will re-add this room
                                        /// and hopefully at that time it'll work again
                                        calledFromConfigHandling: true,
                                        using: dependencies
                                    )
                                }

                                // Add a note to the logs that this happened. This has to live inside
                                // the write closure: the write is dispatched asynchronously, so the
                                // pruned ids are not known yet by the time the calling code continues
                                if !prunedIds.isEmpty {
                                    let rooms: String = prunedIds
                                        .compactMap { $0.components(separatedBy: server).last }
                                        .joined(separator: " , ")

                                    SNLog(" Hidden open group failure count surpassed \( Poller . maxHiddenRoomFailureCount ) , removed hidden rooms \( rooms ) . ")
                                }
                            }

                            let pollEndTime: TimeInterval = dependencies.dateNow.timeIntervalSince1970
                            SNLog(" Open group polling to \( server ) failed in \( . seconds ( pollEndTime - pollStartTime ) , unit : . s ) due to error: \( error ) . Setting failure count to \( pollFailureCount + 1 ) . ")
                        }

                        self?.isPolling = false
                    }
                )
                .map { _ in () }
                .eraseToAnyPublisher()
        }
        .eraseToAnyPublisher()
}
/// Refreshes the server capabilities and re-runs the poll when `error` indicates that the server
/// requires blinded ids.
///
/// - Parameters:
///   - server: Server whose capabilities should be refreshed.
///   - calledFromBackgroundPoller: Forwarded unchanged to the retried `poll`.
///   - isBackgroundPollerValid: Checked before the refreshed capabilities are applied and forwarded
///     to the retried `poll`.
///   - isPostCapabilitiesRetry: When `true` no refresh is attempted (see the note in the body).
///   - error: The error produced by the failed poll.
///   - dependencies: Dependency container used for storage and networking.
/// - Returns: A publisher emitting `false` when the error is not handled here (already a retry, or
///   not the blinded-ids bad request) and `true` once the capabilities refresh path has run,
///   including when that refresh itself failed (the failure is logged).
private func updateCapabilitiesAndRetryIfNeeded(
    server: String,
    calledFromBackgroundPoller: Bool,
    isBackgroundPollerValid: @escaping (() -> Bool) = { true },
    isPostCapabilitiesRetry: Bool,
    error: Error,
    using dependencies: Dependencies
) -> AnyPublisher<Bool, Error> {
    /// We want to custom handle a '400' error code due to not having blinded auth as it likely means
    /// that we joined the OpenGroup before blinding was enabled and need to update its capabilities
    ///
    /// **Note:** To prevent an infinite loop caused by a server-side bug we want to prevent this
    /// capabilities request from happening multiple times in a row
    guard
        !isPostCapabilitiesRetry,
        let error: NetworkError = error as? NetworkError,
        case .badRequest(let dataString, _) = error,
        dataString.contains(" Invalid authentication: this server requires the use of blinded ids ") // stringlint:disable
    else {
        return Just(false)
            .setFailureType(to: Error.self)
            .eraseToAnyPublisher()
    }

    return dependencies.storage
        // Pass the injected dependencies through, matching the `readPublisher` call made by `poll`
        .readPublisher(using: dependencies) { db in
            try OpenGroupAPI.preparedCapabilities(
                db,
                server: server,
                forceBlinded: true,
                using: dependencies
            )
        }
        .flatMap { $0.send(using: dependencies) }
        .flatMap { [weak self] _, responseBody -> AnyPublisher<Void, Error> in
            guard let strongSelf = self, strongSelf.hasStarted, isBackgroundPollerValid() else {
                return Just(())
                    .setFailureType(to: Error.self)
                    .eraseToAnyPublisher()
            }

            // Handle the updated capabilities and re-trigger the poll (the in-flight flag has to be
            // cleared first, otherwise `poll` would return without doing anything)
            strongSelf.isPolling = false

            dependencies.storage.write { db in
                OpenGroupManager.handleCapabilities(
                    db,
                    capabilities: responseBody,
                    on: server
                )
            }

            // Regardless of the outcome we can just resolve this
            // immediately as it'll handle its own response
            return strongSelf.poll(
                calledFromBackgroundPoller: calledFromBackgroundPoller,
                isBackgroundPollerValid: isBackgroundPollerValid,
                isPostCapabilitiesRetry: true,
                using: dependencies
            )
            .map { _ in () }
            .eraseToAnyPublisher()
        }
        .map { _ in true }
        .catch { error -> AnyPublisher<Bool, Error> in
            SNLog(" Open group updating capabilities for \( server ) failed due to error: \( error ) . ")

            return Just(true)
                .setFailureType(to: Error.self)
                .eraseToAnyPublisher()
        }
        .eraseToAnyPublisher()
}
/// Processes the batch response of a poll and persists whatever changed.
///
/// Steps, in order:
/// 1. Filter `response.data` down to sub-responses with a usable body (logging the ones dropped).
/// 2. Return early when nothing usable remains and `failureCount` is `0`.
/// 3. Read the currently stored capabilities and groups for the rooms that returned poll info.
/// 4. Filter again, keeping capability / room-info responses only when they differ from what is stored
///    (message and inbox/outbox responses are always kept).
/// 5. In a single storage write: reset `pollFailureCount` for this server when `failureCount > 0`, then
///    hand each remaining response to the matching `OpenGroupManager` handler.
///
/// - Parameters:
///   - info: Response metadata; not read by this method.
///   - response: Batch response keyed by endpoint.
///   - failureCount: Failure count read before the poll was sent; a non-zero value forces the write so
///     the count can be reset.
///   - dependencies: Dependency container used for storage access and passed on to the handlers.
private func handlePollResponse (
info : ResponseInfoType ,
response : Network . BatchResponseMap < OpenGroupAPI . Endpoint > ,
failureCount : Int64 ,
using dependencies : Dependencies
) {
let server : String = self . server
// Keep only the sub-responses whose body could be decoded (and, for messages, is non-empty)
let validResponses : [ OpenGroupAPI . Endpoint : Any ] = response . data
. filter { endpoint , data in
switch endpoint {
case . capabilities :
guard ( data as ? Network . BatchSubResponse < Capabilities > ) ? . body != nil else {
SNLog ( " Open group polling failed due to invalid capability data. " )
return false
}
return true
case . roomPollInfo ( let roomToken , _ ) :
guard ( data as ? Network . BatchSubResponse < RoomPollInfo > ) ? . body != nil else {
// A 404 gets its own log message naming the room
switch ( data as ? Network . BatchSubResponse < RoomPollInfo > ) ? . code {
case 404 : SNLog ( " Open group polling failed to retrieve info for unknown room ' \( roomToken ) '. " )
default : SNLog ( " Open group polling failed due to invalid room info data. " )
}
return false
}
return true
case . roomMessagesRecent ( let roomToken ) , . roomMessagesBefore ( let roomToken , _ ) , . roomMessagesSince ( let roomToken , _ ) :
guard
let responseData : Network . BatchSubResponse < [ Failable < Message > ] > = data as ? Network . BatchSubResponse < [ Failable < Message > ] > ,
let responseBody : [ Failable < Message > ] = responseData . body
else {
switch ( data as ? Network . BatchSubResponse < [ Failable < Message > ] > ) ? . code {
case 404 : SNLog ( " Open group polling failed to retrieve messages for unknown room ' \( roomToken ) '. " )
default : SNLog ( " Open group polling failed due to invalid messages data. " )
}
return false
}
// Entries without a value are dropped (and counted in the log below)
let successfulMessages : [ Message ] = responseBody . compactMap { $0 . value }
if successfulMessages . count != responseBody . count {
let droppedCount : Int = ( responseBody . count - successfulMessages . count )
SNLog ( " Dropped \( droppedCount ) invalid open group message \( droppedCount = = 1 ? " " : " s " ) . " )
}
return ! successfulMessages . isEmpty
case . inbox , . inboxSince , . outbox , . outboxSince :
guard
let responseData : Network . BatchSubResponse < [ DirectMessage ] ? > = data as ? Network . BatchSubResponse < [ DirectMessage ] ? > ,
! responseData . failedToParseBody
else {
SNLog ( " Open group polling failed due to invalid inbox/outbox data. " )
return false
}
// Double optional because the server can return a `304` with an empty body
let messages : [ OpenGroupAPI . DirectMessage ] = ( ( responseData . body ? ? [ ] ) ? ? [ ] )
return ! messages . isEmpty
default : return false // No custom handling needed
}
}
// If there are no remaining 'validResponses' and there hasn't been a failure then there is
// no need to do anything else
guard ! validResponses . isEmpty || failureCount != 0 else { return }
// Retrieve the current capability & group info to check if anything changed
let rooms : [ String ] = validResponses
. keys
. compactMap { endpoint -> String ? in
switch endpoint {
case . roomPollInfo ( let roomToken , _ ) : return roomToken
default : return nil
}
}
let currentInfo : ( capabilities : Capabilities , groups : [ OpenGroup ] ) ? = dependencies . storage . read { db in
let allCapabilities : [ Capability ] = try Capability
. filter ( Capability . Columns . openGroupServer = = server )
. fetchAll ( db )
// Rebuild a `Capabilities` value from the stored rows, splitting present from missing variants
let capabilities : Capabilities = Capabilities (
capabilities : allCapabilities
. filter { ! $0 . isMissing }
. map { $0 . variant } ,
missing : {
let missingCapabilities : [ Capability . Variant ] = allCapabilities
. filter { $0 . isMissing }
. map { $0 . variant }
return ( missingCapabilities . isEmpty ? nil : missingCapabilities )
} ( )
)
let openGroupIds : [ String ] = rooms
. map { OpenGroup . idFor ( roomToken : $0 , server : server ) }
let groups : [ OpenGroup ] = try OpenGroup
. filter ( ids : openGroupIds )
. fetchAll ( db )
return ( capabilities , groups )
}
// Drop capability / room-info responses which match what is already stored
let changedResponses : [ OpenGroupAPI . Endpoint : Any ] = validResponses
. filter { endpoint , data in
switch endpoint {
case . capabilities :
guard
let responseData : Network . BatchSubResponse < Capabilities > = data as ? Network . BatchSubResponse < Capabilities > ,
let responseBody : Capabilities = responseData . body
else { return false }
return ( responseBody != currentInfo ? . capabilities )
case . roomPollInfo ( let roomToken , _ ) :
guard
let responseData : Network . BatchSubResponse < RoomPollInfo > = data as ? Network . BatchSubResponse < RoomPollInfo > ,
let responseBody : RoomPollInfo = responseData . body
else { return false }
// No stored group for this room token: treat the response as a change
guard let existingOpenGroup : OpenGroup = currentInfo ? . groups . first ( where : { $0 . roomToken = = roomToken } ) else {
return true
}
// Note: This might need to be updated in the future when we start tracking
// user permissions if changes to permissions don't trigger a change to
// the 'infoUpdates'
return (
responseBody . activeUsers != existingOpenGroup . userCount || (
responseBody . details != nil &&
responseBody . details ? . infoUpdates != existingOpenGroup . infoUpdates
)
)
default : return true
}
}
// If there are no 'changedResponses' and there hasn't been a failure then there is
// no need to do anything else
guard ! changedResponses . isEmpty || failureCount != 0 else { return }
dependencies . storage . write { db in
// Reset the failure count
if failureCount > 0 {
try OpenGroup
. filter ( OpenGroup . Columns . server = = server )
. updateAll ( db , OpenGroup . Columns . pollFailureCount . set ( to : 0 ) )
}
// Dispatch every changed response to the handler for its endpoint type
try changedResponses . forEach { endpoint , data in
switch endpoint {
case . capabilities :
guard
let responseData : Network . BatchSubResponse < Capabilities > = data as ? Network . BatchSubResponse < Capabilities > ,
let responseBody : Capabilities = responseData . body
else { return }
OpenGroupManager . handleCapabilities (
db ,
capabilities : responseBody ,
on : server
)
case . roomPollInfo ( let roomToken , _ ) :
guard
let responseData : Network . BatchSubResponse < RoomPollInfo > = data as ? Network . BatchSubResponse < RoomPollInfo > ,
let responseBody : RoomPollInfo = responseData . body
else { return }
try OpenGroupManager . handlePollInfo (
db ,
pollInfo : responseBody ,
publicKey : nil ,
for : roomToken ,
on : server ,
using : dependencies
)
case . roomMessagesRecent ( let roomToken ) , . roomMessagesBefore ( let roomToken , _ ) , . roomMessagesSince ( let roomToken , _ ) :
guard
let responseData : Network . BatchSubResponse < [ Failable < Message > ] > = data as ? Network . BatchSubResponse < [ Failable < Message > ] > ,
let responseBody : [ Failable < Message > ] = responseData . body
else { return }
OpenGroupManager . handleMessages (
db ,
messages : responseBody . compactMap { $0 . value } ,
for : roomToken ,
on : server ,
using : dependencies
)
case . inbox , . inboxSince , . outbox , . outboxSince :
guard
let responseData : Network . BatchSubResponse < [ DirectMessage ] ? > = data as ? Network . BatchSubResponse < [ DirectMessage ] ? > ,
! responseData . failedToParseBody
else { return }
// Double optional because the server can return a `304` with an empty body
let messages : [ OpenGroupAPI . DirectMessage ] = ( ( responseData . body ? ? [ ] ) ? ? [ ] )
// Outbox endpoints are flagged so the handler can tell the two directions apart
let fromOutbox : Bool = {
switch endpoint {
case . outbox , . outboxSince : return true
default : return false
}
} ( )
OpenGroupManager . handleDirectMessages (
db ,
messages : messages ,
fromOutbox : fromOutbox ,
on : server ,
using : dependencies
)
default : break // No custom handling needed
}
}
}
}
// MARK: - Convenience
/// Computes the delay before the next poll using an exponential back-off.
///
/// - Parameters:
///   - failureCount: Exponent applied to the base of 2.
///   - minInterval: Value added to the exponential term.
///   - maxInterval: Cap applied to the result.
/// - Returns: `minInterval + 2^failureCount`, limited to at most `maxInterval`.
fileprivate static func getInterval(for failureCount: TimeInterval, minInterval: TimeInterval, maxInterval: TimeInterval) -> TimeInterval {
    // Arbitrary backoff factor...
    let backoffInterval: TimeInterval = minInterval + pow(2, failureCount)

    return min(maxInterval, backoffInterval)
}
}
}