From 0158fd5ebbdc9639bc5e4cc506b612d1f3a2c4e6 Mon Sep 17 00:00:00 2001 From: Audric Ackermann Date: Wed, 13 Apr 2022 13:52:05 +1000 Subject: [PATCH] filter duplicates on opengroup poll in a single sql call --- .../conversation/SessionConversation.tsx | 23 ++++++++-- ts/data/data.ts | 22 +++------ ts/data/dataInit.ts | 2 +- ts/node/sql.ts | 41 +++++++++-------- ts/receiver/dataMessage.ts | 30 +----------- ts/receiver/opengroup.ts | 42 ++++++++--------- .../opengroupV2/OpenGroupServerPoller.ts | 45 ++++++++++++++++-- ts/types/attachments/migrations.ts | 27 +---------- ts/types/sqlSharedTypes.ts | 4 ++ ts/util/attachmentsUtil.ts | 46 +++++++++++++++++-- 10 files changed, 158 insertions(+), 124 deletions(-) create mode 100644 ts/types/sqlSharedTypes.ts diff --git a/ts/components/conversation/SessionConversation.tsx b/ts/components/conversation/SessionConversation.tsx index b3e993e73..3b5847005 100644 --- a/ts/components/conversation/SessionConversation.tsx +++ b/ts/components/conversation/SessionConversation.tsx @@ -1,6 +1,8 @@ import React from 'react'; +import _ from 'lodash'; import classNames from 'classnames'; +import autoBind from 'auto-bind'; import { CompositionBox, @@ -8,13 +10,14 @@ import { StagedAttachmentType, } from './composition/CompositionBox'; -import _ from 'lodash'; +import { perfEnd, perfStart } from '../../session/utils/Performance'; + +const DEFAULT_JPEG_QUALITY = 0.85; import { SessionMessagesListContainer } from './SessionMessagesListContainer'; import { SessionFileDropzone } from './SessionFileDropzone'; -import autoBind from 'auto-bind'; import { InConversationCallContainer } from '../calling/InConversationCallContainer'; import { SplitViewContainer } from '../SplitViewContainer'; import { LightboxGallery, MediaItemType } from '../lightbox/LightboxGallery'; @@ -40,7 +43,6 @@ import { MessageView } from '../MainViewController'; import { ConversationHeaderWithDetails } from './ConversationHeader'; import { MessageDetail } from './message/message-item/MessageDetail'; import { SessionRightPanelWithDetails } from './SessionRightPanel'; -import { autoOrientJpegImage } from '../../types/attachments/migrations'; import { makeImageThumbnailBuffer, makeVideoScreenshot, @@ -51,6 +53,7 @@ import { MAX_ATTACHMENT_FILESIZE_BYTES } from '../../session/constants'; import { ConversationMessageRequestButtons } from './ConversationRequestButtons'; import { ConversationRequestinfo } from './ConversationRequestInfo'; import { getCurrentRecoveryPhrase } from '../../util/storage'; +import loadImage from 'blueimp-load-image'; // tslint:disable: jsx-curly-spacing interface State { @@ -177,7 +180,7 @@ export class SessionConversation extends React.Component { await this.scrollToNow(); }; - const recoveryPhrase = getCurrentRecoveryPhrase() as string; + const recoveryPhrase = getCurrentRecoveryPhrase(); // string replace to fix case where pasted text contains invis characters causing false negatives if (msg.body.replace(/\s/g, '').includes(recoveryPhrase.replace(/\s/g, ''))) { @@ -526,6 +529,18 @@ const renderVideoPreview = async (contentType: string, file: File, fileName: str } }; +const autoOrientJpegImage = async (fileOrBlobOrURL: File): Promise => { + perfStart('autoOrientJpegImage'); + const loadedImage = await loadImage(fileOrBlobOrURL, { orientation: true, canvas: true }); + perfEnd('autoOrientJpegImage', 'autoOrientJpegImage'); + const dataURL = (loadedImage.image as HTMLCanvasElement).toDataURL( + MIME.IMAGE_JPEG, + DEFAULT_JPEG_QUALITY + ); + + return dataURL; +}; + const renderImagePreview = async (contentType: string, file: File, fileName: string) => { if (!MIME.isJPEG(contentType)) { const urlImage = URL.createObjectURL(file); diff --git a/ts/data/data.ts b/ts/data/data.ts index 5149a8819..e0efcf99f 100644 --- a/ts/data/data.ts +++ b/ts/data/data.ts @@ -10,6 +10,7 @@ import { HexKeyPair } from '../receiver/keypairs'; import { getSodiumRenderer } from '../session/crypto'; import { PubKey } from '../session/types'; import { ReduxConversationType } from '../state/ducks/conversations'; +import { MsgDuplicateSearchOpenGroup } from '../types/sqlSharedTypes'; import { ExpirationTimerOptions } from '../util/expiringMessages'; import { Storage } from '../util/storage'; import { channels } from './channels'; @@ -373,22 +374,11 @@ export async function getMessageBySenderAndSentAt({ return new MessageModel(messages[0]); } -export async function getMessageBySenderAndServerTimestamp({ - source, - serverTimestamp, -}: { - source: string; - serverTimestamp: number; -}): Promise { - const messages = await channels.getMessageBySenderAndServerTimestamp({ - source, - serverTimestamp, - }); - if (!messages || !messages.length) { - return null; - } - - return new MessageModel(messages[0]); +export async function filterAlreadyFetchedOpengroupMessage( + msgDetails: MsgDuplicateSearchOpenGroup +): Promise { + const msgDetailsNotAlreadyThere = await channels.filterAlreadyFetchedOpengroupMessage(msgDetails); + return msgDetailsNotAlreadyThere || []; } /** diff --git a/ts/data/dataInit.ts b/ts/data/dataInit.ts index efdcc29fa..e1a33fb7b 100644 --- a/ts/data/dataInit.ts +++ b/ts/data/dataInit.ts @@ -48,7 +48,7 @@ const channelsToMake = new Set([ 'removeAllMessagesInConversation', 'getMessageCount', 'getMessageBySenderAndSentAt', - 'getMessageBySenderAndServerTimestamp', + 'filterAlreadyFetchedOpengroupMessage', 'getMessageBySenderAndTimestamp', 'getMessageIdsFromServerIds', 'getMessageById', diff --git a/ts/node/sql.ts b/ts/node/sql.ts index 22eda7ed2..d190dbd9d 100644 --- a/ts/node/sql.ts +++ b/ts/node/sql.ts @@ -2232,25 +2232,28 @@ function getMessageBySenderAndTimestamp({ return map(rows, row => jsonToObject(row.json)); } -function getMessageBySenderAndServerTimestamp({ - source, - serverTimestamp, -}: { - source: string; - serverTimestamp: number; -}) { - const rows = assertGlobalInstance() - .prepare( - `SELECT json FROM ${MESSAGES_TABLE} WHERE - source = $source AND +function filterAlreadyFetchedOpengroupMessage( + msgDetails: Array<{ sender: string; serverTimestamp: number }> // MsgDuplicateSearchOpenGroup +) { + return msgDetails.filter(msg => { + const rows = assertGlobalInstance() + .prepare( + `SELECT source, serverTimestamp FROM ${MESSAGES_TABLE} WHERE + source = $sender AND serverTimestamp = $serverTimestamp;` - ) - .all({ - source, - serverTimestamp, - }); - - return map(rows, row => jsonToObject(row.json)); + ) + .all({ + sender: msg.sender, + serverTimestamp: msg.serverTimestamp, + }); + if (rows.length) { + console.info( + `filtering out already received message from ${msg.sender} at ${msg.serverTimestamp} ` + ); + return false; + } + return true; + }); } function getUnreadByConversation(conversationId: string) { @@ -3521,7 +3524,7 @@ export const sqlNode = { getUnreadCountByConversation, getMessageCountByType, getMessageBySenderAndSentAt, - getMessageBySenderAndServerTimestamp, + filterAlreadyFetchedOpengroupMessage, getMessageBySenderAndTimestamp, getMessageIdsFromServerIds, getMessageById, diff --git a/ts/receiver/dataMessage.ts b/ts/receiver/dataMessage.ts index 7d22f68a6..69b821d9f 100644 --- a/ts/receiver/dataMessage.ts +++ b/ts/receiver/dataMessage.ts @@ -9,10 +9,7 @@ import _ from 'lodash'; import { StringUtils, UserUtils } from '../session/utils'; import { getConversationController } from '../session/conversations'; import { handleClosedGroupControlMessage } from './closedGroups'; -import { - getMessageBySenderAndSentAt, - getMessageBySenderAndServerTimestamp, -} from '../../ts/data/data'; +import { getMessageBySenderAndSentAt } from '../../ts/data/data'; import { ConversationModel, ConversationTypeEnum } from '../models/conversation'; import { toLogFormat } from '../types/attachments/Errors'; @@ -276,31 +273,6 @@ export async function isSwarmMessageDuplicate({ } } -export async function isOpengroupMessageDuplicate({ - sender, - serverTimestamp, -}: { - sender: string; - serverTimestamp: number; -}) { - // serverTimestamp is only used for opengroupv2 - try { - const result = await getMessageBySenderAndServerTimestamp({ - source: sender, - serverTimestamp, - }); - - // if we have a result, it means a specific user sent two messages either with the same serverTimestamp. - // no need to do anything else, those messages must be the same - // Note: this test is not based on which conversation the user sent the message - // but we consider that a user sending two messages with the same serverTimestamp is unlikely - return Boolean(result); - } catch (error) { - window?.log?.error('isOpengroupMessageDuplicate error:', toLogFormat(error)); - return false; - } -} - // tslint:disable:cyclomatic-complexity max-func-body-length */ async function handleSwarmMessage( msgModel: MessageModel, diff --git a/ts/receiver/opengroup.ts b/ts/receiver/opengroup.ts index 2575989ee..97ef0de17 100644 --- a/ts/receiver/opengroup.ts +++ b/ts/receiver/opengroup.ts @@ -1,5 +1,4 @@ -import { noop } from 'lodash'; -import { ConversationTypeEnum } from '../models/conversation'; +import _, { noop } from 'lodash'; import { createPublicMessageSentFromNotUs, createPublicMessageSentFromUs, @@ -11,8 +10,8 @@ import { getOpenGroupV2ConversationId } from '../session/apis/open_group_api/uti import { getConversationController } from '../session/conversations'; import { removeMessagePadding } from '../session/crypto/BufferPadding'; import { UserUtils } from '../session/utils'; +import { perfEnd, perfStart } from '../session/utils/Performance'; import { fromBase64ToArray } from '../session/utils/String'; -import { isOpengroupMessageDuplicate } from './dataMessage'; import { handleMessageJob, toRegularMessage } from './queuedJob'; export async function handleOpenGroupV2Message( @@ -26,8 +25,12 @@ export async function handleOpenGroupV2Message( return; } - // Note: opengroup messages are not padded - const dataUint = new Uint8Array(removeMessagePadding(fromBase64ToArray(base64EncodedData))); + // Note: opengroup messages should not be padded + perfStart(`fromBase64ToArray-${base64EncodedData.length}`); + const arr = fromBase64ToArray(base64EncodedData); + perfEnd(`fromBase64ToArray-${base64EncodedData.length}`, 'fromBase64ToArray'); + + const dataUint = new Uint8Array(removeMessagePadding(arr)); const decoded = SignalService.Content.decode(dataUint); @@ -42,43 +45,38 @@ export async function handleOpenGroupV2Message( return; } - if (!getConversationController().get(conversationId)) { - window?.log?.error('Received a message for an unknown convo. Skipping'); + if ( + !getConversationController() + .get(conversationId) + ?.isOpenGroupV2() + ) { + window?.log?.error('Received a message for an unknown convo or not an v2. Skipping'); return; } - const conversation = await getConversationController().getOrCreateAndWait( - conversationId, - ConversationTypeEnum.GROUP - ); + const groupConvo = getConversationController().get(conversationId); - if (!conversation) { + if (!groupConvo) { window?.log?.warn('Skipping handleJob for unknown convo: ', conversationId); return; } - void conversation.queueJob(async () => { + void groupConvo.queueJob(async () => { const isMe = UserUtils.isUsFromCache(sender); const commonAttributes = { serverTimestamp: sentTimestamp, serverId, conversationId }; const attributesForNotUs = { ...commonAttributes, sender }; - // those lines just create an empty message with some basic stuff set. + // those lines just create an empty message only in memory with some basic stuff set. // the whole decoding of data is happening in handleMessageJob() const msgModel = isMe ? createPublicMessageSentFromUs(commonAttributes) : createPublicMessageSentFromNotUs(attributesForNotUs); - // WARNING this is important that the isOpengroupMessageDuplicate is made INSIDE the conversation.queueJob call - const isDuplicate = await isOpengroupMessageDuplicate(attributesForNotUs); - - if (isDuplicate) { - window?.log?.info('Received duplicate opengroup message. Dropping it.'); - return; - } + // Note, deduplication is made in filterDuplicatesFromDbAndIncoming now await handleMessageJob( msgModel, - conversation, + groupConvo, toRegularMessage(decoded?.dataMessage as SignalService.DataMessage), noop, sender, diff --git a/ts/session/apis/open_group_api/opengroupV2/OpenGroupServerPoller.ts b/ts/session/apis/open_group_api/opengroupV2/OpenGroupServerPoller.ts index a975918fe..2e3e7123f 100644 --- a/ts/session/apis/open_group_api/opengroupV2/OpenGroupServerPoller.ts +++ b/ts/session/apis/open_group_api/opengroupV2/OpenGroupServerPoller.ts @@ -13,7 +13,11 @@ import { } from './OpenGroupAPIV2CompactPoll'; import _ from 'lodash'; import { ConversationModel } from '../../../../models/conversation'; -import { getMessageIdsFromServerIds, removeMessage } from '../../../../data/data'; +import { + filterAlreadyFetchedOpengroupMessage, + getMessageIdsFromServerIds, + removeMessage, +} from '../../../../data/data'; import { getV2OpenGroupRoom, saveV2OpenGroupRoom } from '../../../../data/opengroups'; import { OpenGroupMessageV2 } from './OpenGroupMessageV2'; import autoBind from 'auto-bind'; @@ -23,7 +27,6 @@ import { processNewAttachment } from '../../../../types/MessageAttachment'; import { MIME } from '../../../../types'; import { handleOpenGroupV2Message } from '../../../../receiver/opengroup'; import { callUtilsWorker } from '../../../../webworker/workers/util_worker_interface'; - const pollForEverythingInterval = DURATION.SECONDS * 10; const pollForRoomAvatarInterval = DURATION.DAYS * 1; const pollForMemberCountInterval = DURATION.MINUTES * 10; @@ -394,6 +397,39 @@ const handleDeletions = async ( } }; +const filterDuplicatesFromDbAndIncoming = async ( + newMessages: Array +): Promise> => { + const start = Date.now(); + // open group messages are deduplicated by sender and serverTimestamp only. + // first make sure that the incoming messages have no duplicates: + const filtered = _.uniqWith(newMessages, (a, b) => { + return ( + Boolean(a.sender) && + Boolean(a.sentTimestamp) && + a.sender === b.sender && + a.sentTimestamp === b.sentTimestamp + ); + // make sure a sender is set, as we cast it just below + }).filter(m => Boolean(m.sender)); + + // now, check database to make sure those messages are not already fetched + const filteredInDb = await filterAlreadyFetchedOpengroupMessage( + filtered.map(m => { + return { sender: m.sender as string, serverTimestamp: m.sentTimestamp }; + }) + ); + + window.log.debug( + `filterDuplicatesFromDbAndIncoming of ${newMessages.length} messages took ${Date.now() - + start}ms.` + ); + const opengroupMessagesFiltered = filteredInDb?.map(f => { + return newMessages.find(m => m.sender === f.sender && m.sentTimestamp === f.serverTimestamp); + }); + return _.compact(opengroupMessagesFiltered) || []; +}; + const handleNewMessages = async ( newMessages: Array, conversationId: string, @@ -419,10 +455,11 @@ const handleNewMessages = async ( // TODO filter out duplicates ? const roomDetails: OpenGroupRequestCommonType = _.pick(roomInfos, 'serverUrl', 'roomId'); + const filteredDuplicates = await filterDuplicatesFromDbAndIncoming(newMessages); // tslint:disable-next-line: prefer-for-of - for (let index = 0; index < newMessages.length; index++) { - const newMessage = newMessages[index]; + for (let index = 0; index < filteredDuplicates.length; index++) { + const newMessage = filteredDuplicates[index]; try { await handleOpenGroupV2Message(newMessage, roomDetails); } catch (e) { diff --git a/ts/types/attachments/migrations.ts b/ts/types/attachments/migrations.ts index 95407a551..5626803d6 100644 --- a/ts/types/attachments/migrations.ts +++ b/ts/types/attachments/migrations.ts @@ -1,9 +1,8 @@ import * as GoogleChrome from '../../../ts/util/GoogleChrome'; import * as MIME from '../../../ts/types/MIME'; import { toLogFormat } from './Errors'; -import { arrayBufferToBlob, blobToArrayBuffer, dataURLToBlob } from 'blob-util'; +import { arrayBufferToBlob, blobToArrayBuffer } from 'blob-util'; -import loadImage from 'blueimp-load-image'; import { isString } from 'lodash'; import { getImageDimensions, @@ -20,27 +19,6 @@ import { readAttachmentData, writeNewAttachmentData, } from '../MessageAttachment'; -import { perfEnd, perfStart } from '../../session/utils/Performance'; - -const DEFAULT_JPEG_QUALITY = 0.85; - -// File | Blob | URLString -> LoadImageOptions -> Promise -// -// Documentation for `options` (`LoadImageOptions`): -// https://github.com/blueimp/JavaScript-Load-Image/tree/v2.18.0#options -export const autoOrientJpegImage = async ( - fileOrBlobOrURL: string | File | Blob -): Promise => { - perfStart(`autoOrientJpegImage`); - const loadedImage = await loadImage(fileOrBlobOrURL, { orientation: true, canvas: true }); - perfEnd(`autoOrientJpegImage`, `autoOrientJpegImage`); - const dataURL = (loadedImage.image as HTMLCanvasElement).toDataURL( - MIME.IMAGE_JPEG, - DEFAULT_JPEG_QUALITY - ); - - return dataURL; -}; // Returns true if `rawAttachment` is a valid attachment based on our current schema. // Over time, we can expand this definition to become more narrow, e.g. require certain @@ -80,8 +58,7 @@ export const autoOrientJPEGAttachment = async (attachment: { } const dataBlob = arrayBufferToBlob(attachment.data, attachment.contentType); - const newDataBlob = dataURLToBlob(await autoOrientJpegImage(dataBlob)); - const newDataArrayBuffer = await blobToArrayBuffer(newDataBlob); + const newDataArrayBuffer = await blobToArrayBuffer(dataBlob); // IMPORTANT: We overwrite the existing `data` `ArrayBuffer` losing the original // image data. Ideally, we’d preserve the original image data for users who want to diff --git a/ts/types/sqlSharedTypes.ts b/ts/types/sqlSharedTypes.ts new file mode 100644 index 000000000..f4b90dacc --- /dev/null +++ b/ts/types/sqlSharedTypes.ts @@ -0,0 +1,4 @@ +export type MsgDuplicateSearchOpenGroup = Array<{ + sender: string; + serverTimestamp: number; +}>; diff --git a/ts/util/attachmentsUtil.ts b/ts/util/attachmentsUtil.ts index 270c47044..c601b72ae 100644 --- a/ts/util/attachmentsUtil.ts +++ b/ts/util/attachmentsUtil.ts @@ -5,7 +5,7 @@ import { sendDataExtractionNotification } from '../session/messages/outgoing/con import { AttachmentType, save } from '../types/Attachment'; import { StagedAttachmentType } from '../components/conversation/composition/CompositionBox'; import { getAbsoluteAttachmentPath, processNewAttachment } from '../types/MessageAttachment'; -import { arrayBufferToBlob, dataURLToBlob } from 'blob-util'; +import { arrayBufferToBlob } from 'blob-util'; import { IMAGE_GIF, IMAGE_JPEG, IMAGE_PNG, IMAGE_TIFF, IMAGE_UNKNOWN } from '../types/MIME'; import { THUMBNAIL_SIDE } from '../types/attachments/VisualAttachment'; @@ -118,6 +118,22 @@ export async function autoScaleForThumbnail { + return new Promise(resolve => { + canvas.toBlob( + blob => { + resolve(blob); + }, + type, + quality + ); + }); +} + /** * Scale down an image to fit in the required dimension. * Note: This method won't crop if needed, @@ -171,6 +187,8 @@ export async function autoScale( maxWidth: makeSquare ? maxMeasurements?.maxSide : maxWidth, maxHeight: makeSquare ? maxMeasurements?.maxSide : maxHeight, ...crop, + orientation: 1, + aspectRatio: makeSquare ? 1 : undefined, canvas: true, }; @@ -198,16 +216,34 @@ export async function autoScale( blob, }; } + window.log.debug('canvas.originalWidth', { + canvasOriginalWidth: canvas.originalWidth, + canvasOriginalHeight: canvas.originalHeight, + maxWidth, + maxHeight, + blobsize: blob.size, + maxSize, + makeSquare, + }); let quality = 0.95; let i = 4; + const start = Date.now(); do { i -= 1; - window.log.info('autoscale of ', attachment, i); - readAndResizedBlob = dataURLToBlob( - (canvas.image as HTMLCanvasElement).toDataURL('image/jpeg', quality) + window.log.info(`autoscale iteration: [${i}] for:`, attachment); + + perfStart(`autoscale-canvasToBlob-${attachment.blob.size}`); + const tempBlob = await canvasToBlob(canvas.image as HTMLCanvasElement, 'image/jpeg', quality); + perfEnd( + `autoscale-canvasToBlob-${attachment.blob.size}`, + `autoscale-canvasToBlob-${attachment.blob.size}` ); + if (!tempBlob) { + throw new Error('Failed to get blob during canvasToBlob.'); + } + readAndResizedBlob = tempBlob; quality = (quality * maxSize) / readAndResizedBlob.size; if (quality > 1) { @@ -218,6 +254,8 @@ export async function autoScale( if (readAndResizedBlob.size > maxSize) { throw new Error('Cannot add this attachment even after trying to scale it down.'); } + window.log.debug(`autoscale took ${Date.now() - start}ms `); + return { contentType: attachment.contentType, blob: readAndResizedBlob,