From 9cf1419ca523faa8fe5df0f7d46227eb2bd764f2 Mon Sep 17 00:00:00 2001 From: Audric Ackermann Date: Wed, 25 Jan 2023 17:46:30 +1100 Subject: [PATCH] feat: add first try to build SharedConfigMessages --- package.json | 9 +- ts/components/dialog/EditProfileDialog.tsx | 39 ++- ts/components/leftpane/ActionsPanel.tsx | 44 +-- ts/data/data.ts | 250 ++++++++------- ts/interactions/conversationInteractions.ts | 4 + ts/mains/main_renderer.tsx | 4 +- ts/models/conversationAttributes.ts | 2 +- ts/node/database_utility.ts | 8 +- ts/node/migration/sessionMigrations.ts | 12 +- ts/node/sql.ts | 303 +++++++----------- ts/receiver/cache.ts | 71 ++-- ts/receiver/configMessage.ts | 69 ++-- ts/receiver/contentMessage.ts | 23 +- ts/receiver/receiver.ts | 30 +- ts/receiver/userProfileImageUpdates.ts | 1 + ts/session/apis/snode_api/retrieveRequest.ts | 1 - ts/session/apis/snode_api/swarmPolling.ts | 74 +++-- ts/session/group/closed-group.ts | 4 +- .../messages/incoming/IncomingMessage.ts | 2 +- ts/session/sending/MessageQueue.ts | 2 +- ts/session/utils/AttachmentsDownload.ts | 3 +- ts/session/utils/job_runners/JobRunner.ts | 7 +- .../job_runners/jobs/ConfigurationSyncJob.ts | 4 +- .../unit/utils/job_runner/FakeSleepForJob.ts | 8 +- .../unit/utils/job_runner/JobRunner_test.ts | 8 +- ts/types/sqlSharedTypes.ts | 39 +++ yarn.lock | 23 +- 27 files changed, 532 insertions(+), 512 deletions(-) diff --git a/package.json b/package.json index 10614b508..0273d4e89 100644 --- a/package.json +++ b/package.json @@ -98,6 +98,7 @@ "filesize": "3.6.1", "firstline": "1.2.1", "fs-extra": "9.0.0", + "git": "^0.1.5", "glob": "7.1.2", "image-type": "^4.1.0", "ip2country": "1.0.1", @@ -139,7 +140,7 @@ "rimraf": "2.6.2", "sanitize.css": "^12.0.1", "semver": "5.4.1", - "session_util_wrapper": "https://github.com/oxen-io/libsession-util-nodejs", + "session_util_wrapper": "/home/audric/pro/contribs/libsession-util-nodejs", "styled-components": "5.1.1", "uuid": "8.3.2" }, @@ -289,7 +290,11 @@ "StartupWMClass": "Session" }, "asarUnpack": "node_modules/spellchecker/vendor/hunspell_dictionaries", - "target": ["deb", "rpm", "freebsd"], + "target": [ + "deb", + "rpm", + "freebsd" + ], "icon": "build/icon-linux.icns" }, "asarUnpack": [ diff --git a/ts/components/dialog/EditProfileDialog.tsx b/ts/components/dialog/EditProfileDialog.tsx index 788eb2964..7ff3a70be 100644 --- a/ts/components/dialog/EditProfileDialog.tsx +++ b/ts/components/dialog/EditProfileDialog.tsx @@ -21,6 +21,14 @@ import { sanitizeSessionUsername } from '../../session/utils/String'; import { setLastProfileUpdateTimestamp } from '../../util/storage'; import { ConversationTypeEnum } from '../../models/conversationAttributes'; import { MAX_USERNAME_BYTES } from '../../session/constants'; +import { SharedConfigMessage } from '../../session/messages/outgoing/controlMessage/SharedConfigMessage'; +import { callLibSessionWorker } from '../../webworker/workers/browser/libsession_worker_interface'; +import { SignalService } from '../../protobuf'; +import Long from 'long'; +import { GetNetworkTime } from '../../session/apis/snode_api/getNetworkTime'; +import { getMessageQueue } from '../../session/sending'; +import { SnodeNamespaces } from '../../session/apis/snode_api/namespaces'; +import { from_string } from 'libsodium-wrappers-sumo'; interface State { profileName: string; @@ -337,8 +345,35 @@ async function commitProfileEdits(newName: string, scaledAvatarUrl: string | nul } // do not update the avatar if it did not change conversation.setSessionDisplayNameNoCommit(newName); + // might be good to not trigger a sync if the name did not change await conversation.commit(); - await setLastProfileUpdateTimestamp(Date.now()); - await SyncUtils.forceSyncConfigurationNowIfNeeded(true); + + if (window.sessionFeatureFlags.useSharedUtilForUserConfig) { + await callLibSessionWorker(['UserConfig', 'setName', newName]); + const pointer = conversation.get('avatarPointer'); + const profileKey = conversation.get('profileKey'); + if (profileKey && pointer) { + await callLibSessionWorker([ + 'UserConfig', + 'setProfilePicture', + pointer, + from_string(profileKey), + ]); + } else { + await callLibSessionWorker(['UserConfig', 'setProfilePicture', '', new Uint8Array()]); + } + + const message = new SharedConfigMessage({ + data: (await callLibSessionWorker(['UserConfig', 'dump'])) as Uint8Array, + kind: SignalService.SharedConfigMessage.Kind.USER_PROFILE, + seqno: Long.fromNumber(0), + timestamp: GetNetworkTime.getNowWithNetworkOffset(), + }); + await getMessageQueue().sendSyncMessage({ message, namespace: SnodeNamespaces.UserProfile }); + } else { + await setLastProfileUpdateTimestamp(Date.now()); + + await SyncUtils.forceSyncConfigurationNowIfNeeded(true); + } } diff --git a/ts/components/leftpane/ActionsPanel.tsx b/ts/components/leftpane/ActionsPanel.tsx index 96b4b6882..247128de9 100644 --- a/ts/components/leftpane/ActionsPanel.tsx +++ b/ts/components/leftpane/ActionsPanel.tsx @@ -2,57 +2,52 @@ import React, { useEffect, useState } from 'react'; import { getConversationController } from '../../session/conversations'; import { syncConfigurationIfNeeded } from '../../session/utils/sync/syncUtils'; +import { useDispatch, useSelector } from 'react-redux'; import { Data, hasSyncedInitialConfigurationItem, lastAvatarUploadTimestamp, } from '../../data/data'; import { getMessageQueue } from '../../session/sending'; -import { useDispatch, useSelector } from 'react-redux'; // tslint:disable: no-submodule-imports import useInterval from 'react-use/lib/useInterval'; import useTimeoutFn from 'react-use/lib/useTimeoutFn'; -import { getOurNumber } from '../../state/selectors/user'; +import { clearSearch } from '../../state/ducks/search'; +import { resetOverlayMode, SectionType, showLeftPaneSection } from '../../state/ducks/section'; import { getOurPrimaryConversation, getUnreadMessageCount, } from '../../state/selectors/conversations'; import { getFocusedSection } from '../../state/selectors/section'; -import { clearSearch } from '../../state/ducks/search'; -import { resetOverlayMode, SectionType, showLeftPaneSection } from '../../state/ducks/section'; +import { getOurNumber } from '../../state/selectors/user'; import { cleanUpOldDecryptedMedias } from '../../session/crypto/DecryptedAttachmentsManager'; import { DURATION } from '../../session/constants'; -import { onionPathModal } from '../../state/ducks/modalDialog'; -import { uploadOurAvatar } from '../../interactions/conversationInteractions'; import { debounce, isEmpty, isString } from 'lodash'; +import { uploadOurAvatar } from '../../interactions/conversationInteractions'; +import { editProfileModal, onionPathModal } from '../../state/ducks/modalDialog'; // tslint:disable-next-line: no-import-side-effect no-submodule-imports -import { ActionPanelOnionStatusLight } from '../dialog/OnionStatusPathDialog'; +import { ipcRenderer } from 'electron'; import { loadDefaultRooms } from '../../session/apis/open_group_api/opengroupV2/ApiUtil'; import { getOpenGroupManager } from '../../session/apis/open_group_api/opengroupV2/OpenGroupManagerV2'; import { getSwarmPollingInstance } from '../../session/apis/snode_api'; +import { UserUtils } from '../../session/utils'; import { Avatar, AvatarSize } from '../avatar/Avatar'; +import { ActionPanelOnionStatusLight } from '../dialog/OnionStatusPathDialog'; import { SessionIconButton } from '../icon'; import { LeftPaneSectionContainer } from './LeftPaneSectionContainer'; -import { ipcRenderer } from 'electron'; -import { UserUtils } from '../../session/utils'; import { getLatestReleaseFromFileServer } from '../../session/apis/file_server_api/FileServerApi'; -import { switchThemeTo } from '../../themes/switchTheme'; -import { ThemeStateType } from '../../themes/constants/colors'; -import { isDarkTheme } from '../../state/selectors/theme'; import { forceRefreshRandomSnodePool } from '../../session/apis/snode_api/snodePool'; -import { SharedConfigMessage } from '../../session/messages/outgoing/controlMessage/SharedConfigMessage'; -import { SignalService } from '../../protobuf'; -import { GetNetworkTime } from '../../session/apis/snode_api/getNetworkTime'; -import Long from 'long'; -import { SnodeNamespaces } from '../../session/apis/snode_api/namespaces'; import { initializeLibSessionUtilWrappers } from '../../session/utils/libsession/libsession_utils'; +import { isDarkTheme } from '../../state/selectors/theme'; +import { ThemeStateType } from '../../themes/constants/colors'; +import { switchThemeTo } from '../../themes/switchTheme'; const Section = (props: { type: SectionType }) => { const ourNumber = useSelector(getOurNumber); @@ -67,15 +62,7 @@ const Section = (props: { type: SectionType }) => { const handleClick = async () => { /* tslint:disable:no-void-expression */ if (type === SectionType.Profile) { - const message = new SharedConfigMessage({ - data: new Uint8Array([1, 2, 3]), - kind: SignalService.SharedConfigMessage.Kind.USER_PROFILE, - seqno: Long.fromNumber(0), - timestamp: GetNetworkTime.getNowWithNetworkOffset(), - }); - await getMessageQueue().sendSyncMessage({ message, namespace: SnodeNamespaces.UserProfile }); - console.warn('FIXME'); - // dispatch(editProfileModal({})); + dispatch(editProfileModal({})); } else if (type === SectionType.ColorMode) { const currentTheme = String(window.Events.getThemeSetting()); const newTheme = (isDarkMode @@ -220,10 +207,6 @@ const doAppStartUp = async () => { // TODO make this a job of the JobRunner debounce(triggerAvatarReUploadIfNeeded, 200); - // init the messageQueue. In the constructor, we add all not send messages - // this call does nothing except calling the constructor, which will continue sending message in the pipeline - void getMessageQueue().processAllPending(); - /* Postpone a little bit of the polling of sogs messages to let the swarm messages come in first. */ global.setTimeout(() => { void getOpenGroupManager().startPolling(); @@ -311,7 +294,6 @@ export const ActionsPanel = () => {
-
diff --git a/ts/data/data.ts b/ts/data/data.ts index bc5b92a1d..828d10ab0 100644 --- a/ts/data/data.ts +++ b/ts/data/data.ts @@ -11,7 +11,12 @@ import { HexKeyPair } from '../receiver/keypairs'; import { getConversationController } from '../session/conversations'; import { getSodiumRenderer } from '../session/crypto'; import { PubKey } from '../session/types'; -import { MsgDuplicateSearchOpenGroup, UpdateLastHashType } from '../types/sqlSharedTypes'; +import { + AsyncWrapper, + MsgDuplicateSearchOpenGroup, + UnprocessedDataNode, + UpdateLastHashType, +} from '../types/sqlSharedTypes'; import { ExpirationTimerOptions } from '../util/expiringMessages'; import { Storage } from '../util/storage'; import { channels } from './channels'; @@ -101,96 +106,6 @@ function _cleanData(data: any): any { return data; } -// we export them like this instead of directly with the `export function` cause this is helping a lot for testing -export const Data = { - shutdown, - close, - removeDB, - getPasswordHash, - - // items table logic - createOrUpdateItem, - getItemById, - getAllItems, - removeItemById, - - // guard nodes - getGuardNodes, - updateGuardNodes, - generateAttachmentKeyIfEmpty, - getSwarmNodesForPubkey, - updateSwarmNodesForPubkey, - getAllEncryptionKeyPairsForGroup, - getLatestClosedGroupEncryptionKeyPair, - addClosedGroupEncryptionKeyPair, - removeAllClosedGroupEncryptionKeyPairs, - saveConversation, - getConversationById, - removeConversation, - getAllConversations, - getPubkeysInPublicConversation, - searchConversations, - searchMessages, - searchMessagesInConversation, - cleanSeenMessages, - cleanLastHashes, - saveSeenMessageHashes, - updateLastHash, - saveMessage, - saveMessages, - removeMessage, - removeMessagesByIds, - getMessageIdsFromServerIds, - getMessageById, - getMessageBySenderAndSentAt, - getMessageByServerId, - filterAlreadyFetchedOpengroupMessage, - getMessageBySenderAndTimestamp, - getUnreadByConversation, - getUnreadCountByConversation, - markAllAsReadByConversationNoExpiration, - getMessageCountByType, - getMessagesByConversation, - getLastMessagesByConversation, - getLastMessageIdInConversation, - getLastMessageInConversation, - getOldestMessageInConversation, - getMessageCount, - getFirstUnreadMessageIdInConversation, - getFirstUnreadMessageWithMention, - hasConversationOutgoingMessage, - getLastHashBySnode, - getSeenMessagesByHashList, - removeAllMessagesInConversation, - getMessagesBySentAt, - getExpiredMessages, - getOutgoingWithoutExpiresAt, - getNextExpiringMessage, - getUnprocessedCount, - getAllUnprocessed, - getUnprocessedById, - saveUnprocessed, - updateUnprocessedAttempts, - updateUnprocessedWithData, - removeUnprocessed, - removeAllUnprocessed, - getNextAttachmentDownloadJobs, - saveAttachmentDownloadJob, - setAttachmentDownloadJobPending, - resetAttachmentDownloadPending, - removeAttachmentDownloadJob, - removeAllAttachmentDownloadJobs, - removeAll, - removeAllConversations, - cleanupOrphanedAttachments, - removeOtherData, - getMessagesWithVisualMediaAttachments, - getMessagesWithFileAttachments, - getSnodePoolFromDb, - updateSnodePoolOnDb, - fillWithTestData, -}; - // Basic async function shutdown(): Promise { // Stop accepting new SQL jobs, flush outstanding queue @@ -674,49 +589,42 @@ async function getNextExpiringMessage(): Promise { // Unprocessed -async function getUnprocessedCount(): Promise { +const getUnprocessedCount: AsyncWrapper = () => { return channels.getUnprocessedCount(); -} +}; -async function getAllUnprocessed(): Promise> { +const getAllUnprocessed: AsyncWrapper = () => { return channels.getAllUnprocessed(); -} +}; -async function getUnprocessedById(id: string): Promise { +const getUnprocessedById: AsyncWrapper = id => { return channels.getUnprocessedById(id); -} - -export type UnprocessedParameter = { - id: string; - version: number; - envelope: string; - timestamp: number; - attempts: number; - messageHash: string; - senderIdentity?: string; - decrypted?: string; // added once the envelopes's content is decrypted with updateCache - source?: string; // added once the envelopes's content is decrypted with updateCache }; -async function saveUnprocessed(data: UnprocessedParameter): Promise { - const id = await channels.saveUnprocessed(_cleanData(data)); - return id; -} +const saveUnprocessed: AsyncWrapper = data => { + return channels.saveUnprocessed(_cleanData(data)); +}; -async function updateUnprocessedAttempts(id: string, attempts: number): Promise { - await channels.updateUnprocessedAttempts(id, attempts); -} -async function updateUnprocessedWithData(id: string, data: any): Promise { - await channels.updateUnprocessedWithData(id, data); -} +const updateUnprocessedAttempts: AsyncWrapper = ( + id, + attempts +) => { + return channels.updateUnprocessedAttempts(id, attempts); +}; +const updateUnprocessedWithData: AsyncWrapper = ( + id, + data +) => { + return channels.updateUnprocessedWithData(id, _cleanData(data)); +}; -async function removeUnprocessed(id: string): Promise { - await channels.removeUnprocessed(id); -} +const removeUnprocessed: AsyncWrapper = id => { + return channels.removeUnprocessed(id); +}; -async function removeAllUnprocessed(): Promise { - await channels.removeAllUnprocessed(); -} +const removeAllUnprocessed: AsyncWrapper = () => { + return channels.removeAllUnprocessed(); +}; // Attachment downloads @@ -908,3 +816,97 @@ export async function getAllItems(): Promise> { export async function removeItemById(id: string): Promise { await channels.removeItemById(id); } + +// we export them like this instead of directly with the `export function` cause this is helping a lot for testing +export const Data = { + shutdown, + close, + removeDB, + getPasswordHash, + + // items table logic + createOrUpdateItem, + getItemById, + getAllItems, + removeItemById, + + // guard nodes + getGuardNodes, + updateGuardNodes, + generateAttachmentKeyIfEmpty, + getSwarmNodesForPubkey, + updateSwarmNodesForPubkey, + getAllEncryptionKeyPairsForGroup, + getLatestClosedGroupEncryptionKeyPair, + addClosedGroupEncryptionKeyPair, + removeAllClosedGroupEncryptionKeyPairs, + saveConversation, + getConversationById, + removeConversation, + getAllConversations, + getPubkeysInPublicConversation, + searchConversations, + searchMessages, + searchMessagesInConversation, + cleanSeenMessages, + cleanLastHashes, + saveSeenMessageHashes, + updateLastHash, + saveMessage, + saveMessages, + removeMessage, + removeMessagesByIds, + getMessageIdsFromServerIds, + getMessageById, + getMessageBySenderAndSentAt, + getMessageByServerId, + filterAlreadyFetchedOpengroupMessage, + getMessageBySenderAndTimestamp, + getUnreadByConversation, + getUnreadCountByConversation, + markAllAsReadByConversationNoExpiration, + getMessageCountByType, + getMessagesByConversation, + getLastMessagesByConversation, + getLastMessageIdInConversation, + getLastMessageInConversation, + getOldestMessageInConversation, + getMessageCount, + getFirstUnreadMessageIdInConversation, + getFirstUnreadMessageWithMention, + hasConversationOutgoingMessage, + getLastHashBySnode, + getSeenMessagesByHashList, + removeAllMessagesInConversation, + getMessagesBySentAt, + getExpiredMessages, + getOutgoingWithoutExpiresAt, + getNextExpiringMessage, + + // Unprocessed messages data + getUnprocessedCount, + getAllUnprocessed, + getUnprocessedById, + saveUnprocessed, + updateUnprocessedAttempts, + updateUnprocessedWithData, + removeUnprocessed, + removeAllUnprocessed, + + // attachments download jobs + getNextAttachmentDownloadJobs, + saveAttachmentDownloadJob, + setAttachmentDownloadJobPending, + resetAttachmentDownloadPending, + removeAttachmentDownloadJob, + removeAllAttachmentDownloadJobs, + removeAll, + removeAllConversations, + cleanupOrphanedAttachments, + removeOtherData, + getMessagesWithVisualMediaAttachments, + getMessagesWithFileAttachments, + getSnodePoolFromDb, + updateSnodePoolOnDb, + fillWithTestData, +}; diff --git a/ts/interactions/conversationInteractions.ts b/ts/interactions/conversationInteractions.ts index 90d32275e..3cdf64033 100644 --- a/ts/interactions/conversationInteractions.ts +++ b/ts/interactions/conversationInteractions.ts @@ -455,6 +455,10 @@ export async function uploadOurAvatar(newAvatarDecrypted?: ArrayBuffer) { `Reuploading avatar finished at ${newTimestampReupload}, newAttachmentPointer ${fileUrl}` ); } + return { + avatarPointer: ourConvo.get('avatarPointer'), + profileKey: ourConvo.get('profileKey'), + }; } export async function replyToMessage(messageId: string) { diff --git a/ts/mains/main_renderer.tsx b/ts/mains/main_renderer.tsx index c5879d65c..d57cf8655 100644 --- a/ts/mains/main_renderer.tsx +++ b/ts/mains/main_renderer.tsx @@ -440,7 +440,9 @@ async function connect() { Notifications.enable(); }, 10 * 1000); // 10 sec - await queueAllCached(); + setTimeout(() => { + void queueAllCached(); + }, 10 * 1000); // 10 sec await AttachmentDownloads.start({ logger: window.log, }); diff --git a/ts/models/conversationAttributes.ts b/ts/models/conversationAttributes.ts index 83938d2a0..eb5aa4566 100644 --- a/ts/models/conversationAttributes.ts +++ b/ts/models/conversationAttributes.ts @@ -91,7 +91,7 @@ export interface ConversationAttributes { /** * When we create a closed group v3 or get promoted to admim, we need to save the private key of that closed group. */ - identityPrivateKey?: string; + // identityPrivateKey?: string; } /** diff --git a/ts/node/database_utility.ts b/ts/node/database_utility.ts index 1a8087342..988f37070 100644 --- a/ts/node/database_utility.ts +++ b/ts/node/database_utility.ts @@ -169,10 +169,10 @@ export function formatRowOfConversation(row?: Record): Conversation convo.active_at = 0; } - convo.identityPrivateKey = row.identityPrivateKey; - if (!convo.identityPrivateKey) { - convo.identityPrivateKey = undefined; - } + // convo.identityPrivateKey = row.identityPrivateKey; + // if (!convo.identityPrivateKey) { + // convo.identityPrivateKey = undefined; + // } return convo; } diff --git a/ts/node/migration/sessionMigrations.ts b/ts/node/migration/sessionMigrations.ts index 69df8c3fa..36b9d44f4 100644 --- a/ts/node/migration/sessionMigrations.ts +++ b/ts/node/migration/sessionMigrations.ts @@ -1242,13 +1242,15 @@ function updateToSessionSchemaVersion31(currentVersion: number, db: BetterSqlite combinedMessageHashes TEXT); `); - db.exec(`ALTER TABLE conversations - ADD COLUMN lastReadTimestampMs INTEGER; - ; - `); + // db.exec(`ALTER TABLE conversations + // ADD COLUMN lastReadTimestampMs INTEGER; + // ; + // `); + + db.exec(`ALTER TABLE unprocessed DROP COLUMN serverTimestamp;`); // we need to populate those fields with the current state of the conversation so let's throw null until this is done - throw null; + // throw new Error('update me'); writeSessionSchemaVersion(targetVersion, db); })(); diff --git a/ts/node/sql.ts b/ts/node/sql.ts index 2c09883f3..8481be09e 100644 --- a/ts/node/sql.ts +++ b/ts/node/sql.ts @@ -17,6 +17,7 @@ import { isString, last, map, + omit, } from 'lodash'; import { redactAll } from '../util/privacy'; // checked - only node import { LocaleMessagesType } from './locale'; // checked - only node @@ -46,7 +47,14 @@ import { toSqliteBoolean, } from './database_utility'; -import { ConfigDumpDataNode, ConfigDumpRow, UpdateLastHashType } from '../types/sqlSharedTypes'; +import { + ConfigDumpDataNode, + ConfigDumpRow, + MsgDuplicateSearchOpenGroup, + UnprocessedDataNode, + UnprocessedParameter, + UpdateLastHashType, +} from '../types/sqlSharedTypes'; import { OpenGroupV2Room } from '../data/opengroups'; import { @@ -437,9 +445,16 @@ function saveConversation(data: ConversationAttributes, instance?: BetterSqlite3 avatarInProfile, displayNameInProfile, conversationIdOrigin, - identityPrivateKey, + // identityPrivateKey, } = formatted; + //FIXME + console.warn('FIXME omit(formatted, identityPrivateKey);'); + const omited = omit(formatted, 'identityPrivateKey'); + const keys = Object.keys(omited); + const columnsCommaSeparated = keys.join(', '); + const valuesArgs = keys.map(k => `$${k}`).join(', '); + const maxLength = 300; // shorten the last message as we never need more than `maxLength` chars (and it bloats the redux/ipc calls uselessly. @@ -450,73 +465,9 @@ function saveConversation(data: ConversationAttributes, instance?: BetterSqlite3 assertGlobalInstanceOrInstance(instance) .prepare( `INSERT OR REPLACE INTO ${CONVERSATIONS_TABLE} ( - id, - active_at, - type, - members, - nickname, - profileKey, - zombies, - left, - expireTimer, - mentionedUs, - unreadCount, - lastMessageStatus, - lastMessage, - lastJoinedTimestamp, - groupAdmins, - groupModerators, - isKickedFromGroup, - subscriberCount, - readCapability, - writeCapability, - uploadCapability, - is_medium_group, - avatarPointer, - avatarImageId, - triggerNotificationsFor, - isTrustedForAttachmentDownload, - isPinned, - isApproved, - didApproveMe, - avatarInProfile, - displayNameInProfile, - conversationIdOrigin, - identityPrivateKey + ${columnsCommaSeparated} ) values ( - $id, - $active_at, - $type, - $members, - $nickname, - $profileKey, - $zombies, - $left, - $expireTimer, - $mentionedUs, - $unreadCount, - $lastMessageStatus, - $lastMessage, - $lastJoinedTimestamp, - $groupAdmins, - $groupModerators, - $isKickedFromGroup, - $subscriberCount, - $readCapability, - $writeCapability, - $uploadCapability, - $is_medium_group, - $avatarPointer, - $avatarImageId, - $triggerNotificationsFor, - $isTrustedForAttachmentDownload, - $isPinned, - $isApproved, - $didApproveMe, - $avatarInProfile, - $displayNameInProfile, - $conversationIdOrigin, - $identityPrivateKey + ${valuesArgs} )` ) .run({ @@ -555,7 +506,7 @@ function saveConversation(data: ConversationAttributes, instance?: BetterSqlite3 avatarInProfile, displayNameInProfile, conversationIdOrigin, - identityPrivateKey, + // identityPrivateKey, }); } @@ -1076,8 +1027,8 @@ function getMessageBySenderAndTimestamp({ } function filterAlreadyFetchedOpengroupMessage( - msgDetails: Array<{ sender: string; serverTimestamp: number }> // MsgDuplicateSearchOpenGroup -): Array<{ sender: string; serverTimestamp: number }> { + msgDetails: MsgDuplicateSearchOpenGroup +): MsgDuplicateSearchOpenGroup { const filteredNonBlinded = msgDetails.filter(msg => { const rows = assertGlobalInstance() .prepare( @@ -1478,127 +1429,119 @@ function getNextExpiringMessage() { } /* Unproccessed a received messages not yet processed */ -function saveUnprocessed(data: any) { - const { id, timestamp, version, attempts, envelope, senderIdentity, messageHash } = data; - if (!id) { - throw new Error(`saveUnprocessed: id was falsey: ${id}`); - } +const unprocessed: UnprocessedDataNode = { + saveUnprocessed: (data: UnprocessedParameter) => { + const { id, timestamp, version, attempts, envelope, senderIdentity, messageHash } = data; + if (!id) { + throw new Error(`saveUnprocessed: id was falsey: ${id}`); + } - assertGlobalInstance() - .prepare( - `INSERT OR REPLACE INTO unprocessed ( - id, - timestamp, - version, - attempts, - envelope, - senderIdentity, - serverHash - ) values ( - $id, - $timestamp, - $version, - $attempts, - $envelope, - $senderIdentity, - $messageHash - );` - ) - .run({ - id, - timestamp, - version, - attempts, - envelope, - senderIdentity, - messageHash, - }); + assertGlobalInstance() + .prepare( + `INSERT OR REPLACE INTO unprocessed ( + id, + timestamp, + version, + attempts, + envelope, + senderIdentity, + serverHash + ) values ( + $id, + $timestamp, + $version, + $attempts, + $envelope, + $senderIdentity, + $messageHash + );` + ) + .run({ + id, + timestamp, + version, + attempts, + envelope, + senderIdentity, + messageHash, + }); + }, - return id; -} + updateUnprocessedAttempts: (id: string, attempts: number) => { + assertGlobalInstance() + .prepare('UPDATE unprocessed SET attempts = $attempts WHERE id = $id;') + .run({ + id, + attempts, + }); + }, -function updateUnprocessedAttempts(id: string, attempts: number) { - assertGlobalInstance() - .prepare('UPDATE unprocessed SET attempts = $attempts WHERE id = $id;') - .run({ - id, - attempts, - }); -} -function updateUnprocessedWithData(id: string, data: any = {}) { - const { source, serverTimestamp, decrypted, senderIdentity } = data; + updateUnprocessedWithData: (id: string, data: UnprocessedParameter) => { + const { source, decrypted, senderIdentity } = data; - assertGlobalInstance() - .prepare( - `UPDATE unprocessed SET - source = $source, - serverTimestamp = $serverTimestamp, - decrypted = $decrypted, - senderIdentity = $senderIdentity - WHERE id = $id;` - ) - .run({ - id, - source, - serverTimestamp, - decrypted, - senderIdentity, - }); -} + assertGlobalInstance() + .prepare( + `UPDATE unprocessed SET + source = $source, + decrypted = $decrypted, + senderIdentity = $senderIdentity + WHERE id = $id;` + ) + .run({ + id, + source, + decrypted, + senderIdentity, + }); + }, -function getUnprocessedById(id: string) { - const row = assertGlobalInstance() - .prepare('SELECT * FROM unprocessed WHERE id = $id;') - .get({ - id, - }); + getUnprocessedById: (id: string) => { + const row = assertGlobalInstance() + .prepare('SELECT * FROM unprocessed WHERE id = $id;') + .get({ + id, + }); - return row; -} + return row; + }, -function getUnprocessedCount() { - const row = assertGlobalInstance() - .prepare('SELECT count(*) from unprocessed;') - .get(); + getUnprocessedCount: () => { + const row = assertGlobalInstance() + .prepare('SELECT count(*) from unprocessed;') + .get(); - if (!row) { - throw new Error('getMessageCount: Unable to get count of unprocessed'); - } + if (!row) { + throw new Error('getMessageCount: Unable to get count of unprocessed'); + } - return row['count(*)']; -} + return row['count(*)']; + }, -function getAllUnprocessed() { - const rows = assertGlobalInstance() - .prepare('SELECT * FROM unprocessed ORDER BY timestamp ASC;') - .all(); + getAllUnprocessed: () => { + const rows = assertGlobalInstance() + .prepare('SELECT * FROM unprocessed ORDER BY timestamp ASC;') + .all(); - return rows; -} + return rows; + }, -function removeUnprocessed(id: string) { - if (!Array.isArray(id)) { + removeUnprocessed: (id: string): void => { + if (Array.isArray(id)) { + console.warn('removeUnprocessed only supports single ids at a time'); + throw new Error('removeUnprocessed only supports single ids at a time'); + } assertGlobalInstance() .prepare('DELETE FROM unprocessed WHERE id = $id;') .run({ id }); return; - } - - if (!id.length) { - throw new Error('removeUnprocessed: No ids to delete!'); - } - - // Our node interface doesn't seem to allow you to replace one single ? with an array - assertGlobalInstance() - .prepare(`DELETE FROM unprocessed WHERE id IN ( ${id.map(() => '?').join(', ')} );`) - .run(id); -} + }, -function removeAllUnprocessed() { - assertGlobalInstance() - .prepare('DELETE FROM unprocessed;') - .run(); -} + removeAllUnprocessed: () => { + assertGlobalInstance() + .prepare('DELETE FROM unprocessed;') + .run(); + }, +}; function getNextAttachmentDownloadJobs(limit: number) { const timestamp = Date.now(); @@ -2565,14 +2508,8 @@ export const sqlNode = { hasConversationOutgoingMessage, fillWithTestData, - getUnprocessedCount, - getAllUnprocessed, - saveUnprocessed, - updateUnprocessedAttempts, - updateUnprocessedWithData, - getUnprocessedById, - removeUnprocessed, - removeAllUnprocessed, + // add all the calls related to the unprocessed cache of incoming messages + ...unprocessed, getNextAttachmentDownloadJobs, saveAttachmentDownloadJob, diff --git a/ts/receiver/cache.ts b/ts/receiver/cache.ts index e67e98374..5e334c1a8 100644 --- a/ts/receiver/cache.ts +++ b/ts/receiver/cache.ts @@ -1,12 +1,11 @@ import { EnvelopePlus } from './types'; import { StringUtils } from '../session/utils'; import _ from 'lodash'; -import { Data, UnprocessedParameter } from '../data/data'; +import { Data } from '../data/data'; +import { UnprocessedParameter } from '../types/sqlSharedTypes'; export async function removeFromCache(envelope: EnvelopePlus) { - const { id } = envelope; - // window?.log?.info(`removing from cache envelope: ${id}`); - return Data.removeUnprocessed(id); + return Data.removeUnprocessed(envelope.id); } export async function addToCache( @@ -15,7 +14,6 @@ export async function addToCache( messageHash: string ) { const { id } = envelope; - // window?.log?.info(`adding to cache envelope: ${id}`); const encodedEnvelope = StringUtils.decode(plaintext, 'base64'); const data: UnprocessedParameter = { @@ -30,10 +28,10 @@ export async function addToCache( if (envelope.senderIdentity) { data.senderIdentity = envelope.senderIdentity; } - return Data.saveUnprocessed(data); + await Data.saveUnprocessed(data); } -async function fetchAllFromCache(): Promise> { +async function fetchAllFromCache(): Promise> { const count = await Data.getUnprocessedCount(); if (count > 1500) { @@ -42,30 +40,26 @@ async function fetchAllFromCache(): Promise> { return []; } - const items = await Data.getAllUnprocessed(); - return items; + return Data.getAllUnprocessed(); } -export async function getAllFromCache() { - window?.log?.info('getAllFromCache'); - const items = await fetchAllFromCache(); - - window?.log?.info('getAllFromCache loaded', items.length, 'saved envelopes'); - +async function increaseAttemptsOrRemove( + items: Array +): Promise> { return Promise.all( - _.map(items, async (item: any) => { + _.map(items, async item => { const attempts = _.toNumber(item.attempts || 0) + 1; try { if (attempts >= 10) { - window?.log?.warn('getAllFromCache final attempt for envelope', item.id); + window?.log?.warn('increaseAttemptsOrRemove final attempt for envelope', item.id); await Data.removeUnprocessed(item.id); } else { await Data.updateUnprocessedAttempts(item.id, attempts); } } catch (error) { window?.log?.error( - 'getAllFromCache error updating item after load:', + 'increaseAttemptsOrRemove error updating item after load:', error && error.stack ? error.stack : error ); } @@ -75,6 +69,14 @@ export async function getAllFromCache() { ); } +export async function getAllFromCache() { + window?.log?.info('getAllFromCache'); + const items = await fetchAllFromCache(); + + window?.log?.info('getAllFromCache loaded', items.length, 'saved envelopes'); + return increaseAttemptsOrRemove(items); +} + export async function getAllFromCacheForSource(source: string) { const items = await fetchAllFromCache(); @@ -85,34 +87,19 @@ export async function getAllFromCacheForSource(source: string) { window?.log?.info('getAllFromCacheForSource loaded', itemsFromSource.length, 'saved envelopes'); - return Promise.all( - _.map(items, async (item: any) => { - const attempts = _.toNumber(item.attempts || 0) + 1; - - try { - if (attempts >= 10) { - window?.log?.warn('getAllFromCache final attempt for envelope', item.id); - await Data.removeUnprocessed(item.id); - } else { - await Data.updateUnprocessedAttempts(item.id, attempts); - } - } catch (error) { - window?.log?.error( - 'getAllFromCache error updating item after load:', - error && error.stack ? error.stack : error - ); - } - - return item; - }) - ); + return increaseAttemptsOrRemove(itemsFromSource); } -export async function updateCache(envelope: EnvelopePlus, plaintext: ArrayBuffer): Promise { +export async function updateCacheWithDecryptedContent( + envelope: EnvelopePlus, + plaintext: ArrayBuffer +): Promise { const { id } = envelope; const item = await Data.getUnprocessedById(id); if (!item) { - window?.log?.error(`updateCache: Didn't find item ${id} in cache to update`); + window?.log?.error( + `updateCacheWithDecryptedContent: Didn't find item ${id} in cache to update` + ); return; } @@ -125,5 +112,5 @@ export async function updateCache(envelope: EnvelopePlus, plaintext: ArrayBuffer item.decrypted = StringUtils.decode(plaintext, 'base64'); - return Data.updateUnprocessedWithData(item.id, item); + await Data.updateUnprocessedWithData(item.id, item); } diff --git a/ts/receiver/configMessage.ts b/ts/receiver/configMessage.ts index 173776b6d..a612c13f4 100644 --- a/ts/receiver/configMessage.ts +++ b/ts/receiver/configMessage.ts @@ -1,28 +1,27 @@ -import _, { groupBy, isArray, isEmpty } from 'lodash'; +import _, { isEmpty } from 'lodash'; +import { ContactInfo, ProfilePicture } from 'session_util_wrapper'; import { Data, hasSyncedInitialConfigurationItem } from '../data/data'; +import { ConversationInteraction } from '../interactions'; +import { ConversationTypeEnum } from '../models/conversationAttributes'; +import { SignalService } from '../protobuf'; import { joinOpenGroupV2WithUIEvents, parseOpenGroupV2, } from '../session/apis/open_group_api/opengroupV2/JoinOpenGroupV2'; import { getOpenGroupV2ConversationId } from '../session/apis/open_group_api/utils/OpenGroupUtils'; -import { SignalService } from '../protobuf'; import { getConversationController } from '../session/conversations'; +import { IncomingMessage } from '../session/messages/incoming/IncomingMessage'; import { UserUtils } from '../session/utils'; import { toHex } from '../session/utils/String'; import { configurationMessageReceived, trigger } from '../shims/events'; import { BlockedNumberController } from '../util'; +import { getLastProfileUpdateTimestamp, setLastProfileUpdateTimestamp } from '../util/storage'; +import { ConfigWrapperObjectTypes } from '../webworker/workers/browser/libsession_worker_functions'; +import { callLibSessionWorker } from '../webworker/workers/browser/libsession_worker_interface'; import { removeFromCache } from './cache'; import { handleNewClosedGroup } from './closedGroups'; import { EnvelopePlus } from './types'; -import { ConversationInteraction } from '../interactions'; -import { getLastProfileUpdateTimestamp, setLastProfileUpdateTimestamp } from '../util/storage'; import { appendFetchAvatarAndProfileJob, updateOurProfileSync } from './userProfileImageUpdates'; -import { ConversationTypeEnum } from '../models/conversationAttributes'; -import { callLibSessionWorker } from '../webworker/workers/browser/libsession_worker_interface'; -import { IncomingMessage } from '../session/messages/incoming/IncomingMessage'; -import { ConfigWrapperObjectTypes } from '../webworker/workers/browser/libsession_worker_functions'; -import { Dictionary } from '@reduxjs/toolkit'; -import { ContactInfo, ProfilePicture } from 'session_util_wrapper'; type IncomingConfResult = { needsPush: boolean; @@ -45,26 +44,22 @@ function protobufSharedConfigTypeToWrapper( } async function mergeConfigsWithIncomingUpdates( - groupedByKind: Dictionary>> + incomingConfig: IncomingMessage ) { const kindMessageMap: Map = new Map(); - // do the merging on all wrappers sequentially instead of with a promise.all() - const allKinds = (Object.keys(groupedByKind) as unknown) as Array< - SignalService.SharedConfigMessage.Kind - >; + const allKinds = [incomingConfig.message.kind]; for (let index = 0; index < allKinds.length; index++) { const kind = allKinds[index]; - // see comment above "groupedByKind = groupBy" about why this is needed - const castedKind = (kind as unknown) as SignalService.SharedConfigMessage.Kind; - const currentKindMessages = groupedByKind[castedKind]; + + const currentKindMessages = [incomingConfig]; if (!currentKindMessages) { continue; } const toMerge = currentKindMessages.map(m => m.message.data); - const wrapperId = protobufSharedConfigTypeToWrapper(castedKind); + const wrapperId = protobufSharedConfigTypeToWrapper(kind); if (!wrapperId) { - throw new Error(`Invalid castedKind: ${castedKind}`); + throw new Error(`Invalid kind: ${kind}`); } await callLibSessionWorker([wrapperId, 'merge', toMerge]); @@ -181,6 +176,7 @@ async function handleContactsUpdate(result: IncomingConfResult) { } async function processMergingResults( + envelope: EnvelopePlus, results: Map ) { const keys = [...results.keys()]; @@ -206,32 +202,30 @@ async function processMergingResults( throw e; } } + await removeFromCache(envelope); } -async function handleConfigMessagesViaLibSession( - configMessages: Array> +async function handleConfigMessageViaLibSession( + envelope: EnvelopePlus, + configMessage: IncomingMessage ) { // FIXME: Remove this once `useSharedUtilForUserConfig` is permanent - - if ( - !window.sessionFeatureFlags.useSharedUtilForUserConfig || - !configMessages || - !isArray(configMessages) || - configMessages.length === 0 - ) { + if (!window.sessionFeatureFlags.useSharedUtilForUserConfig) { + await removeFromCache(envelope); return; } - window?.log?.info( - `Handling our profileUdpates via libsession_util. count: ${configMessages.length}` - ); + if (!configMessage) { + await removeFromCache(envelope); + + return; + } - // lodash does not have a way to give the type of the keys as generic parameter so this can only be a string: Array<> - const groupedByKind = groupBy(configMessages, m => m.message.kind); + window?.log?.info(`Handling our profileUdpates via libsession_util.`); - const kindMessagesMap = await mergeConfigsWithIncomingUpdates(groupedByKind); + const kindMessagesMap = await mergeConfigsWithIncomingUpdates(configMessage); - await processMergingResults(kindMessagesMap); + await processMergingResults(envelope, kindMessagesMap); } async function handleOurProfileUpdate( @@ -426,6 +420,7 @@ async function handleConfigurationMessage( window?.log?.info( 'useSharedUtilForUserConfig is set, not handling config messages with "handleConfigurationMessage()"' ); + await removeFromCache(envelope); return; } @@ -449,5 +444,5 @@ async function handleConfigurationMessage( export const ConfigMessageHandler = { handleConfigurationMessage, - handleConfigMessagesViaLibSession, + handleConfigMessageViaLibSession, }; diff --git a/ts/receiver/contentMessage.ts b/ts/receiver/contentMessage.ts index 855972c8a..04958329d 100644 --- a/ts/receiver/contentMessage.ts +++ b/ts/receiver/contentMessage.ts @@ -1,7 +1,7 @@ import { EnvelopePlus } from './types'; import { handleSwarmDataMessage } from './dataMessage'; -import { removeFromCache, updateCache } from './cache'; +import { removeFromCache, updateCacheWithDecryptedContent } from './cache'; import { SignalService } from '../protobuf'; import { compact, flatten, identity, isEmpty, pickBy, toNumber } from 'lodash'; import { KeyPrefixType, PubKey } from '../session/types'; @@ -28,6 +28,7 @@ import { import { ConversationTypeEnum } from '../models/conversationAttributes'; import { findCachedBlindedMatchOrLookupOnAllServers } from '../session/apis/open_group_api/sogsv3/knownBlindedkeys'; import { appendFetchAvatarAndProfileJob } from './userProfileImageUpdates'; +import { IncomingMessage } from '../session/messages/incoming/IncomingMessage'; export async function handleSwarmContentMessage(envelope: EnvelopePlus, messageHash: string) { try { @@ -39,7 +40,7 @@ export async function handleSwarmContentMessage(envelope: EnvelopePlus, messageH return; } const sentAtTimestamp = toNumber(envelope.timestamp); - // swarm messages already comes with a timestamp is milliseconds, so this sentAtTimestamp is correct. + // swarm messages already comes with a timestamp in milliseconds, so this sentAtTimestamp is correct. // the sogs messages do not come as milliseconds but just seconds, so we override it await innerHandleSwarmContentMessage(envelope, sentAtTimestamp, plaintext, messageHash); } catch (e) { @@ -270,15 +271,15 @@ async function decrypt(envelope: EnvelopePlus, ciphertext: ArrayBuffer): Promise return null; } - perfStart(`updateCache-${envelope.id}`); + perfStart(`updateCacheWithDecryptedContent-${envelope.id}`); - await updateCache(envelope, plaintext).catch((error: any) => { + await updateCacheWithDecryptedContent(envelope, plaintext).catch((error: any) => { window?.log?.error( 'decrypt failed to save decrypted message contents to cache:', error && error.stack ? error.stack : error ); }); - perfEnd(`updateCache-${envelope.id}`, 'updateCache'); + perfEnd(`updateCacheWithDecryptedContent-${envelope.id}`, 'updateCacheWithDecryptedContent'); return plaintext; } catch (error) { @@ -433,6 +434,18 @@ export async function innerHandleSwarmContentMessage( ); return; } + if (content.sharedConfigMessage) { + if (window.sessionFeatureFlags.useSharedUtilForUserConfig) { + const asIncomingMsg: IncomingMessage = { + envelopeTimestamp: sentAtTimestamp, + message: content.sharedConfigMessage, + messageHash: messageHash, + authorOrGroupPubkey: envelope.source, + authorInGroup: envelope.senderIdentity, + }; + await ConfigMessageHandler.handleConfigMessageViaLibSession(envelope, asIncomingMsg); + } + } if (content.dataExtractionNotification) { perfStart(`handleDataExtractionNotification-${envelope.id}`); diff --git a/ts/receiver/receiver.ts b/ts/receiver/receiver.ts index c2e2e7c44..1382c9dee 100644 --- a/ts/receiver/receiver.ts +++ b/ts/receiver/receiver.ts @@ -14,12 +14,11 @@ import { SignalService } from '../protobuf'; import { Data } from '../data/data'; import { createTaskWithTimeout } from '../session/utils/TaskWithTimeout'; import { perfEnd, perfStart } from '../session/utils/Performance'; +import { UnprocessedParameter } from '../types/sqlSharedTypes'; -// TODO: check if some of these exports no longer needed - -interface ReqOptions { +export type ReqOptions = { conversationId: string; -} +}; const incomingMessagePromises: Array> = []; @@ -29,7 +28,7 @@ async function handleSwarmEnvelope(envelope: EnvelopePlus, messageHash: string) } await removeFromCache(envelope); - throw new Error('Received message with no content and no legacyMessage'); + throw new Error('Received message with no content'); } class EnvelopeQueue { @@ -57,8 +56,6 @@ const envelopeQueue = new EnvelopeQueue(); function queueSwarmEnvelope(envelope: EnvelopePlus, messageHash: string) { const id = getEnvelopeId(envelope); - // window?.log?.info('queueing envelope', id); - const task = handleSwarmEnvelope.bind(null, envelope, messageHash); const taskWithTimeout = createTaskWithTimeout(task, `queueSwarmEnvelope ${id}`); @@ -102,7 +99,7 @@ async function handleRequestDetail( envelope.senderIdentity = senderIdentity; } - envelope.id = envelope.serverGuid || uuidv4(); + envelope.id = uuidv4(); envelope.serverTimestamp = envelope.serverTimestamp ? envelope.serverTimestamp.toNumber() : null; envelope.messageHash = messageHash; @@ -115,11 +112,7 @@ async function handleRequestDetail( await addToCache(envelope, plaintext, messageHash); perfEnd(`addToCache-${envelope.id}`, 'addToCache'); - // TODO: This is the glue between the first and the last part of the - // receiving pipeline refactor. It is to be implemented in the next PR. - // To ensure that we queue in the same order we receive messages - await lastPromise; queueSwarmEnvelope(envelope, messageHash); @@ -131,7 +124,11 @@ async function handleRequestDetail( } } -export function handleRequest(plaintext: any, options: ReqOptions, messageHash: string): void { +export function handleRequest( + plaintext: Uint8Array, + options: ReqOptions, + messageHash: string +): void { // tslint:disable-next-line no-promise-as-boolean const lastPromise = _.last(incomingMessagePromises) || Promise.resolve(); @@ -163,7 +160,7 @@ export async function queueAllCachedFromSource(source: string) { }, Promise.resolve()); } -async function queueCached(item: any) { +async function queueCached(item: UnprocessedParameter) { try { const envelopePlaintext = StringUtils.encode(item.envelope, 'base64'); const envelopeArray = new Uint8Array(envelopePlaintext); @@ -174,7 +171,7 @@ async function queueCached(item: any) { // Why do we need to do this??? envelope.senderIdentity = envelope.senderIdentity || item.senderIdentity; - envelope.serverTimestamp = envelope.serverTimestamp || item.serverTimestamp; + envelope.serverTimestamp = envelope.serverTimestamp; const { decrypted } = item; @@ -194,8 +191,7 @@ async function queueCached(item: any) { ); try { - const { id } = item; - await Data.removeUnprocessed(id); + await Data.removeUnprocessed(item.id); } catch (deleteError) { window?.log?.error( 'queueCached error deleting item', diff --git a/ts/receiver/userProfileImageUpdates.ts b/ts/receiver/userProfileImageUpdates.ts index 1e2c47020..34e8d4f98 100644 --- a/ts/receiver/userProfileImageUpdates.ts +++ b/ts/receiver/userProfileImageUpdates.ts @@ -57,6 +57,7 @@ export async function updateOurProfileSync( window?.log?.warn('[profileupdate] Cannot update our profile with empty convoid'); return; } + const oneAtaTimeStr = `appendFetchAvatarAndProfileJob:${ourConvo.id}`; return allowOnlyOneAtATime(oneAtaTimeStr, async () => { return createOrUpdateProfile(ourConvo.id, profileInDataMessage, profileKey); diff --git a/ts/session/apis/snode_api/retrieveRequest.ts b/ts/session/apis/snode_api/retrieveRequest.ts index 85d1aefa6..f8dca046f 100644 --- a/ts/session/apis/snode_api/retrieveRequest.ts +++ b/ts/session/apis/snode_api/retrieveRequest.ts @@ -110,7 +110,6 @@ async function retrieveNextMessages( const firstResult = results[0]; // TODO we should probably check for status code of all the results (when polling for a few namespaces at a time) - console.warn('what should we do if we dont get a 200 on any of those fetches?'); if (firstResult.code !== 200) { window?.log?.warn(`retrieveNextMessages result is not 200 but ${firstResult.code}`); diff --git a/ts/session/apis/snode_api/swarmPolling.ts b/ts/session/apis/snode_api/swarmPolling.ts index 8bd59b418..a42b8e71d 100644 --- a/ts/session/apis/snode_api/swarmPolling.ts +++ b/ts/session/apis/snode_api/swarmPolling.ts @@ -17,23 +17,33 @@ import pRetry from 'p-retry'; import { SnodeAPIRetrieve } from './retrieveRequest'; import { SnodeNamespace, SnodeNamespaces } from './namespaces'; import { RetrieveMessageItem, RetrieveMessagesResultsBatched } from './types'; -import { ConfigMessageHandler } from '../../../receiver/configMessage'; -import { IncomingMessage } from '../../messages/incoming/IncomingMessage'; -// Some websocket nonsense -export function processMessage(message: string, options: any = {}, messageHash: string) { +export function extractWebSocketContent( + message: string, + options: any = {}, + messageHash: string +): null | { + body: Uint8Array; + messageHash: string; + options: any; +} { try { const dataPlaintext = new Uint8Array(StringUtils.encode(message, 'base64')); const messageBuf = SignalService.WebSocketMessage.decode(dataPlaintext); - if (messageBuf.type === SignalService.WebSocketMessage.Type.REQUEST) { - Receiver.handleRequest(messageBuf.request?.body, options, messageHash); + if ( + messageBuf.type === SignalService.WebSocketMessage.Type.REQUEST && + messageBuf.request?.body?.length + ) { + return { + body: messageBuf.request.body, + messageHash, + options, + }; } + return null; } catch (error) { - const info = { - message, - error: error.message, - }; - window?.log?.warn('HTTP-Resources Failed to handle message:', info); + window?.log?.warn('processMessage Failed to handle message:', error.message); + return null; } } @@ -276,27 +286,27 @@ export class SwarmPolling { const newMessages = await this.handleSeenMessages(messages); perfEnd(`handleSeenMessages-${pkStr}`, 'handleSeenMessages'); - // try { - // if ( - // window.sessionFeatureFlags.useSharedUtilForUserConfig && - // userConfigMessagesMerged.length - // ) { - // const asIncomingMessages = userConfigMessagesMerged.map(msg => { - // const incomingMessage: IncomingMessage = { - // envelopeTimestamp: msg.timestamp, - // message: msg.data, - // messageHash: msg.hash, - // }; - // }); - // await ConfigMessageHandler.handleConfigMessagesViaLibSession(); - // } - // } catch (e) { - // console.error('shared util lib process messages failed with: ', e); - // } - - newMessages.forEach((m: RetrieveMessageItem) => { - const options = isGroup ? { conversationId: pkStr } : {}; - processMessage(m.data, options, m.hash); + if (window.sessionFeatureFlags.useSharedUtilForUserConfig) { + const extractedUserConfigMessage = compact( + userConfigMessagesMerged.map((m: RetrieveMessageItem) => { + return extractWebSocketContent(m.data, {}, m.hash); + }) + ); + + extractedUserConfigMessage.forEach(m => { + Receiver.handleRequest(m.body, m.options, m.messageHash); + }); + } + + const extractedContentMessage = compact( + newMessages.map((m: RetrieveMessageItem) => { + const options = isGroup ? { conversationId: pkStr } : {}; + return extractWebSocketContent(m.data, options, m.hash); + }) + ); + + extractedContentMessage.forEach(m => { + Receiver.handleRequest(m.body, m.options, m.messageHash); }); } diff --git a/ts/session/group/closed-group.ts b/ts/session/group/closed-group.ts index d16c33a31..a484e2db2 100644 --- a/ts/session/group/closed-group.ts +++ b/ts/session/group/closed-group.ts @@ -245,7 +245,7 @@ export async function updateOrCreateClosedGroup(details: GroupInfo | GroupInfoV3 const updates: Pick< ConversationAttributes, | 'type' - | 'identityPrivateKey' + // | 'identityPrivateKey' | 'members' | 'displayNameInProfile' | 'is_medium_group' @@ -260,7 +260,7 @@ export async function updateOrCreateClosedGroup(details: GroupInfo | GroupInfoV3 active_at: details.activeAt ? details.activeAt : 0, left: details.activeAt ? false : true, lastJoinedTimestamp: details.activeAt && weWereJustAdded ? Date.now() : details.activeAt || 0, - identityPrivateKey: isV3(details) ? details.identityPrivateKey : undefined, + // identityPrivateKey: isV3(details) ? details.identityPrivateKey : undefined, }; console.warn('updates', updates); diff --git a/ts/session/messages/incoming/IncomingMessage.ts b/ts/session/messages/incoming/IncomingMessage.ts index cab939c2e..e489d2ca3 100644 --- a/ts/session/messages/incoming/IncomingMessage.ts +++ b/ts/session/messages/incoming/IncomingMessage.ts @@ -10,7 +10,7 @@ type IncomingMessageAvailableTypes = | SignalService.DataExtractionNotification | SignalService.Unsend | SignalService.MessageRequestResponse - | SignalService.SharedConfigMessage; + | SignalService.ISharedConfigMessage; export class IncomingMessage { public readonly envelopeTimestamp: number; diff --git a/ts/session/sending/MessageQueue.ts b/ts/session/sending/MessageQueue.ts index 3ceff61e3..6355e1cf1 100644 --- a/ts/session/sending/MessageQueue.ts +++ b/ts/session/sending/MessageQueue.ts @@ -333,7 +333,7 @@ export class MessageQueue { message instanceof SharedConfigMessage || (message as any).syncTarget?.length > 0 ) { - window?.log?.warn('Processing sync message'); + window?.log?.warn('OutgoingMessageQueue: Processing sync message'); isSyncMessage = true; } else { window?.log?.warn('Dropping message in process() to be sent to ourself'); diff --git a/ts/session/utils/AttachmentsDownload.ts b/ts/session/utils/AttachmentsDownload.ts index acdec08ea..15f1c2e29 100644 --- a/ts/session/utils/AttachmentsDownload.ts +++ b/ts/session/utils/AttachmentsDownload.ts @@ -9,6 +9,7 @@ import { downloadAttachment, downloadAttachmentSogsV3 } from '../../receiver/att import { initializeAttachmentLogic, processNewAttachment } from '../../types/MessageAttachment'; import { getAttachmentMetadata } from '../../types/message/initializeAttachmentMetadata'; import { was404Error } from '../apis/snode_api/onions'; +import { AttachmentDownloadMessageDetails } from '../../types/sqlSharedTypes'; // this may cause issues if we increment that value to > 1, but only having one job will block the whole queue while one attachment is downloading const MAX_ATTACHMENT_JOB_PARALLELISM = 3; @@ -49,7 +50,7 @@ export function stop() { } } -export async function addJob(attachment: any, job: any = {}) { +export async function addJob(attachment: any, job: AttachmentDownloadMessageDetails) { if (!attachment) { throw new Error('attachments_download/addJob: attachment is required'); } diff --git a/ts/session/utils/job_runners/JobRunner.ts b/ts/session/utils/job_runners/JobRunner.ts index a8a6f4a4a..6e305f409 100644 --- a/ts/session/utils/job_runners/JobRunner.ts +++ b/ts/session/utils/job_runners/JobRunner.ts @@ -43,7 +43,7 @@ export class PersistedJobRunner { constructor(jobRunnerType: JobRunnerType, jobEventsListener: JobEventListener | null) { this.jobRunnerType = jobRunnerType; this.jobEventsListener = jobEventsListener; - console.warn('new runner'); + window.log.warn('new runner'); } public async loadJobsFromDb() { @@ -157,7 +157,7 @@ export class PersistedJobRunner { private async writeJobsToDB() { const serialized = this.getSerializedJobs(); - console.warn('writing to db', serialized); + window.log.warn('writing to db', serialized); await Data.createOrUpdateItem({ id: this.getJobRunnerItemId(), value: JSON.stringify(serialized), @@ -171,7 +171,6 @@ export class PersistedJobRunner { // a new job was added. trigger it if we can/have to start it const result = this.planNextJob(); - console.warn('addJobUnchecked: ', result); if (result === 'no_job') { throw new Error('We just pushed a job, there cannot be no job'); } @@ -258,7 +257,7 @@ export class PersistedJobRunner { // if the time is 101, and that task is to be run at t=101, we need to start it right away. if (nextJob.nextAttemptTimestamp > Date.now()) { - console.warn( + window.log.warn( 'next job is not due to be run just yet. Going idle.', nextJob.nextAttemptTimestamp - Date.now() ); diff --git a/ts/session/utils/job_runners/jobs/ConfigurationSyncJob.ts b/ts/session/utils/job_runners/jobs/ConfigurationSyncJob.ts index 0be0d07cc..88ce0c650 100644 --- a/ts/session/utils/job_runners/jobs/ConfigurationSyncJob.ts +++ b/ts/session/utils/job_runners/jobs/ConfigurationSyncJob.ts @@ -28,10 +28,10 @@ export class ConfigurationSyncJob extends Persistedjob { public async run() { // blablha do everything from the notion page, and if success, return true. - console.warn(`running job ${this.jobType} with id:"${this.identifier}" `); + window.log.warn(`running job ${this.jobType} with id:"${this.identifier}" `); await sleepFor(5000); - console.warn( + window.log.warn( `running job ${this.jobType} with id:"${this.identifier}" done and returning failed ` ); diff --git a/ts/test/session/unit/utils/job_runner/FakeSleepForJob.ts b/ts/test/session/unit/utils/job_runner/FakeSleepForJob.ts index 68a7340c7..c96abbe9b 100644 --- a/ts/test/session/unit/utils/job_runner/FakeSleepForJob.ts +++ b/ts/test/session/unit/utils/job_runner/FakeSleepForJob.ts @@ -42,11 +42,11 @@ export class FakeSleepForMultiJob extends Persistedjob { } public async run() { - console.warn( + window.log.warn( `running job ${this.jobType} with id:"${this.identifier}". sleeping for ${this.sleepDuration} & returning ${this.returnResult} ` ); await sleepFor(this.sleepDuration); - console.warn(`${this.jobType} with id:"${this.identifier}" done. returning success `); + window.log.warn(`${this.jobType} with id:"${this.identifier}" done. returning success `); return this.returnResult; } @@ -85,9 +85,9 @@ export class FakeSleepForJob extends Persistedjob { } public async run() { - console.warn(`running job ${this.jobType} with id:"${this.identifier}" `); + window.log.warn(`running job ${this.jobType} with id:"${this.identifier}" `); await sleepFor(5000); - console.warn(`${this.jobType} with id:"${this.identifier}" done. returning failed `); + window.log.warn(`${this.jobType} with id:"${this.identifier}" done. returning failed `); return false; } diff --git a/ts/test/session/unit/utils/job_runner/JobRunner_test.ts b/ts/test/session/unit/utils/job_runner/JobRunner_test.ts index 8dc5ff062..5299f3738 100644 --- a/ts/test/session/unit/utils/job_runner/JobRunner_test.ts +++ b/ts/test/session/unit/utils/job_runner/JobRunner_test.ts @@ -58,16 +58,16 @@ describe('JobRunner', () => { clock = Sinon.useFakeTimers({ shouldAdvanceTime: true }); jobEventsListener = { onJobDeferred: (_job: SerializedPersistedJob) => { - // console.warn('listener got deferred for job ', job); + // window.log.warn('listener got deferred for job ', job); }, onJobSuccess: (_job: SerializedPersistedJob) => { - // console.warn('listener got success for job ', job); + // window.log.warn('listener got success for job ', job); }, onJobError: (_job: SerializedPersistedJob) => { - // console.warn('listener got error for job ', job); + // window.log.warn('listener got error for job ', job); }, onJobStarted: (_job: SerializedPersistedJob) => { - // console.warn('listener got started for job ', job); + // window.log.warn('listener got started for job ', job); }, }; runner = new PersistedJobRunner('FakeSleepForJob', jobEventsListener); diff --git a/ts/types/sqlSharedTypes.ts b/ts/types/sqlSharedTypes.ts index dc7248e10..54c7da67e 100644 --- a/ts/types/sqlSharedTypes.ts +++ b/ts/types/sqlSharedTypes.ts @@ -1,3 +1,4 @@ +import { OpenGroupRequestCommonType } from '../session/apis/open_group_api/opengroupV2/ApiUtil'; import { ConfigWrapperObjectTypes } from '../webworker/workers/browser/libsession_worker_functions'; /** @@ -30,6 +31,8 @@ export type ConfigDumpRow = { // we might need to add a `seqno` field here. }; +// ========== configdump + export type GetByVariantAndPubkeyConfigDump = ( variant: ConfigWrapperObjectTypes, pubkey: string @@ -45,3 +48,39 @@ export type ConfigDumpDataNode = { getAllDumpsWithData: GetAllDumps; getAllDumpsWithoutData: GetAllDumps; }; + +// ========== unprocessed + +export type UnprocessedParameter = { + id: string; + version: number; + envelope: string; + timestamp: number; + // serverTimestamp: number; + attempts: number; + messageHash: string; + senderIdentity?: string; + decrypted?: string; // added once the envelopes's content is decrypted with updateCacheWithDecryptedContent + source?: string; // added once the envelopes's content is decrypted with updateCacheWithDecryptedContent +}; + +export type UnprocessedDataNode = { + saveUnprocessed: (data: UnprocessedParameter) => void; + updateUnprocessedAttempts: (id: string, attempts: number) => void; + updateUnprocessedWithData: (id: string, data: UnprocessedParameter) => void; + getUnprocessedById: (id: string) => UnprocessedParameter | undefined; + getUnprocessedCount: () => number; + getAllUnprocessed: () => Array; + removeUnprocessed: (id: string) => void; + removeAllUnprocessed: () => void; +}; + +// ======== attachment downloads + +export type AttachmentDownloadMessageDetails = { + messageId: string; + type: 'preview' | 'quote' | 'attachment'; + index: number; + isOpenGroupV2: boolean; + openGroupV2Details: OpenGroupRequestCommonType | undefined; +}; diff --git a/yarn.lock b/yarn.lock index de65cf2e6..006523619 100644 --- a/yarn.lock +++ b/yarn.lock @@ -4965,6 +4965,13 @@ getobject@^1.0.0, getobject@~0.1.0, getobject@~1.0.0: resolved "https://registry.yarnpkg.com/getobject/-/getobject-1.1.1.tgz#29f7858609fee7ef1c58d062f1b2335e425bdb45" integrity sha512-Rftr+NsUMxFcCmFopFmyCCfsJPaqUmf7TW61CtKMu0aE93ir62I6VjXt2koiCQgcunGgVog/U6g24tBPq67rlg== +git@^0.1.5: + version "0.1.5" + resolved "https://registry.yarnpkg.com/git/-/git-0.1.5.tgz#9ef62df93f851c27542143bf52d1c68b1017ca15" + integrity sha512-N+bfOrXyKMU/fQtCj6D/U9MQOEN0DAA8TLHSLdUQRSWBOkeRvsjJHdrdkvcq05xO7GSDKWc3nDEGoTZ4DfCCSg== + dependencies: + mime "1.2.9" + glob-parent@^6.0.1, glob-parent@~5.1.2: version "6.0.2" resolved "https://registry.yarnpkg.com/glob-parent/-/glob-parent-6.0.2.tgz#6d237d99083950c79290f24c7642a3de9a28f9e3" @@ -6494,6 +6501,11 @@ mime-types@^2.1.12, mime-types@^2.1.27: dependencies: mime-db "1.52.0" +mime@1.2.9: + version "1.2.9" + resolved "https://registry.yarnpkg.com/mime/-/mime-1.2.9.tgz#009cd40867bd35de521b3b966f04e2f8d4d13d09" + integrity sha512-WiLgbHTIq5AYUvU/Luli4mZ1bUcHpGNHyCsbl+KPMg4zt+XUDpQehWjuBjdLaEvDTinvKj/FgfQt3fPoT7j08g== + mime@^2.4.6: version "2.6.0" resolved "https://registry.yarnpkg.com/mime/-/mime-2.6.0.tgz#a2a682a95cd4d0cb1d6257e28f83da7e35800367" @@ -6810,9 +6822,9 @@ node-gyp@9.0.0: which "^2.0.2" node-gyp@^9.3.0: - version "9.3.0" - resolved "https://registry.yarnpkg.com/node-gyp/-/node-gyp-9.3.0.tgz#f8eefe77f0ad8edb3b3b898409b53e697642b319" - integrity sha512-A6rJWfXFz7TQNjpldJ915WFb1LnhO4lIve3ANPbWreuEoLoKlFT3sxIepPBkLhM27crW8YmN+pjlgbasH6cH/Q== + version "9.3.1" + resolved "https://registry.yarnpkg.com/node-gyp/-/node-gyp-9.3.1.tgz#1e19f5f290afcc9c46973d68700cbd21a96192e4" + integrity sha512-4Q16ZCqq3g8awk6UplT7AuxQ35XN4R/yf/+wSAwcBUAjg7l58RTactWaP8fIDTi0FzI7YcVLujwExakZlfWkXg== dependencies: env-paths "^2.2.0" glob "^7.1.4" @@ -8392,15 +8404,14 @@ serialize-javascript@6.0.0, serialize-javascript@^6.0.0: dependencies: randombytes "^2.1.0" -"session_util_wrapper@file:../libsession-util-nodejs": +session_util_wrapper@/home/audric/pro/contribs/libsession-util-nodejs: version "0.1.0" dependencies: nan "^2.17.0" node-gyp "^9.3.0" -"session_util_wrapper@https://github.com/oxen-io/libsession-util-nodejs": +"session_util_wrapper@file:../libsession-util-nodejs": version "0.1.0" - resolved "https://github.com/oxen-io/libsession-util-nodejs#0951d0138a289795257198d556409b2e73068fc1" dependencies: nan "^2.17.0" node-gyp "^9.3.0"