diff --git a/Session/Conversations/ConversationVC+Interaction.swift b/Session/Conversations/ConversationVC+Interaction.swift index bec36c777..434548038 100644 --- a/Session/Conversations/ConversationVC+Interaction.swift +++ b/Session/Conversations/ConversationVC+Interaction.swift @@ -2568,7 +2568,7 @@ extension ConversationVC { else { return Just(()).eraseToAnyPublisher() } return viewModel.dependencies[singleton: .storage] - .writePublisher { [dependencies = viewModel.dependencies] db -> AnyPublisher in + .writePublisher { [dependencies = viewModel.dependencies] db in /// Remove any existing `infoGroupInfoInvited` interactions from the group (don't want to have a duplicate one from /// inside the group history) _ = try Interaction @@ -2587,52 +2587,39 @@ extension ConversationVC { isHidden: false ).upsert(db) - /// Actually trigger the approval - return try ClosedGroup - .approveGroup( + /// If we aren't creating a new thread (ie. sending a message request) and the user is not an admin + /// then schedule sending a `GroupUpdateInviteResponseMessage` to the group (this allows + /// other members to know that the user has joined the group) + if !isNewThread && group.groupIdentityPrivateKey == nil { + try MessageSender.send( db, - group: group, - calledFromConfig: nil, + message: GroupUpdateInviteResponseMessage( + isApproved: true, + sentTimestampMs: UInt64(timestampMs) + ), + interactionId: nil, + threadId: threadId, + threadVariant: threadVariant, using: dependencies ) - .map { _ in () } - .eraseToAnyPublisher() + } + + /// Actually trigger the approval + try ClosedGroup.approveGroup( + db, + group: group, + calledFromConfig: nil, + using: dependencies + ) } + .map { _ in () } + .catch { _ in Just(()).eraseToAnyPublisher() } .handleEvents( receiveOutput: { _ in // Update the UI updateNavigationBackStack() } ) - .flatMap { [dependencies = viewModel.dependencies] pollPublisher in - pollPublisher - .first() - .handleEvents( - receiveOutput: { _ in - /// If we aren't creating a new thread (ie. sending a message request) and the user is not an admin - /// then send a `GroupUpdateInviteResponseMessage` to the group (this allows other members - /// to know that the user has joined the group) - guard !isNewThread && group.groupIdentityPrivateKey == nil else { return } - - dependencies[singleton: .storage].write { db in - /// The user is not an admin so send a invite response - try MessageSender.send( - db, - message: GroupUpdateInviteResponseMessage( - isApproved: true, - sentTimestampMs: UInt64(timestampMs) - ), - interactionId: nil, - threadId: threadId, - threadVariant: threadVariant, - using: dependencies - ) - } - } - ) - .eraseToAnyPublisher() - } - .catch { _ in Just(()).eraseToAnyPublisher() } .eraseToAnyPublisher() default: return Just(()).eraseToAnyPublisher() diff --git a/SessionMessagingKit/Database/Models/ClosedGroup.swift b/SessionMessagingKit/Database/Models/ClosedGroup.swift index 04ceb93b3..f325dbde1 100644 --- a/SessionMessagingKit/Database/Models/ClosedGroup.swift +++ b/SessionMessagingKit/Database/Models/ClosedGroup.swift @@ -183,12 +183,12 @@ public extension ClosedGroup { } /// Approves the group and returns the `Poller.receivedPollResult` publisher for the group - @discardableResult static func approveGroup( + static func approveGroup( _ db: Database, group: ClosedGroup, calledFromConfig configTriggeringChange: ConfigDump.Variant?, using dependencies: Dependencies - ) throws -> AnyPublisher { + ) throws { guard let userED25519KeyPair: KeyPair = Identity.fetchUserEd25519KeyPair(db) else { throw MessageReceiverError.noUserED25519KeyPair } @@ -229,8 +229,7 @@ public extension ClosedGroup { } /// Start the poller - let poller: SwarmPollerType = dependencies.mutate(cache: .groupPollers) { $0.getOrCreatePoller(for: group.id) } - poller.startIfNeeded() + dependencies.mutate(cache: .groupPollers) { $0.getOrCreatePoller(for: group.id).startIfNeeded() } /// Subscribe for group push notifications if let token: String = dependencies[defaults: .standard, key: .deviceToken] { @@ -245,9 +244,6 @@ public extension ClosedGroup { .subscribe(on: DispatchQueue.global(qos: .userInitiated), using: dependencies) .sinkUntilComplete() } - - /// Return a publisher for the pollers poll results - return poller.receivedPollResponse } static func removeData( diff --git a/SessionMessagingKit/Jobs/MessageSendJob.swift b/SessionMessagingKit/Jobs/MessageSendJob.swift index 25ea6793b..fcb139b8f 100644 --- a/SessionMessagingKit/Jobs/MessageSendJob.swift +++ b/SessionMessagingKit/Jobs/MessageSendJob.swift @@ -64,7 +64,7 @@ public enum MessageSendJob: JobExecutor { // If the original interaction no longer exists then don't bother sending the message (ie. the // message was deleted before it even got sent) guard try Interaction.exists(db, id: interactionId) else { - Log.warn(.cat, "Failing (\(job.id ?? -1)) due to missing interaction") + Log.warn(.cat, "Failing \(messageType) (\(job.id ?? -1)) due to missing interaction") return (StorageError.objectNotFound, [], []) } @@ -80,7 +80,7 @@ public enum MessageSendJob: JobExecutor { // If there were failed attachments then this job should fail (can't send a // message which has associated attachments if the attachments fail to upload) guard !allAttachmentStateInfo.contains(where: { $0.state == .failedDownload }) else { - Log.info(.cat, "Failing (\(job.id ?? -1)) due to failed attachment upload") + Log.info(.cat, "Failing \(messageType) (\(job.id ?? -1)) due to failed attachment upload") return (AttachmentError.notUploaded, [], fileIds) } @@ -114,7 +114,7 @@ public enum MessageSendJob: JobExecutor { /// If we got an error when trying to retrieve the attachment state then this job is actually invalid so it /// should permanently fail guard attachmentState.error == nil else { - Log.error(.cat, "Failed due to invalid attachment state") + Log.error(.cat, "Failed \(messageType) (\(job.id ?? -1)) due to invalid attachment state") return failure(job, (attachmentState.error ?? MessageSenderError.invalidMessage), true) } @@ -160,7 +160,7 @@ public enum MessageSendJob: JobExecutor { } } - Log.info(.cat, "Deferring (\(job.id ?? -1)) due to pending attachment uploads") + Log.info(.cat, "Deferring \(messageType) (\(job.id ?? -1)) due to pending attachment uploads") return deferred(job) } @@ -168,6 +168,44 @@ public enum MessageSendJob: JobExecutor { messageFileIds = attachmentState.preparedFileIds } + /// If this message is being sent to an updated group then we should first make sure that we have a encryption keys + /// for the group before we try to send the message, if not then defer the job 1 second to give the poller the chance to + /// receive the keys + /// + /// **Note:** If we have already deferred this message once then we should only continue to defer if we have a config + /// for the message (this way we won't get stuck deferring permanently if config state isn't loaded and we will instead try, + /// and fail, to send the message) + var previousDeferralsMessage: String = "" + + switch details.destination { + case .closedGroup(let groupPublicKey) where groupPublicKey.starts(with: SessionId.Prefix.group.rawValue): + let deferalDuration: TimeInterval = 1 + let groupSessionId: SessionId = SessionId(.group, hex: groupPublicKey) + let numGroupKeys: Int = (try? LibSession.numKeys(groupSessionId: groupSessionId, using: dependencies)) + .defaulting(to: 0) + let deferCount: Int = dependencies[singleton: .jobRunner].deferCount(for: job.id, of: job.variant) + previousDeferralsMessage = " and \(.seconds(Double(deferCount) * deferalDuration), unit: .s) of deferrals" // stringlint:ignore + + guard + numGroupKeys > 0 && ( + deferCount == 0 || + dependencies[cache: .libSession].hasConfig(for: .groupKeys, sessionId: groupSessionId) + ) + else { + // Defer the job by 1s to give it a little more time to receive updated keys + let updatedJob: Job? = dependencies[singleton: .storage].write { db in + try job + .with(nextRunTimestamp: dependencies.dateNow.timeIntervalSince1970 + deferalDuration) + .upserted(db) + } + + Log.info(.cat, "Deferring \(messageType) (\(job.id ?? -1)) as we haven't received the group encryption keys yet") + return deferred(updatedJob ?? job) + } + + default: break + } + // Store the sentTimestamp from the message in case it fails due to a clockOutOfSync error let originalSentTimestampMs: UInt64? = details.message.sentTimestampMs let startTime: TimeInterval = dependencies.dateNow.timeIntervalSince1970 @@ -196,11 +234,11 @@ public enum MessageSendJob: JobExecutor { receiveCompletion: { result in switch result { case .finished: - Log.info(.cat, "Completed sending \(messageType) (\(job.id ?? -1)) after \(.seconds(dependencies.dateNow.timeIntervalSince1970 - startTime), unit: .s).") + Log.info(.cat, "Completed sending \(messageType) (\(job.id ?? -1)) after \(.seconds(dependencies.dateNow.timeIntervalSince1970 - startTime), unit: .s)\(previousDeferralsMessage).") success(job, false) case .failure(let error): - Log.info(.cat, "Failed to send \(messageType) (\(job.id ?? -1)) after \(.seconds(dependencies.dateNow.timeIntervalSince1970 - startTime), unit: .s) due to error: \(error).") + Log.info(.cat, "Failed to send \(messageType) (\(job.id ?? -1)) after \(.seconds(dependencies.dateNow.timeIntervalSince1970 - startTime), unit: .s)\(previousDeferralsMessage) due to error: \(error).") // Actual error handling switch (error, details.message) { diff --git a/SessionMessagingKit/LibSession/Config Handling/LibSession+GroupKeys.swift b/SessionMessagingKit/LibSession/Config Handling/LibSession+GroupKeys.swift index 981db6eb0..752393e7d 100644 --- a/SessionMessagingKit/LibSession/Config Handling/LibSession+GroupKeys.swift +++ b/SessionMessagingKit/LibSession/Config Handling/LibSession+GroupKeys.swift @@ -137,6 +137,19 @@ internal extension LibSession { } } + static func numKeys( + groupSessionId: SessionId, + using dependencies: Dependencies + ) throws -> Int { + return try dependencies.mutate(cache: .libSession) { cache in + guard case .groupKeys(let conf, _, _) = cache.config(for: .groupKeys, sessionId: groupSessionId) else { + throw LibSessionError.invalidConfigObject + } + + return Int(groups_keys_size(conf)) + } + } + static func currentGeneration( groupSessionId: SessionId, using dependencies: Dependencies diff --git a/SessionMessagingKit/LibSession/LibSession+SessionMessagingKit.swift b/SessionMessagingKit/LibSession/LibSession+SessionMessagingKit.swift index 1c76de9fb..180d514e2 100644 --- a/SessionMessagingKit/LibSession/LibSession+SessionMessagingKit.swift +++ b/SessionMessagingKit/LibSession/LibSession+SessionMessagingKit.swift @@ -364,6 +364,10 @@ public extension LibSession { } } + public func hasConfig(for variant: ConfigDump.Variant, sessionId: SessionId) -> Bool { + return (configStore[sessionId, variant] != nil) + } + public func config(for variant: ConfigDump.Variant, sessionId: SessionId) -> Config? { return configStore[sessionId, variant] } @@ -697,6 +701,8 @@ public extension LibSession { public protocol LibSessionImmutableCacheType: ImmutableCacheType { var userSessionId: SessionId { get } var isEmpty: Bool { get } + + func hasConfig(for variant: ConfigDump.Variant, sessionId: SessionId) -> Bool } /// The majority `libSession` functions can only be accessed via the mutable cache because `libSession` isn't thread safe so if we try @@ -716,6 +722,7 @@ public protocol LibSessionCacheType: LibSessionImmutableCacheType, MutableCacheT userSessionId: SessionId, userEd25519KeyPair: KeyPair ) + func hasConfig(for variant: ConfigDump.Variant, sessionId: SessionId) -> Bool func config(for variant: ConfigDump.Variant, sessionId: SessionId) -> LibSession.Config? func setConfig(for variant: ConfigDump.Variant, sessionId: SessionId, to config: LibSession.Config) func removeConfigs(for sessionId: SessionId) @@ -783,6 +790,7 @@ private final class NoopLibSessionCache: LibSessionCacheType { userSessionId: SessionId, userEd25519KeyPair: KeyPair ) {} + func hasConfig(for variant: ConfigDump.Variant, sessionId: SessionId) -> Bool { return false } func config(for variant: ConfigDump.Variant, sessionId: SessionId) -> LibSession.Config? { return nil } func setConfig(for variant: ConfigDump.Variant, sessionId: SessionId, to config: LibSession.Config) {} func removeConfigs(for sessionId: SessionId) {} diff --git a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+Groups.swift b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+Groups.swift index 35ac29676..5f46c5cbb 100644 --- a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+Groups.swift +++ b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+Groups.swift @@ -153,7 +153,7 @@ extension MessageReceiver { } /// This returns the `resultPublisher` for the group poller so can be ignored if we don't need to wait for the first poll to succeed - @discardableResult internal static func handleNewGroup( + internal static func handleNewGroup( _ db: Database, groupSessionId: String, groupIdentityPrivateKey: Data?, @@ -164,7 +164,7 @@ extension MessageReceiver { hasAlreadyBeenKicked: Bool, calledFromConfig configTriggeringChange: ConfigDump.Variant?, using dependencies: Dependencies - ) throws -> AnyPublisher { + ) throws { // Create the group try SessionThread.fetchOrCreate( db, @@ -202,9 +202,9 @@ extension MessageReceiver { } /// If the group wasn't already approved, is not in the invite state and the user hasn't been kicked from it then handle the approval process - guard !groupAlreadyApproved && !invited && !hasAlreadyBeenKicked else { return Just([]).eraseToAnyPublisher() } + guard !groupAlreadyApproved && !invited && !hasAlreadyBeenKicked else { return } - return try ClosedGroup.approveGroup( + try ClosedGroup.approveGroup( db, group: closedGroup, calledFromConfig: configTriggeringChange, @@ -421,29 +421,35 @@ extension MessageReceiver { let messageInfo: ClosedGroup.MessageInfo = { switch message.changeType { case .added: - return ClosedGroup.MessageInfo - .addedUsers( - hasCurrentUser: messageContainsCurrentUser, - names: names, - historyShared: message.historyShared - ) + return ClosedGroup.MessageInfo.addedUsers( + hasCurrentUser: messageContainsCurrentUser, + names: names, + historyShared: message.historyShared + ) case .removed: - return ClosedGroup.MessageInfo - .removedUsers( - hasCurrentUser: messageContainsCurrentUser, - names: names - ) + return ClosedGroup.MessageInfo.removedUsers( + hasCurrentUser: messageContainsCurrentUser, + names: names + ) case .promoted: - return ClosedGroup.MessageInfo - .promotedUsers( - hasCurrentUser: messageContainsCurrentUser, - names: names - ) + return ClosedGroup.MessageInfo.promotedUsers( + hasCurrentUser: messageContainsCurrentUser, + names: names + ) } }() + /// If the message is about adding the current user then we should remove any existing `infoGroupInfoInvited` interactions + /// from the group (don't want to have two different messages indicating the current user was added to the group) + if messageContainsCurrentUser && message.changeType == .added { + _ = try Interaction + .filter(Interaction.Columns.threadId == groupSessionId.hexString) + .filter(Interaction.Columns.variant == Interaction.Variant.infoGroupInfoInvited) + .deleteAll(db) + } + switch messageInfo.infoString(using: dependencies) { case .none: Log.warn(.messageReceiver, "Failed to encode member change info string.") case .some(let messageBody): @@ -834,7 +840,7 @@ extension MessageReceiver { groupSessionId: groupSessionId, using: dependencies ) - let initialPollPublisher: AnyPublisher = try MessageReceiver.handleNewGroup( + try MessageReceiver.handleNewGroup( db, groupSessionId: groupSessionId.hexString, groupIdentityPrivateKey: groupIdentityPrivateKey, @@ -862,20 +868,22 @@ extension MessageReceiver { switch serverHash { case .none: break case .some(let serverHash): - try? SnodeAPI - .preparedDeleteMessages( - serverHashes: [serverHash], - requireSuccessfulDeletion: false, - authMethod: try Authentication.with( - db, - swarmPublicKey: userSessionId.hexString, + db.afterNextTransaction { db in + try? SnodeAPI + .preparedDeleteMessages( + serverHashes: [serverHash], + requireSuccessfulDeletion: false, + authMethod: try Authentication.with( + db, + swarmPublicKey: userSessionId.hexString, + using: dependencies + ), using: dependencies - ), - using: dependencies - ) - .send(using: dependencies) - .subscribe(on: DispatchQueue.global(qos: .background), using: dependencies) - .sinkUntilComplete() + ) + .send(using: dependencies) + .subscribe(on: DispatchQueue.global(qos: .background), using: dependencies) + .sinkUntilComplete() + } } /// If the thread didn't already exist, or the user had previously been kicked but has since been re-added to the group, then insert @@ -936,28 +944,17 @@ extension MessageReceiver { /// If we aren't creating a new thread (ie. sending a message request) then send a /// `GroupUpdateInviteResponseMessage` to the group (this allows other members /// to know that the user has joined the group) - db.afterNextTransactionNested(using: dependencies) { _ in - initialPollPublisher - .first() - .subscribe(on: DispatchQueue.global(qos: .background), using: dependencies) - .sinkUntilComplete( - receiveCompletion: { _ in - dependencies[singleton: .storage].write { db in - try MessageSender.send( - db, - message: GroupUpdateInviteResponseMessage( - isApproved: true, - sentTimestampMs: dependencies[cache: .snodeAPI].currentOffsetTimestampMs() - ), - interactionId: nil, - threadId: groupSessionId.hexString, - threadVariant: .group, - using: dependencies - ) - } - } - ) - } + try MessageSender.send( + db, + message: GroupUpdateInviteResponseMessage( + isApproved: true, + sentTimestampMs: dependencies[cache: .snodeAPI].currentOffsetTimestampMs() + ), + interactionId: nil, + threadId: groupSessionId.hexString, + threadVariant: .group, + using: dependencies + ) /// If the sender wasn't approved this is a message request so we should notify the user about the invite case (false, _): diff --git a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageSender+Groups.swift b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageSender+Groups.swift index d853ad016..23adb694d 100644 --- a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageSender+Groups.swift +++ b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageSender+Groups.swift @@ -136,7 +136,7 @@ extension MessageSender { .eraseToAnyPublisher() } .handleEvents( - receiveOutput: { groupSessionId, _, thread, _, members, preparedNotificationSubscription in + receiveOutput: { groupSessionId, _, thread, group, members, preparedNotificationSubscription in // Start polling dependencies .mutate(cache: .groupPollers) { $0.getOrCreatePoller(for: thread.id) } @@ -148,10 +148,10 @@ extension MessageSender { .subscribe(on: DispatchQueue.global(qos: .userInitiated), using: dependencies) .sinkUntilComplete() - // Save jobs for sending group member invitations dependencies[singleton: .storage].write { db in let userSessionId: SessionId = dependencies[cache: .general].sessionId + // Save jobs for sending group member invitations members .filter { $0.profileId != userSessionId.hexString } .compactMap { member -> (GroupMember, GroupInviteMemberJob.Details)? in @@ -185,6 +185,30 @@ extension MessageSender { canStartJob: true ) } + + // Schedule the "members added" control message to be sent to the group + if let privateKey: Data = group.groupIdentityPrivateKey { + try? MessageSender.send( + db, + message: GroupUpdateMemberChangeMessage( + changeType: .added, + memberSessionIds: members + .filter { $0.profileId != userSessionId.hexString } + .map { $0.profileId }, + historyShared: false, + sentTimestampMs: dependencies[cache: .snodeAPI].currentOffsetTimestampMs(), + authMethod: Authentication.groupAdmin( + groupSessionId: groupSessionId, + ed25519SecretKey: Array(privateKey) + ), + using: dependencies + ), + interactionId: nil, + threadId: thread.id, + threadVariant: .group, + using: dependencies + ) + } } } ) @@ -778,9 +802,11 @@ extension MessageSender { let changeTimestampMs: Int64 = dependencies[cache: .snodeAPI].currentOffsetTimestampMs() dependencies[singleton: .storage].writeAsync { db in + var membersReceivingPromotions: [(id: String, profile: Profile?)] = [] + // Update the libSession status for each member and schedule a job to send // the promotion message - try members.forEach { memberId, _ in + try members.forEach { memberId, profile in try LibSession.updateMemberStatus( db, groupSessionId: groupSessionId, @@ -799,6 +825,8 @@ extension MessageSender { switch (existingMember?.role, existingMember?.roleStatus) { case (.standard, _): + membersReceivingPromoations.append((memberId, profile)) + try GroupMember .filter(GroupMember.Columns.groupId == groupSessionId.hexString) .filter(GroupMember.Columns.profileId == memberId) @@ -838,8 +866,13 @@ extension MessageSender { } /// Send the admin changed message if desired - if sendAdminChangedMessage { + /// + /// **Note:** It's possible that this call could contain both members being promoted as well as admins + /// that are getting promotions re-sent to them - we only want to send an admin changed message if there + /// is a newly promoted member + if sendAdminChangedMessage && !membersReceivingPromotions.isEmpty { let userSessionId: SessionId = dependencies[cache: .general].sessionId + _ = try Interaction( threadId: groupSessionId.hexString, threadVariant: .group, @@ -847,8 +880,8 @@ extension MessageSender { variant: .infoGroupMembersUpdated, body: ClosedGroup.MessageInfo .promotedUsers( - hasCurrentUser: members.map { $0.id }.contains(userSessionId.hexString), - names: members + hasCurrentUser: membersReceivingPromotions.map { $0.id }.contains(userSessionId.hexString), + names: membersReceivingPromotions .sorted { lhs, rhs in lhs.id == userSessionId.hexString } .map { id, profile in profile?.displayName(for: .group) ?? @@ -865,7 +898,7 @@ extension MessageSender { db, message: GroupUpdateMemberChangeMessage( changeType: .promoted, - memberSessionIds: members.map { $0.id }, + memberSessionIds: membersReceivingPromotions.map { $0.id }, historyShared: false, sentTimestampMs: UInt64(changeTimestampMs), authMethod: try Authentication.with( diff --git a/SessionUtilitiesKit/JobRunner/JobRunner.swift b/SessionUtilitiesKit/JobRunner/JobRunner.swift index 2eefbfb39..e2cc3eecc 100644 --- a/SessionUtilitiesKit/JobRunner/JobRunner.swift +++ b/SessionUtilitiesKit/JobRunner/JobRunner.swift @@ -33,6 +33,7 @@ public protocol JobRunnerType { // MARK: - State Management func jobInfoFor(jobs: [Job]?, state: JobRunner.JobState, variant: Job.Variant?) -> [Int64: JobRunner.JobInfo] + func deferCount(for jobId: Int64?, of variant: Job.Variant) -> Int func appDidFinishLaunching() func appDidBecomeActive() @@ -491,6 +492,12 @@ public final class JobRunner: JobRunnerType { } } + public func deferCount(for jobId: Int64?, of variant: Job.Variant) -> Int { + guard let jobId: Int64 = jobId else { return 0 } + + return (queues.wrappedValue[variant]?.deferLoopTracker.wrappedValue[jobId]?.count ?? 0) + } + public func appDidFinishLaunching() { // Flag that the JobRunner can start it's queues appReadyToStartQueues.mutate { $0 = true } @@ -1050,7 +1057,7 @@ public final class JobQueue: Hashable { fileprivate var jobCallbacks: Atomic<[Int64: [(JobRunner.JobResult) -> ()]]> = Atomic([:]) fileprivate var currentlyRunningJobIds: Atomic> = Atomic([]) private var currentlyRunningJobInfo: Atomic<[Int64: JobRunner.JobInfo]> = Atomic([:]) - private var deferLoopTracker: Atomic<[Int64: (count: Int, times: [TimeInterval])]> = Atomic([:]) + fileprivate var deferLoopTracker: Atomic<[Int64: (count: Int, times: [TimeInterval])]> = Atomic([:]) private let maxDeferralsPerSecond: Int fileprivate var hasPendingJobs: Bool { !pendingJobsQueue.wrappedValue.isEmpty }