Fixed a couple of bugs revolving around handling group invitations

pull/894/head
Morgan Pretty 6 months ago
parent 4ab99f8276
commit 7b70ba03fc

@ -2568,7 +2568,7 @@ extension ConversationVC {
else { return Just(()).eraseToAnyPublisher() } else { return Just(()).eraseToAnyPublisher() }
return viewModel.dependencies[singleton: .storage] return viewModel.dependencies[singleton: .storage]
.writePublisher { [dependencies = viewModel.dependencies] db -> AnyPublisher<Void, Never> in .writePublisher { [dependencies = viewModel.dependencies] db in
/// Remove any existing `infoGroupInfoInvited` interactions from the group (don't want to have a duplicate one from /// Remove any existing `infoGroupInfoInvited` interactions from the group (don't want to have a duplicate one from
/// inside the group history) /// inside the group history)
_ = try Interaction _ = try Interaction
@ -2587,52 +2587,39 @@ extension ConversationVC {
isHidden: false isHidden: false
).upsert(db) ).upsert(db)
/// Actually trigger the approval /// If we aren't creating a new thread (ie. sending a message request) and the user is not an admin
return try ClosedGroup /// then schedule sending a `GroupUpdateInviteResponseMessage` to the group (this allows
.approveGroup( /// other members to know that the user has joined the group)
if !isNewThread && group.groupIdentityPrivateKey == nil {
try MessageSender.send(
db, db,
group: group, message: GroupUpdateInviteResponseMessage(
calledFromConfig: nil, isApproved: true,
sentTimestampMs: UInt64(timestampMs)
),
interactionId: nil,
threadId: threadId,
threadVariant: threadVariant,
using: dependencies using: dependencies
) )
.map { _ in () } }
.eraseToAnyPublisher()
/// Actually trigger the approval
try ClosedGroup.approveGroup(
db,
group: group,
calledFromConfig: nil,
using: dependencies
)
} }
.map { _ in () }
.catch { _ in Just(()).eraseToAnyPublisher() }
.handleEvents( .handleEvents(
receiveOutput: { _ in receiveOutput: { _ in
// Update the UI // Update the UI
updateNavigationBackStack() updateNavigationBackStack()
} }
) )
.flatMap { [dependencies = viewModel.dependencies] pollPublisher in
pollPublisher
.first()
.handleEvents(
receiveOutput: { _ in
/// If we aren't creating a new thread (ie. sending a message request) and the user is not an admin
/// then send a `GroupUpdateInviteResponseMessage` to the group (this allows other members
/// to know that the user has joined the group)
guard !isNewThread && group.groupIdentityPrivateKey == nil else { return }
dependencies[singleton: .storage].write { db in
/// The user is not an admin so send a invite response
try MessageSender.send(
db,
message: GroupUpdateInviteResponseMessage(
isApproved: true,
sentTimestampMs: UInt64(timestampMs)
),
interactionId: nil,
threadId: threadId,
threadVariant: threadVariant,
using: dependencies
)
}
}
)
.eraseToAnyPublisher()
}
.catch { _ in Just(()).eraseToAnyPublisher() }
.eraseToAnyPublisher() .eraseToAnyPublisher()
default: return Just(()).eraseToAnyPublisher() default: return Just(()).eraseToAnyPublisher()

@ -183,12 +183,12 @@ public extension ClosedGroup {
} }
/// Approves the group and returns the `Poller.receivedPollResult` publisher for the group /// Approves the group and returns the `Poller.receivedPollResult` publisher for the group
@discardableResult static func approveGroup( static func approveGroup(
_ db: Database, _ db: Database,
group: ClosedGroup, group: ClosedGroup,
calledFromConfig configTriggeringChange: ConfigDump.Variant?, calledFromConfig configTriggeringChange: ConfigDump.Variant?,
using dependencies: Dependencies using dependencies: Dependencies
) throws -> AnyPublisher<GroupPoller.PollResponse, Never> { ) throws {
guard let userED25519KeyPair: KeyPair = Identity.fetchUserEd25519KeyPair(db) else { guard let userED25519KeyPair: KeyPair = Identity.fetchUserEd25519KeyPair(db) else {
throw MessageReceiverError.noUserED25519KeyPair throw MessageReceiverError.noUserED25519KeyPair
} }
@ -229,8 +229,7 @@ public extension ClosedGroup {
} }
/// Start the poller /// Start the poller
let poller: SwarmPollerType = dependencies.mutate(cache: .groupPollers) { $0.getOrCreatePoller(for: group.id) } dependencies.mutate(cache: .groupPollers) { $0.getOrCreatePoller(for: group.id).startIfNeeded() }
poller.startIfNeeded()
/// Subscribe for group push notifications /// Subscribe for group push notifications
if let token: String = dependencies[defaults: .standard, key: .deviceToken] { if let token: String = dependencies[defaults: .standard, key: .deviceToken] {
@ -245,9 +244,6 @@ public extension ClosedGroup {
.subscribe(on: DispatchQueue.global(qos: .userInitiated), using: dependencies) .subscribe(on: DispatchQueue.global(qos: .userInitiated), using: dependencies)
.sinkUntilComplete() .sinkUntilComplete()
} }
/// Return a publisher for the pollers poll results
return poller.receivedPollResponse
} }
static func removeData( static func removeData(

@ -64,7 +64,7 @@ public enum MessageSendJob: JobExecutor {
// If the original interaction no longer exists then don't bother sending the message (ie. the // If the original interaction no longer exists then don't bother sending the message (ie. the
// message was deleted before it even got sent) // message was deleted before it even got sent)
guard try Interaction.exists(db, id: interactionId) else { guard try Interaction.exists(db, id: interactionId) else {
Log.warn(.cat, "Failing (\(job.id ?? -1)) due to missing interaction") Log.warn(.cat, "Failing \(messageType) (\(job.id ?? -1)) due to missing interaction")
return (StorageError.objectNotFound, [], []) return (StorageError.objectNotFound, [], [])
} }
@ -80,7 +80,7 @@ public enum MessageSendJob: JobExecutor {
// If there were failed attachments then this job should fail (can't send a // If there were failed attachments then this job should fail (can't send a
// message which has associated attachments if the attachments fail to upload) // message which has associated attachments if the attachments fail to upload)
guard !allAttachmentStateInfo.contains(where: { $0.state == .failedDownload }) else { guard !allAttachmentStateInfo.contains(where: { $0.state == .failedDownload }) else {
Log.info(.cat, "Failing (\(job.id ?? -1)) due to failed attachment upload") Log.info(.cat, "Failing \(messageType) (\(job.id ?? -1)) due to failed attachment upload")
return (AttachmentError.notUploaded, [], fileIds) return (AttachmentError.notUploaded, [], fileIds)
} }
@ -114,7 +114,7 @@ public enum MessageSendJob: JobExecutor {
/// If we got an error when trying to retrieve the attachment state then this job is actually invalid so it /// If we got an error when trying to retrieve the attachment state then this job is actually invalid so it
/// should permanently fail /// should permanently fail
guard attachmentState.error == nil else { guard attachmentState.error == nil else {
Log.error(.cat, "Failed due to invalid attachment state") Log.error(.cat, "Failed \(messageType) (\(job.id ?? -1)) due to invalid attachment state")
return failure(job, (attachmentState.error ?? MessageSenderError.invalidMessage), true) return failure(job, (attachmentState.error ?? MessageSenderError.invalidMessage), true)
} }
@ -160,7 +160,7 @@ public enum MessageSendJob: JobExecutor {
} }
} }
Log.info(.cat, "Deferring (\(job.id ?? -1)) due to pending attachment uploads") Log.info(.cat, "Deferring \(messageType) (\(job.id ?? -1)) due to pending attachment uploads")
return deferred(job) return deferred(job)
} }
@ -168,6 +168,44 @@ public enum MessageSendJob: JobExecutor {
messageFileIds = attachmentState.preparedFileIds messageFileIds = attachmentState.preparedFileIds
} }
/// If this message is being sent to an updated group then we should first make sure that we have a encryption keys
/// for the group before we try to send the message, if not then defer the job 1 second to give the poller the chance to
/// receive the keys
///
/// **Note:** If we have already deferred this message once then we should only continue to defer if we have a config
/// for the message (this way we won't get stuck deferring permanently if config state isn't loaded and we will instead try,
/// and fail, to send the message)
var previousDeferralsMessage: String = ""
switch details.destination {
case .closedGroup(let groupPublicKey) where groupPublicKey.starts(with: SessionId.Prefix.group.rawValue):
let deferalDuration: TimeInterval = 1
let groupSessionId: SessionId = SessionId(.group, hex: groupPublicKey)
let numGroupKeys: Int = (try? LibSession.numKeys(groupSessionId: groupSessionId, using: dependencies))
.defaulting(to: 0)
let deferCount: Int = dependencies[singleton: .jobRunner].deferCount(for: job.id, of: job.variant)
previousDeferralsMessage = " and \(.seconds(Double(deferCount) * deferalDuration), unit: .s) of deferrals" // stringlint:ignore
guard
numGroupKeys > 0 && (
deferCount == 0 ||
dependencies[cache: .libSession].hasConfig(for: .groupKeys, sessionId: groupSessionId)
)
else {
// Defer the job by 1s to give it a little more time to receive updated keys
let updatedJob: Job? = dependencies[singleton: .storage].write { db in
try job
.with(nextRunTimestamp: dependencies.dateNow.timeIntervalSince1970 + deferalDuration)
.upserted(db)
}
Log.info(.cat, "Deferring \(messageType) (\(job.id ?? -1)) as we haven't received the group encryption keys yet")
return deferred(updatedJob ?? job)
}
default: break
}
// Store the sentTimestamp from the message in case it fails due to a clockOutOfSync error // Store the sentTimestamp from the message in case it fails due to a clockOutOfSync error
let originalSentTimestampMs: UInt64? = details.message.sentTimestampMs let originalSentTimestampMs: UInt64? = details.message.sentTimestampMs
let startTime: TimeInterval = dependencies.dateNow.timeIntervalSince1970 let startTime: TimeInterval = dependencies.dateNow.timeIntervalSince1970
@ -196,11 +234,11 @@ public enum MessageSendJob: JobExecutor {
receiveCompletion: { result in receiveCompletion: { result in
switch result { switch result {
case .finished: case .finished:
Log.info(.cat, "Completed sending \(messageType) (\(job.id ?? -1)) after \(.seconds(dependencies.dateNow.timeIntervalSince1970 - startTime), unit: .s).") Log.info(.cat, "Completed sending \(messageType) (\(job.id ?? -1)) after \(.seconds(dependencies.dateNow.timeIntervalSince1970 - startTime), unit: .s)\(previousDeferralsMessage).")
success(job, false) success(job, false)
case .failure(let error): case .failure(let error):
Log.info(.cat, "Failed to send \(messageType) (\(job.id ?? -1)) after \(.seconds(dependencies.dateNow.timeIntervalSince1970 - startTime), unit: .s) due to error: \(error).") Log.info(.cat, "Failed to send \(messageType) (\(job.id ?? -1)) after \(.seconds(dependencies.dateNow.timeIntervalSince1970 - startTime), unit: .s)\(previousDeferralsMessage) due to error: \(error).")
// Actual error handling // Actual error handling
switch (error, details.message) { switch (error, details.message) {

@ -137,6 +137,19 @@ internal extension LibSession {
} }
} }
static func numKeys(
groupSessionId: SessionId,
using dependencies: Dependencies
) throws -> Int {
return try dependencies.mutate(cache: .libSession) { cache in
guard case .groupKeys(let conf, _, _) = cache.config(for: .groupKeys, sessionId: groupSessionId) else {
throw LibSessionError.invalidConfigObject
}
return Int(groups_keys_size(conf))
}
}
static func currentGeneration( static func currentGeneration(
groupSessionId: SessionId, groupSessionId: SessionId,
using dependencies: Dependencies using dependencies: Dependencies

@ -364,6 +364,10 @@ public extension LibSession {
} }
} }
public func hasConfig(for variant: ConfigDump.Variant, sessionId: SessionId) -> Bool {
return (configStore[sessionId, variant] != nil)
}
public func config(for variant: ConfigDump.Variant, sessionId: SessionId) -> Config? { public func config(for variant: ConfigDump.Variant, sessionId: SessionId) -> Config? {
return configStore[sessionId, variant] return configStore[sessionId, variant]
} }
@ -697,6 +701,8 @@ public extension LibSession {
public protocol LibSessionImmutableCacheType: ImmutableCacheType { public protocol LibSessionImmutableCacheType: ImmutableCacheType {
var userSessionId: SessionId { get } var userSessionId: SessionId { get }
var isEmpty: Bool { get } var isEmpty: Bool { get }
func hasConfig(for variant: ConfigDump.Variant, sessionId: SessionId) -> Bool
} }
/// The majority `libSession` functions can only be accessed via the mutable cache because `libSession` isn't thread safe so if we try /// The majority `libSession` functions can only be accessed via the mutable cache because `libSession` isn't thread safe so if we try
@ -716,6 +722,7 @@ public protocol LibSessionCacheType: LibSessionImmutableCacheType, MutableCacheT
userSessionId: SessionId, userSessionId: SessionId,
userEd25519KeyPair: KeyPair userEd25519KeyPair: KeyPair
) )
func hasConfig(for variant: ConfigDump.Variant, sessionId: SessionId) -> Bool
func config(for variant: ConfigDump.Variant, sessionId: SessionId) -> LibSession.Config? func config(for variant: ConfigDump.Variant, sessionId: SessionId) -> LibSession.Config?
func setConfig(for variant: ConfigDump.Variant, sessionId: SessionId, to config: LibSession.Config) func setConfig(for variant: ConfigDump.Variant, sessionId: SessionId, to config: LibSession.Config)
func removeConfigs(for sessionId: SessionId) func removeConfigs(for sessionId: SessionId)
@ -783,6 +790,7 @@ private final class NoopLibSessionCache: LibSessionCacheType {
userSessionId: SessionId, userSessionId: SessionId,
userEd25519KeyPair: KeyPair userEd25519KeyPair: KeyPair
) {} ) {}
func hasConfig(for variant: ConfigDump.Variant, sessionId: SessionId) -> Bool { return false }
func config(for variant: ConfigDump.Variant, sessionId: SessionId) -> LibSession.Config? { return nil } func config(for variant: ConfigDump.Variant, sessionId: SessionId) -> LibSession.Config? { return nil }
func setConfig(for variant: ConfigDump.Variant, sessionId: SessionId, to config: LibSession.Config) {} func setConfig(for variant: ConfigDump.Variant, sessionId: SessionId, to config: LibSession.Config) {}
func removeConfigs(for sessionId: SessionId) {} func removeConfigs(for sessionId: SessionId) {}

@ -153,7 +153,7 @@ extension MessageReceiver {
} }
/// This returns the `resultPublisher` for the group poller so can be ignored if we don't need to wait for the first poll to succeed /// This returns the `resultPublisher` for the group poller so can be ignored if we don't need to wait for the first poll to succeed
@discardableResult internal static func handleNewGroup( internal static func handleNewGroup(
_ db: Database, _ db: Database,
groupSessionId: String, groupSessionId: String,
groupIdentityPrivateKey: Data?, groupIdentityPrivateKey: Data?,
@ -164,7 +164,7 @@ extension MessageReceiver {
hasAlreadyBeenKicked: Bool, hasAlreadyBeenKicked: Bool,
calledFromConfig configTriggeringChange: ConfigDump.Variant?, calledFromConfig configTriggeringChange: ConfigDump.Variant?,
using dependencies: Dependencies using dependencies: Dependencies
) throws -> AnyPublisher<GroupPoller.PollResponse, Never> { ) throws {
// Create the group // Create the group
try SessionThread.fetchOrCreate( try SessionThread.fetchOrCreate(
db, db,
@ -202,9 +202,9 @@ extension MessageReceiver {
} }
/// If the group wasn't already approved, is not in the invite state and the user hasn't been kicked from it then handle the approval process /// If the group wasn't already approved, is not in the invite state and the user hasn't been kicked from it then handle the approval process
guard !groupAlreadyApproved && !invited && !hasAlreadyBeenKicked else { return Just([]).eraseToAnyPublisher() } guard !groupAlreadyApproved && !invited && !hasAlreadyBeenKicked else { return }
return try ClosedGroup.approveGroup( try ClosedGroup.approveGroup(
db, db,
group: closedGroup, group: closedGroup,
calledFromConfig: configTriggeringChange, calledFromConfig: configTriggeringChange,
@ -421,29 +421,35 @@ extension MessageReceiver {
let messageInfo: ClosedGroup.MessageInfo = { let messageInfo: ClosedGroup.MessageInfo = {
switch message.changeType { switch message.changeType {
case .added: case .added:
return ClosedGroup.MessageInfo return ClosedGroup.MessageInfo.addedUsers(
.addedUsers( hasCurrentUser: messageContainsCurrentUser,
hasCurrentUser: messageContainsCurrentUser, names: names,
names: names, historyShared: message.historyShared
historyShared: message.historyShared )
)
case .removed: case .removed:
return ClosedGroup.MessageInfo return ClosedGroup.MessageInfo.removedUsers(
.removedUsers( hasCurrentUser: messageContainsCurrentUser,
hasCurrentUser: messageContainsCurrentUser, names: names
names: names )
)
case .promoted: case .promoted:
return ClosedGroup.MessageInfo return ClosedGroup.MessageInfo.promotedUsers(
.promotedUsers( hasCurrentUser: messageContainsCurrentUser,
hasCurrentUser: messageContainsCurrentUser, names: names
names: names )
)
} }
}() }()
/// If the message is about adding the current user then we should remove any existing `infoGroupInfoInvited` interactions
/// from the group (don't want to have two different messages indicating the current user was added to the group)
if messageContainsCurrentUser && message.changeType == .added {
_ = try Interaction
.filter(Interaction.Columns.threadId == groupSessionId.hexString)
.filter(Interaction.Columns.variant == Interaction.Variant.infoGroupInfoInvited)
.deleteAll(db)
}
switch messageInfo.infoString(using: dependencies) { switch messageInfo.infoString(using: dependencies) {
case .none: Log.warn(.messageReceiver, "Failed to encode member change info string.") case .none: Log.warn(.messageReceiver, "Failed to encode member change info string.")
case .some(let messageBody): case .some(let messageBody):
@ -834,7 +840,7 @@ extension MessageReceiver {
groupSessionId: groupSessionId, groupSessionId: groupSessionId,
using: dependencies using: dependencies
) )
let initialPollPublisher: AnyPublisher<GroupPoller.PollResponse, Never> = try MessageReceiver.handleNewGroup( try MessageReceiver.handleNewGroup(
db, db,
groupSessionId: groupSessionId.hexString, groupSessionId: groupSessionId.hexString,
groupIdentityPrivateKey: groupIdentityPrivateKey, groupIdentityPrivateKey: groupIdentityPrivateKey,
@ -862,20 +868,22 @@ extension MessageReceiver {
switch serverHash { switch serverHash {
case .none: break case .none: break
case .some(let serverHash): case .some(let serverHash):
try? SnodeAPI db.afterNextTransaction { db in
.preparedDeleteMessages( try? SnodeAPI
serverHashes: [serverHash], .preparedDeleteMessages(
requireSuccessfulDeletion: false, serverHashes: [serverHash],
authMethod: try Authentication.with( requireSuccessfulDeletion: false,
db, authMethod: try Authentication.with(
swarmPublicKey: userSessionId.hexString, db,
swarmPublicKey: userSessionId.hexString,
using: dependencies
),
using: dependencies using: dependencies
), )
using: dependencies .send(using: dependencies)
) .subscribe(on: DispatchQueue.global(qos: .background), using: dependencies)
.send(using: dependencies) .sinkUntilComplete()
.subscribe(on: DispatchQueue.global(qos: .background), using: dependencies) }
.sinkUntilComplete()
} }
/// If the thread didn't already exist, or the user had previously been kicked but has since been re-added to the group, then insert /// If the thread didn't already exist, or the user had previously been kicked but has since been re-added to the group, then insert
@ -936,28 +944,17 @@ extension MessageReceiver {
/// If we aren't creating a new thread (ie. sending a message request) then send a /// If we aren't creating a new thread (ie. sending a message request) then send a
/// `GroupUpdateInviteResponseMessage` to the group (this allows other members /// `GroupUpdateInviteResponseMessage` to the group (this allows other members
/// to know that the user has joined the group) /// to know that the user has joined the group)
db.afterNextTransactionNested(using: dependencies) { _ in try MessageSender.send(
initialPollPublisher db,
.first() message: GroupUpdateInviteResponseMessage(
.subscribe(on: DispatchQueue.global(qos: .background), using: dependencies) isApproved: true,
.sinkUntilComplete( sentTimestampMs: dependencies[cache: .snodeAPI].currentOffsetTimestampMs()
receiveCompletion: { _ in ),
dependencies[singleton: .storage].write { db in interactionId: nil,
try MessageSender.send( threadId: groupSessionId.hexString,
db, threadVariant: .group,
message: GroupUpdateInviteResponseMessage( using: dependencies
isApproved: true, )
sentTimestampMs: dependencies[cache: .snodeAPI].currentOffsetTimestampMs()
),
interactionId: nil,
threadId: groupSessionId.hexString,
threadVariant: .group,
using: dependencies
)
}
}
)
}
/// If the sender wasn't approved this is a message request so we should notify the user about the invite /// If the sender wasn't approved this is a message request so we should notify the user about the invite
case (false, _): case (false, _):

@ -136,7 +136,7 @@ extension MessageSender {
.eraseToAnyPublisher() .eraseToAnyPublisher()
} }
.handleEvents( .handleEvents(
receiveOutput: { groupSessionId, _, thread, _, members, preparedNotificationSubscription in receiveOutput: { groupSessionId, _, thread, group, members, preparedNotificationSubscription in
// Start polling // Start polling
dependencies dependencies
.mutate(cache: .groupPollers) { $0.getOrCreatePoller(for: thread.id) } .mutate(cache: .groupPollers) { $0.getOrCreatePoller(for: thread.id) }
@ -148,10 +148,10 @@ extension MessageSender {
.subscribe(on: DispatchQueue.global(qos: .userInitiated), using: dependencies) .subscribe(on: DispatchQueue.global(qos: .userInitiated), using: dependencies)
.sinkUntilComplete() .sinkUntilComplete()
// Save jobs for sending group member invitations
dependencies[singleton: .storage].write { db in dependencies[singleton: .storage].write { db in
let userSessionId: SessionId = dependencies[cache: .general].sessionId let userSessionId: SessionId = dependencies[cache: .general].sessionId
// Save jobs for sending group member invitations
members members
.filter { $0.profileId != userSessionId.hexString } .filter { $0.profileId != userSessionId.hexString }
.compactMap { member -> (GroupMember, GroupInviteMemberJob.Details)? in .compactMap { member -> (GroupMember, GroupInviteMemberJob.Details)? in
@ -185,6 +185,30 @@ extension MessageSender {
canStartJob: true canStartJob: true
) )
} }
// Schedule the "members added" control message to be sent to the group
if let privateKey: Data = group.groupIdentityPrivateKey {
try? MessageSender.send(
db,
message: GroupUpdateMemberChangeMessage(
changeType: .added,
memberSessionIds: members
.filter { $0.profileId != userSessionId.hexString }
.map { $0.profileId },
historyShared: false,
sentTimestampMs: dependencies[cache: .snodeAPI].currentOffsetTimestampMs(),
authMethod: Authentication.groupAdmin(
groupSessionId: groupSessionId,
ed25519SecretKey: Array(privateKey)
),
using: dependencies
),
interactionId: nil,
threadId: thread.id,
threadVariant: .group,
using: dependencies
)
}
} }
} }
) )
@ -778,9 +802,11 @@ extension MessageSender {
let changeTimestampMs: Int64 = dependencies[cache: .snodeAPI].currentOffsetTimestampMs() let changeTimestampMs: Int64 = dependencies[cache: .snodeAPI].currentOffsetTimestampMs()
dependencies[singleton: .storage].writeAsync { db in dependencies[singleton: .storage].writeAsync { db in
var membersReceivingPromotions: [(id: String, profile: Profile?)] = []
// Update the libSession status for each member and schedule a job to send // Update the libSession status for each member and schedule a job to send
// the promotion message // the promotion message
try members.forEach { memberId, _ in try members.forEach { memberId, profile in
try LibSession.updateMemberStatus( try LibSession.updateMemberStatus(
db, db,
groupSessionId: groupSessionId, groupSessionId: groupSessionId,
@ -799,6 +825,8 @@ extension MessageSender {
switch (existingMember?.role, existingMember?.roleStatus) { switch (existingMember?.role, existingMember?.roleStatus) {
case (.standard, _): case (.standard, _):
membersReceivingPromoations.append((memberId, profile))
try GroupMember try GroupMember
.filter(GroupMember.Columns.groupId == groupSessionId.hexString) .filter(GroupMember.Columns.groupId == groupSessionId.hexString)
.filter(GroupMember.Columns.profileId == memberId) .filter(GroupMember.Columns.profileId == memberId)
@ -838,8 +866,13 @@ extension MessageSender {
} }
/// Send the admin changed message if desired /// Send the admin changed message if desired
if sendAdminChangedMessage { ///
/// **Note:** It's possible that this call could contain both members being promoted as well as admins
/// that are getting promotions re-sent to them - we only want to send an admin changed message if there
/// is a newly promoted member
if sendAdminChangedMessage && !membersReceivingPromotions.isEmpty {
let userSessionId: SessionId = dependencies[cache: .general].sessionId let userSessionId: SessionId = dependencies[cache: .general].sessionId
_ = try Interaction( _ = try Interaction(
threadId: groupSessionId.hexString, threadId: groupSessionId.hexString,
threadVariant: .group, threadVariant: .group,
@ -847,8 +880,8 @@ extension MessageSender {
variant: .infoGroupMembersUpdated, variant: .infoGroupMembersUpdated,
body: ClosedGroup.MessageInfo body: ClosedGroup.MessageInfo
.promotedUsers( .promotedUsers(
hasCurrentUser: members.map { $0.id }.contains(userSessionId.hexString), hasCurrentUser: membersReceivingPromotions.map { $0.id }.contains(userSessionId.hexString),
names: members names: membersReceivingPromotions
.sorted { lhs, rhs in lhs.id == userSessionId.hexString } .sorted { lhs, rhs in lhs.id == userSessionId.hexString }
.map { id, profile in .map { id, profile in
profile?.displayName(for: .group) ?? profile?.displayName(for: .group) ??
@ -865,7 +898,7 @@ extension MessageSender {
db, db,
message: GroupUpdateMemberChangeMessage( message: GroupUpdateMemberChangeMessage(
changeType: .promoted, changeType: .promoted,
memberSessionIds: members.map { $0.id }, memberSessionIds: membersReceivingPromotions.map { $0.id },
historyShared: false, historyShared: false,
sentTimestampMs: UInt64(changeTimestampMs), sentTimestampMs: UInt64(changeTimestampMs),
authMethod: try Authentication.with( authMethod: try Authentication.with(

@ -33,6 +33,7 @@ public protocol JobRunnerType {
// MARK: - State Management // MARK: - State Management
func jobInfoFor(jobs: [Job]?, state: JobRunner.JobState, variant: Job.Variant?) -> [Int64: JobRunner.JobInfo] func jobInfoFor(jobs: [Job]?, state: JobRunner.JobState, variant: Job.Variant?) -> [Int64: JobRunner.JobInfo]
func deferCount(for jobId: Int64?, of variant: Job.Variant) -> Int
func appDidFinishLaunching() func appDidFinishLaunching()
func appDidBecomeActive() func appDidBecomeActive()
@ -491,6 +492,12 @@ public final class JobRunner: JobRunnerType {
} }
} }
public func deferCount(for jobId: Int64?, of variant: Job.Variant) -> Int {
guard let jobId: Int64 = jobId else { return 0 }
return (queues.wrappedValue[variant]?.deferLoopTracker.wrappedValue[jobId]?.count ?? 0)
}
public func appDidFinishLaunching() { public func appDidFinishLaunching() {
// Flag that the JobRunner can start it's queues // Flag that the JobRunner can start it's queues
appReadyToStartQueues.mutate { $0 = true } appReadyToStartQueues.mutate { $0 = true }
@ -1050,7 +1057,7 @@ public final class JobQueue: Hashable {
fileprivate var jobCallbacks: Atomic<[Int64: [(JobRunner.JobResult) -> ()]]> = Atomic([:]) fileprivate var jobCallbacks: Atomic<[Int64: [(JobRunner.JobResult) -> ()]]> = Atomic([:])
fileprivate var currentlyRunningJobIds: Atomic<Set<Int64>> = Atomic([]) fileprivate var currentlyRunningJobIds: Atomic<Set<Int64>> = Atomic([])
private var currentlyRunningJobInfo: Atomic<[Int64: JobRunner.JobInfo]> = Atomic([:]) private var currentlyRunningJobInfo: Atomic<[Int64: JobRunner.JobInfo]> = Atomic([:])
private var deferLoopTracker: Atomic<[Int64: (count: Int, times: [TimeInterval])]> = Atomic([:]) fileprivate var deferLoopTracker: Atomic<[Int64: (count: Int, times: [TimeInterval])]> = Atomic([:])
private let maxDeferralsPerSecond: Int private let maxDeferralsPerSecond: Int
fileprivate var hasPendingJobs: Bool { !pendingJobsQueue.wrappedValue.isEmpty } fileprivate var hasPendingJobs: Bool { !pendingJobsQueue.wrappedValue.isEmpty }

Loading…
Cancel
Save