@ -150,7 +150,7 @@ public class SwarmPoller: SwarmPollerType & PollerType {
. compactMap { $0 . value . data ? . messages . map { $0 . info . hash } }
. reduce ( [ ] , + )
var messageCount : Int = 0
var p rocessedMessages: [ ProcessedMessage ] = [ ]
var finalP rocessedMessages: [ ProcessedMessage ] = [ ]
var hadValidHashUpdate : Bool = false
return dependencies [ singleton : . storage ] . writePublisher { db -> ( configMessageJobs : [ Job ] , standardMessageJobs : [ Job ] , pollResult : PollResult ) in
@ -258,6 +258,9 @@ public class SwarmPoller: SwarmPollerType & PollerType {
}
}
// / M a k e s u r e t o a d d a n y s y n c h r o n o u s l y p r o c e s s e d m e s s a g e s t o t h e ` f i n a l P r o c e s s e d M e s s a g e s `
// / a s o t h e r w i s e t h e y w o u l d n ' t b e e m i t t e d b y t h e ` r e c e i v e d P o l l R e s p o n s e S u b j e c t `
finalProcessedMessages += processedMessages
return nil
}
. flatMap { $0 }
@ -266,8 +269,8 @@ public class SwarmPoller: SwarmPollerType & PollerType {
// t o c r e a t e m e s s a g e r e c e i v e j o b s o r m e s s w i t h c a c h e d h a s h e s )
guard shouldStoreMessages else {
messageCount += allProcessedMessages . count
p rocessedMessages += allProcessedMessages
return ( [ ] , [ ] , ( p rocessedMessages, rawMessageCount , messageCount , hadValidHashUpdate ) )
finalP rocessedMessages += allProcessedMessages
return ( [ ] , [ ] , ( finalP rocessedMessages, rawMessageCount , messageCount , hadValidHashUpdate ) )
}
// A d d a j o b t o p r o c e s s t h e c o n f i g m e s s a g e s f i r s t
@ -276,7 +279,7 @@ public class SwarmPoller: SwarmPollerType & PollerType {
. grouped { $0 . threadId }
. compactMap { threadId , threadMessages in
messageCount += threadMessages . count
p rocessedMessages += threadMessages
finalP rocessedMessages += threadMessages
let job : Job ? = Job (
variant : . configMessageReceive ,
@ -306,7 +309,7 @@ public class SwarmPoller: SwarmPollerType & PollerType {
. grouped { $0 . threadId }
. compactMap { threadId , threadMessages in
messageCount += threadMessages . count
p rocessedMessages += threadMessages
finalP rocessedMessages += threadMessages
let job : Job ? = Job (
variant : . messageReceive ,
@ -360,7 +363,7 @@ public class SwarmPoller: SwarmPollerType & PollerType {
otherKnownValidHashes : otherKnownHashes
)
return ( configMessageJobs , standardMessageJobs , ( p rocessedMessages, rawMessageCount , messageCount , hadValidHashUpdate ) )
return ( configMessageJobs , standardMessageJobs , ( finalP rocessedMessages, rawMessageCount , messageCount , hadValidHashUpdate ) )
}
}
. flatMap { [ dependencies ] ( configMessageJobs : [ Job ] , standardMessageJobs : [ Job ] , pollResult : PollResult ) -> AnyPublisher < PollResult , Error > in