@ -5,14 +5,19 @@ import { UserUtils } from '../..';
import { ConfigDumpData } from '../../../../data/configDump/configDump' ;
import { ReleasedFeatures } from '../../../../util/releaseFeature' ;
import { isSignInByLinking } from '../../../../util/storage' ;
import { isMetaWrapperType } from '../../../../webworker/workers/browser/libsession_worker_functions' ;
import { NotEmptyArrayOfBatchResults } from '../../../apis/snode_api/SnodeRequestTypes' ;
import { MetaGroupWrapperActions } from '../../../../webworker/workers/browser/libsession_worker_interface' ;
import {
NotEmptyArrayOfBatchResults ,
StoreOnNodeData ,
} from '../../../apis/snode_api/SnodeRequestTypes' ;
import { GetNetworkTime } from '../../../apis/snode_api/getNetworkTime' ;
import { SnodeNamespaces } from '../../../apis/snode_api/namespaces' ;
import { TTL_DEFAULT } from '../../../constants' ;
import { getConversationController } from '../../../conversations' ;
import { SharedGroupConfigMessage } from '../../../messages/outgoing/controlMessage/SharedConfigMessage' ;
import { MessageSender } from '../../../sending/MessageSender' ;
import { PubKey } from '../../../types' ;
import { allowOnlyOneAtATime } from '../../Promise' ;
import { LibSessionUtil , OutgoingConfResult } from '../../libsession/libsession_utils' ;
import { LibSessionUtil , PendingChangesForGroup } from '../../libsession/libsession_utils' ;
import { runners } from '../JobRunner' ;
import {
AddJobCheckReturn ,
@ -20,9 +25,6 @@ import {
PersistedJob ,
RunJobResult ,
} from '../PersistedJob' ;
import { MetaGroupWrapperActions } from '../../../../webworker/workers/browser/libsession_worker_interface' ;
import { SignalService } from '../../../../protobuf' ;
import { GroupConfigKind } from '../../../../types/ProtobufKind' ;
const defaultMsBetweenRetries = 15000 ; // a long time between retries, to avoid running multiple jobs at the same time, when one was postponed at the same time as one already planned (5s)
const defaultMaxAttempts = 2 ;
@ -33,13 +35,13 @@ const defaultMaxAttempts = 2;
* /
let lastRunConfigSyncJobTimestamp : number | null = null ;
export type SingleDestinationChanges = {
messages : Array < OutgoingConfResult< GroupConfigKind , SharedGroupConfigMessage > >;
type Group SingleDestinationChanges = {
messages : Array < PendingChangesForGroup >;
allOldHashes : Array < string > ;
} ;
type SuccessfulChange = {
message: SharedGroupConfigMessage ;
pushed: PendingChangesForGroup ;
updatedHash : string ;
} ;
@ -47,14 +49,24 @@ type SuccessfulChange = {
* Later in the syncing logic , we want to batch - send all the updates for a pubkey in a single batch call .
* To make this easier , this function prebuilds and merges together all the changes for each pubkey .
* /
async function retrieve SingleDestinationChanges(
async function retrieve Group SingleDestinationChanges(
groupPk : GroupPubkeyType
) : Promise < SingleDestinationChanges> {
) : Promise < Group SingleDestinationChanges> {
const outgoingConfResults = await LibSessionUtil . pendingChangesForGroup ( groupPk ) ;
const compactedHashes = compact ( outgoingConfResults . map ( m = > m . oldMessageHashes ) ) . flat ( ) ;
return { messages : outgoingConfResults , allOldHashes : compactedHashes } ;
const compactedHashes = compact ( [ . . . outgoingConfResults ] . map ( m = > m [ 1 ] . oldMessageHashes ) ) . flat ( ) ;
const sortedMessagesKeyFirst = compact (
[ . . . outgoingConfResults . keys ( ) ]
. sort ( ( a , b ) = > {
if ( a === 'GroupKeys' ) return - 1 ;
if ( b === 'GroupKeys' ) return 1 ;
return 0 ;
} )
. map ( key = > {
return outgoingConfResults . get ( key ) ;
} )
) ;
return { messages : sortedMessagesKeyFirst , allOldHashes : compactedHashes } ;
}
/ * *
@ -62,7 +74,7 @@ async function retrieveSingleDestinationChanges(
* /
function resultsToSuccessfulChange (
result : NotEmptyArrayOfBatchResults | null ,
request : SingleDestinationChanges
request : Group SingleDestinationChanges
) : Array < SuccessfulChange > {
const successfulChanges : Array < SuccessfulChange > = [ ] ;
@ -82,16 +94,13 @@ function resultsToSuccessfulChange(
for ( let j = 0 ; j < result . length ; j ++ ) {
const batchResult = result [ j ] ;
const messagePostedHashes = batchResult ? . body ? . hash ;
console . warn ( 'messagePostedHashes' , messagePostedHashes ) ;
if (
batchResult . code === 200 &&
isString ( messagePostedHashes ) &&
request . messages ? . [ j ] . message
) {
// the library keeps track of the hashes to push and pushed using the hashes now
if ( batchResult . code === 200 && isString ( messagePostedHashes ) && request . messages ? . [ j ] . data ) {
// libsession keeps track of the hashes to push and pushed using the hashes now
successfulChanges . push ( {
updatedHash : messagePostedHashes ,
message : request.messages?. [ j ] . message ,
pushed : request.messages?. [ j ] ,
} ) ;
}
}
@ -110,27 +119,23 @@ async function buildAndSaveDumpsToDB(
for ( let i = 0 ; i < changes . length ; i ++ ) {
const change = changes [ i ] ;
const variant = LibSessionUtil . groupKindToVariant ( change . message . kind , groupPk ) ;
if ( ! isMetaWrapperType ( variant ) ) {
throw new Error ( ` buildAndSaveDumpsToDB non metagroup variant: ${ variant } ` ) ;
}
const Kind = SignalService . SharedConfigMessage . Kind ;
switch ( change . message . kind ) {
case Kind . GROUP_INFO : {
toConfirm [ 1 ] . groupInfo = [ change . message . seqno . toNumber ( ) , change . updatedHash ] ;
switch ( change . pushed . namespace ) {
case SnodeNamespaces . ClosedGroupInfo : {
toConfirm [ 1 ] . groupInfo = [ change . pushed . seqno . toNumber ( ) , change . updatedHash ] ;
break ;
}
case Kind. GROUP_MEMBERS : {
toConfirm [ 1 ] . groupMember = [ change . message . seqno . toNumber ( ) , change . updatedHash ] ;
case SnodeNamespaces . ClosedGroupMembers : {
toConfirm [ 1 ] . groupMember = [ change . pushed . seqno . toNumber ( ) , change . updatedHash ] ;
break ;
}
case Kind. GROUP_KEYS : {
toConfirm [ 1 ] . groupKeys = [ change . message . seqno . toNumber ( ) , change . updatedHash ] ;
case SnodeNamespaces. ClosedGroupKeys : {
toConfirm [ 1 ] . groupKeys = [ change . pushed . seqno . toNumber ( ) , change . updatedHash ] ;
break ;
}
}
}
await MetaGroupWrapperActions . metaConfirmPushed ( . . . toConfirm ) ;
const metaNeedsDump = await MetaGroupWrapperActions . needsDump ( groupPk ) ;
// save the concatenated dumps as a single entry in the DB if any of the dumps had a need for dump
@ -207,7 +212,7 @@ class GroupSyncJob extends PersistedJob<GroupSyncPersistedData> {
if ( ! newGroupsReleased ) {
return RunJobResult . Success ;
}
const singleDestChanges = await retrieve SingleDestinationChanges( thisJobDestination ) ;
const singleDestChanges = await retrieve Group SingleDestinationChanges( thisJobDestination ) ;
// If there are no pending changes then the job can just complete (next time something
// is updated we want to try and run immediately so don't scuedule another run in this case)
@ -215,17 +220,18 @@ class GroupSyncJob extends PersistedJob<GroupSyncPersistedData> {
return RunJobResult . Success ;
}
const oldHashesToDelete = new Set ( singleDestChanges . allOldHashes ) ;
const msgs = singleDestChanges . messages . map ( item = > {
const msgs : Array < StoreOnNodeData > = singleDestChanges . messages . map ( item = > {
return {
namespace : item . namespace ,
pubkey : thisJobDestination ,
timestamp: item.message.timestamp ,
ttl : item.message.ttl( ) ,
message: item.message ,
networkTimestamp: GetNetworkTime.getNowWithNetworkOffset ( ) ,
ttl : TTL_DEFAULT.TTL_CONFIG ,
data: item.data ,
} ;
} ) ;
const result = await MessageSender . send Messages ToSnode(
const result = await MessageSender . send EncryptedData ToSnode(
msgs ,
thisJobDestination ,
oldHashesToDelete
@ -236,7 +242,7 @@ class GroupSyncJob extends PersistedJob<GroupSyncPersistedData> {
// we do a sequence call here. If we do not have the right expected number of results, consider it a failure
if ( ! isArray ( result ) || result . length !== expectedReplyLength ) {
window . log . info (
` Configuration SyncJob: unexpected result length: expected ${ expectedReplyLength } but got ${ result ? . length } `
` Group SyncJob: unexpected result length: expected ${ expectedReplyLength } but got ${ result ? . length } `
) ;
// this might be a 421 error (already handled) so let's retry this request a little bit later
return RunJobResult . RetryJobIfPossible ;