fix: cleaned up pending removal job with tech design

pull/3052/head
Audric Ackermann 10 months ago
parent 9963287193
commit 40d3ddb244

@ -169,7 +169,10 @@ const GroupStatusText = ({ groupPk, pubkey }: { pubkey: PubkeyType; groupPk: Gro
return (
<StyledGroupStatusText
data-testid={'group-member-status-text'}
isFailure={(groupPromotionFailed && !groupPromotionSending) || (groupInviteFailed && !groupInviteSending)}
isFailure={
(groupPromotionFailed && !groupPromotionSending) ||
(groupInviteFailed && !groupInviteSending)
}
>
{statusText}
</StyledGroupStatusText>

@ -15,10 +15,14 @@ import {
StoreGroupMessageSubRequest,
} from '../SnodeRequestTypes';
import { SnodeNamespaces } from '../namespaces';
import { GroupUpdateDeleteMemberContentMessage } from '../../../messages/outgoing/controlMessage/group_v2/to_group/GroupUpdateDeleteMemberContentMessage';
import { GroupUpdateMemberLeftNotificationMessage } from '../../../messages/outgoing/controlMessage/group_v2/to_group/GroupUpdateMemberLeftNotificationMessage';
export type StoreMessageToSubRequestType =
| GroupUpdateMemberChangeMessage
| GroupUpdateInfoChangeMessage;
| GroupUpdateInfoChangeMessage
| GroupUpdateDeleteMemberContentMessage
| GroupUpdateMemberLeftNotificationMessage;
async function makeGroupMessageSubRequest(
updateMessages: Array<StoreMessageToSubRequestType | null>,

@ -46,6 +46,7 @@ import { SessionUtilContact } from '../utils/libsession/libsession_utils_contact
import { SessionUtilConvoInfoVolatile } from '../utils/libsession/libsession_utils_convo_info_volatile';
import { SessionUtilUserGroups } from '../utils/libsession/libsession_utils_user_groups';
import { DisappearingMessages } from '../disappearing_messages';
import { StoreGroupRequestFactory } from '../apis/snode_api/factories/StoreGroupRequestFactory';
let instance: ConvoController | null;
@ -620,7 +621,7 @@ async function leaveClosedGroup(groupPk: PubkeyType | GroupPubkeyType, fromSyncM
if (PubKey.is03Pubkey(groupPk)) {
const group = await UserGroupsWrapperActions.getGroup(groupPk);
if (!group) {
if (!group || (!group.secretKey && !group.authData)) {
throw new Error('leaveClosedGroup: group from UserGroupsWrapperActions is null ');
}
const createAtNetworkTimestamp = GetNetworkTime.now();
@ -644,9 +645,16 @@ async function leaveClosedGroup(groupPk: PubkeyType | GroupPubkeyType, fromSyncM
// We might not be able to send our leaving messages (no encryption keypair, we were already removed, no network, etc).
// If that happens, we should just remove everything from our current user.
try {
const results = await MessageSender.sendUnencryptedDataToSnode({
const storeRequests = await StoreGroupRequestFactory.makeGroupMessageSubRequest(
[ourLeavingNotificationMessage, ourLeavingMessage],
{
authData: group.authData,
secretKey: group.secretKey,
}
);
const results = await MessageSender.sendEncryptedDataToSnode({
destination: groupPk,
messages: [ourLeavingNotificationMessage, ourLeavingMessage],
sortedSubRequests: storeRequests,
method: 'sequence',
});

@ -49,8 +49,6 @@ import { MessageEncrypter } from '../crypto/MessageEncrypter';
import { ContentMessage } from '../messages/outgoing';
import { UnsendMessage } from '../messages/outgoing/controlMessage/UnsendMessage';
import { ClosedGroupNewMessage } from '../messages/outgoing/controlMessage/group/ClosedGroupNewMessage';
import { GroupUpdateMemberLeftMessage } from '../messages/outgoing/controlMessage/group_v2/to_group/GroupUpdateMemberLeftMessage';
import { GroupUpdateMemberLeftNotificationMessage } from '../messages/outgoing/controlMessage/group_v2/to_group/GroupUpdateMemberLeftNotificationMessage';
import { OpenGroupVisibleMessage } from '../messages/outgoing/visibleMessage/OpenGroupVisibleMessage';
import { PubKey } from '../types';
import { OutgoingRawMessage } from '../types/RawMessage';
@ -632,65 +630,6 @@ async function sendEncryptedDataToSnode<T extends GroupPubkeyType | PubkeyType>(
}
}
/**
* Send an array of **not** preencrypted data to the corresponding swarm.
* Note: the messages order is not changed when sending them, but if they are not correctly sorted an exception will be thrown.
*
* @param messages the data to deposit (after encryption)
* @param destination the pubkey we should deposit those message to
*/
async function sendUnencryptedDataToSnode<T extends GroupPubkeyType | PubkeyType>({
destination,
messages,
method,
}: {
// keeping those as an array because the order needs to be enforced for some (groupkeys for instance)
destination: T;
messages: Array<
T extends GroupPubkeyType
? GroupUpdateMemberLeftMessage | GroupUpdateMemberLeftNotificationMessage
: never
>;
method: MethodBatchType;
}) {
const rawMessages: Array<EncryptAndWrapMessage> = messages.map(m => {
return {
networkTimestamp: m.createAtNetworkTimestamp,
plainTextBuffer: m.plainTextBuffer(),
ttl: m.ttl(),
destination: m.destination,
identifier: m.identifier,
namespace: m.namespace,
isSyncMessage: false,
};
});
const encryptedAndWrappedArr = await encryptMessagesAndWrap(
rawMessages.map(message => {
return {
destination,
plainTextBuffer: message.plainTextBuffer,
namespace: message.namespace,
ttl: message.ttl,
identifier: message.identifier,
networkTimestamp: message.networkTimestamp,
isSyncMessage: false,
};
})
);
const sortedSubRequests = await messagesToRequests({
encryptedAndWrappedArr,
destination,
});
return sendEncryptedDataToSnode({
destination,
sortedSubRequests,
method,
});
}
// ================ Open Group ================
/**
* Send a message to an open group v2.
@ -756,7 +695,6 @@ export const MessageSender = {
getSignatureParamsFromNamespace,
signSubRequests,
encryptMessagesAndWrap,
sendUnencryptedDataToSnode,
messagesToRequests,
};

@ -4,20 +4,22 @@ import { compact, isEmpty, isNumber } from 'lodash';
import { v4 } from 'uuid';
import { StringUtils } from '../..';
import { Data } from '../../../../data/data';
import {
deleteMessagesFromSwarmOnly,
unsendMessagesForEveryoneGroupV2,
} from '../../../../interactions/conversations/unsendingInteractions';
import { deleteMessagesFromSwarmOnly } from '../../../../interactions/conversations/unsendingInteractions';
import {
MetaGroupWrapperActions,
MultiEncryptWrapperActions,
UserGroupsWrapperActions,
} from '../../../../webworker/workers/browser/libsession_worker_interface';
import { StoreGroupRevokedRetrievableSubRequest } from '../../../apis/snode_api/SnodeRequestTypes';
import {
StoreGroupMessageSubRequest,
StoreGroupRevokedRetrievableSubRequest,
} from '../../../apis/snode_api/SnodeRequestTypes';
import { StoreGroupRequestFactory } from '../../../apis/snode_api/factories/StoreGroupRequestFactory';
import { GetNetworkTime } from '../../../apis/snode_api/getNetworkTime';
import { RevokeChanges, SnodeAPIRevoke } from '../../../apis/snode_api/revokeSubaccount';
import { WithSecretKey } from '../../../apis/snode_api/types';
import { concatUInt8Array } from '../../../crypto';
import { concatUInt8Array, getSodiumRenderer } from '../../../crypto';
import { GroupUpdateDeleteMemberContentMessage } from '../../../messages/outgoing/controlMessage/group_v2/to_group/GroupUpdateDeleteMemberContentMessage';
import { MessageSender } from '../../../sending';
import { fromHexToArray } from '../../String';
import { runners } from '../JobRunner';
@ -41,7 +43,7 @@ type JobExtraArgs = Pick<GroupPendingRemovalsPersistedData, 'groupPk'>;
async function addJob({ groupPk }: JobExtraArgs) {
const pendingRemovalJob = new GroupPendingRemovalsJob({
groupPk,
nextAttemptTimestamp: Date.now(),
nextAttemptTimestamp: Date.now() + 1000, // postpone by 1s
});
window.log.debug(`addGroupPendingRemovalJob: adding group pending removal for ${groupPk} `);
await runners.groupPendingRemovalJobRunner.addJob(pendingRemovalJob);
@ -130,6 +132,10 @@ class GroupPendingRemovalsJob extends PersistedJob<GroupPendingRemovalsPersisted
if (!pendingRemovals.length) {
return RunJobResult.Success;
}
const deleteMessagesOfMembers = pendingRemovals
.filter(m => m.removedStatus === 2)
.map(m => m.pubkeyHex);
const sessionIdsHex = pendingRemovals.map(m => m.pubkeyHex);
const sessionIds = sessionIdsHex.map(m => fromHexToArray(m).slice(1));
const currentGen = await MetaGroupWrapperActions.keyGetCurrentGen(groupPk);
@ -162,44 +168,69 @@ class GroupPendingRemovalsJob extends PersistedJob<GroupPendingRemovalsPersisted
revokeUnrevokeParams.revokeSubRequest ? revokeUnrevokeParams.revokeSubRequest : null,
revokeUnrevokeParams.unrevokeSubRequest ? revokeUnrevokeParams.unrevokeSubRequest : null,
]);
let storeRequests: Array<StoreGroupMessageSubRequest> = [];
if (deleteMessagesOfMembers.length) {
const deleteContentMsg = new GroupUpdateDeleteMemberContentMessage({
createAtNetworkTimestamp: GetNetworkTime.now(),
expirationType: 'unknown', // this is not displayed so not expiring.
expireTimer: 0,
groupPk,
memberSessionIds: deleteMessagesOfMembers,
messageHashes: [],
sodium: await getSodiumRenderer(),
secretKey: group.secretKey,
});
storeRequests = await StoreGroupRequestFactory.makeGroupMessageSubRequest(
[deleteContentMsg],
{ authData: null, secretKey: group.secretKey }
);
}
const sortedSubRequests = compact([multiEncryptRequest, ...revokeRequests, ...storeRequests]);
const result = await MessageSender.sendEncryptedDataToSnode({
sortedSubRequests: [multiEncryptRequest, ...revokeRequests],
sortedSubRequests,
destination: groupPk,
method: 'sequence',
});
if (result?.length === 2 && result[0].code === 200 && result[1].code === 200) {
// both requests success, remove the members from the group member entirely and sync
await MetaGroupWrapperActions.memberEraseAndRekey(groupPk, sessionIdsHex);
await GroupSync.queueNewJobIfNeeded(groupPk);
const deleteMessagesOf = pendingRemovals
.filter(m => m.removedStatus === 2)
.map(m => m.pubkeyHex);
if (deleteMessagesOf.length) {
if (
!result ||
result.length !== sortedSubRequests.length ||
result.some(m => m.code !== 200)
) {
window.log.warn(
'GroupPendingRemovalsJob: sendEncryptedDataToSnode unexpected result length or content. Scheduling retry if possible'
);
return RunJobResult.RetryJobIfPossible;
}
// both requests success, remove the members from the group member entirely and sync
await MetaGroupWrapperActions.memberEraseAndRekey(groupPk, sessionIdsHex);
await GroupSync.queueNewJobIfNeeded(groupPk);
try {
if (deleteMessagesOfMembers.length) {
const msgHashesToDeleteOnGroupSwarm =
await Data.deleteAllMessageFromSendersInConversation({
groupPk,
toRemove: sessionIdsHex,
toRemove: deleteMessagesOfMembers,
signatureTimestamp: GetNetworkTime.now(),
});
await unsendMessagesForEveryoneGroupV2({
allMessagesFrom: deleteMessagesOf,
groupPk,
msgsToDelete: [],
});
if (msgHashesToDeleteOnGroupSwarm.length) {
await deleteMessagesFromSwarmOnly(msgHashesToDeleteOnGroupSwarm, groupPk);
}
}
} catch (e) {
window.log.warn('GroupPendingRemovalsJob failable part failed with:', e.message);
}
// return true so this job is marked as a success and we don't need to retry it
return RunJobResult.Success;
} catch (e) {
window.log.warn('PendingRemovalJob failed with', e.message);
window.log.warn('GroupPendingRemovalsJob failed with', e.message);
return RunJobResult.RetryJobIfPossible;
}
// return true so this job is marked as a success and we don't need to retry it
return RunJobResult.Success;
}
public serializeJob() {

@ -770,6 +770,8 @@ async function handleMemberRemovedFromUI({
await checkWeAreAdminOrThrow(groupPk, 'handleMemberRemovedFromUI');
if (removeMembers.length === 0) {
window.log.debug('handleMemberRemovedFromUI: removeMembers is empty');
return;
}
@ -778,24 +780,31 @@ async function handleMemberRemovedFromUI({
removed: removeMembers,
});
// Note: We don't revoke members from here, instead we schedule a GroupPendingRemovals which will deal with the revokes of all of them together
if (removed.length === 0) {
window.log.debug('handleMemberRemovedFromUI: removeMembers after validation is empty');
// Send the groupUpdateDeleteMessage that can still be decrypted by those removed members to namespace ClosedGroupRevokedRetrievableMessages. (not when handling a MEMBER_LEFT message)
// Then, rekey the wrapper, but don't push the changes yet, we want to batch all of the requests to be made together in the `pushChangesToGroupSwarmIfNeeded` below.
if (removed.length) {
await MetaGroupWrapperActions.membersMarkPendingRemoval(groupPk, removed, alsoRemoveMessages);
return;
}
await GroupPendingRemovals.addJob({ groupPk });
const createAtNetworkTimestamp = GetNetworkTime.now();
// We need to mark the member as "pending removal" so any admins (including us) can deal with it as soon as possible
await MetaGroupWrapperActions.membersMarkPendingRemoval(groupPk, removed, alsoRemoveMessages);
await LibSessionUtil.saveDumpsToDb(groupPk);
// We don't revoke the member's token right away. Instead we schedule a `GroupPendingRemovals`
// which will deal with the revokes of all of them together.
await GroupPendingRemovals.addJob({ groupPk });
// Build a GroupUpdateMessage to be sent if that member was kicked by us.
const createAtNetworkTimestamp = GetNetworkTime.now();
const expiringDetails = DisappearingMessages.getExpireDetailsForOutgoingMesssage(
convo,
createAtNetworkTimestamp
);
let removedControlMessage: GroupUpdateMemberChangeMessage | null = null;
if (removed.length && !fromMemberLeftMessage) {
// We only add/send a message if that user didn't leave but was explicitely kicked.
// When we leaves by himself, he sends a GroupUpdateMessage.
if (!fromMemberLeftMessage) {
const msgModel = await ClosedGroup.addUpdateMessage({
diff: { type: 'kicked', kicked: removed },
convo,
@ -809,7 +818,7 @@ async function handleMemberRemovedFromUI({
? createAtNetworkTimestamp + expiringDetails.expireTimer
: null,
},
markAlreadySent: false, // the store below will mark the message as sent with dbMsgIdentifier
markAlreadySent: false, // the store below will mark the message as sent using dbMsgIdentifier
});
removedControlMessage = await getRemovedControlMessage({
adminSecretKey: group.secretKey,
@ -822,12 +831,13 @@ async function handleMemberRemovedFromUI({
});
}
// build the request for that GroupUpdateMessage if needed
const extraStoreRequests = await StoreGroupRequestFactory.makeGroupMessageSubRequest(
[removedControlMessage],
group
);
// revoked pubkeys, update messages, and libsession groups config in a single batch call
// Send the updated config (with changes to pending_removal) and that GroupUpdateMessage request (if any) as a sequence.
const sequenceResult = await GroupSync.pushChangesToGroupSwarmIfNeeded({
groupPk,
extraStoreRequests,

@ -1,6 +1,6 @@
import { expect } from 'chai';
import Sinon from 'sinon';
import { SnodeNamespace } from '../../../../session/apis/snode_api/namespaces';
import { SnodeNamespace, SnodeNamespaces } from '../../../../session/apis/snode_api/namespaces';
describe('maxSizeMap', () => {
afterEach(() => {
@ -24,4 +24,22 @@ describe('maxSizeMap', () => {
{ namespace: 5, maxSize: -8 },
]);
});
it('multiple namespaces config for is correct', () => {
expect(
SnodeNamespace.maxSizeMap([
SnodeNamespaces.ClosedGroupMessages,
SnodeNamespaces.ClosedGroupInfo,
SnodeNamespaces.ClosedGroupMembers,
SnodeNamespaces.ClosedGroupKeys,
SnodeNamespaces.ClosedGroupRevokedRetrievableMessages,
])
).to.be.deep.eq([
{ namespace: SnodeNamespaces.ClosedGroupMessages, maxSize: -2 }, // message has a priority of 10 so takes its own bucket
{ namespace: SnodeNamespaces.ClosedGroupInfo, maxSize: -8 }, // the other ones are sharing the next bucket
{ namespace: SnodeNamespaces.ClosedGroupMembers, maxSize: -8 },
{ namespace: SnodeNamespaces.ClosedGroupKeys, maxSize: -8 },
{ namespace: SnodeNamespaces.ClosedGroupRevokedRetrievableMessages, maxSize: -8 },
]);
});
});

@ -117,9 +117,11 @@ export class WorkerInterface {
private _removeJob(id: number) {
if (this._DEBUG) {
this._jobs[id].complete = true;
} else {
delete this._jobs[id];
return this._jobs[id];
}
const job = this._jobs[id];
delete this._jobs[id];
return job;
}
private _getJob(id: number) {

Loading…
Cancel
Save