|
|
@ -58,6 +58,7 @@ import { OpenGroupRequestCommonType } from '../../data/types';
|
|
|
|
import { NetworkTime } from '../../util/NetworkTime';
|
|
|
|
import { NetworkTime } from '../../util/NetworkTime';
|
|
|
|
import { MergedAbortSignal } from '../apis/snode_api/requestWith';
|
|
|
|
import { MergedAbortSignal } from '../apis/snode_api/requestWith';
|
|
|
|
import { WithAllow401s } from '../types/with';
|
|
|
|
import { WithAllow401s } from '../types/with';
|
|
|
|
|
|
|
|
import { ERROR_421_HANDLED_RETRY_REQUEST } from '../apis/snode_api/onions';
|
|
|
|
|
|
|
|
|
|
|
|
// ================ SNODE STORE ================
|
|
|
|
// ================ SNODE STORE ================
|
|
|
|
|
|
|
|
|
|
|
@ -247,66 +248,76 @@ async function sendSingleMessage({
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return pRetry(
|
|
|
|
return pRetry(
|
|
|
|
async () => {
|
|
|
|
async () => {
|
|
|
|
const recipient = PubKey.cast(message.device);
|
|
|
|
try {
|
|
|
|
// we can only have a single message in this send function for now
|
|
|
|
const recipient = PubKey.cast(message.device);
|
|
|
|
const [encryptedAndWrapped] = await MessageWrapper.encryptMessagesAndWrap([
|
|
|
|
// we can only have a single message in this send function for now
|
|
|
|
{
|
|
|
|
const [encryptedAndWrapped] = await MessageWrapper.encryptMessagesAndWrap([
|
|
|
|
destination: message.device,
|
|
|
|
{
|
|
|
|
plainTextBuffer: message.plainTextBuffer,
|
|
|
|
destination: message.device,
|
|
|
|
namespace: message.namespace,
|
|
|
|
plainTextBuffer: message.plainTextBuffer,
|
|
|
|
ttl: message.ttl,
|
|
|
|
namespace: message.namespace,
|
|
|
|
identifier: message.identifier,
|
|
|
|
ttl: message.ttl,
|
|
|
|
networkTimestamp: message.networkTimestampCreated,
|
|
|
|
identifier: message.identifier,
|
|
|
|
isSyncMessage: Boolean(isSyncMessage),
|
|
|
|
networkTimestamp: message.networkTimestampCreated,
|
|
|
|
},
|
|
|
|
isSyncMessage: Boolean(isSyncMessage),
|
|
|
|
]);
|
|
|
|
},
|
|
|
|
|
|
|
|
]);
|
|
|
|
// make sure to update the local sent_at timestamp, because sometimes, we will get the just pushed message in the receiver side
|
|
|
|
|
|
|
|
// before we return from the await below.
|
|
|
|
// make sure to update the local sent_at timestamp, because sometimes, we will get the just pushed message in the receiver side
|
|
|
|
// and the isDuplicate messages relies on sent_at timestamp to be valid.
|
|
|
|
// before we return from the await below.
|
|
|
|
const found = await Data.getMessageById(encryptedAndWrapped.identifier);
|
|
|
|
// and the isDuplicate messages relies on sent_at timestamp to be valid.
|
|
|
|
|
|
|
|
const found = await Data.getMessageById(encryptedAndWrapped.identifier);
|
|
|
|
// make sure to not update the sent timestamp if this a currently syncing message
|
|
|
|
|
|
|
|
if (found && !found.get('sentSync')) {
|
|
|
|
// make sure to not update the sent timestamp if this a currently syncing message
|
|
|
|
found.set({ sent_at: encryptedAndWrapped.networkTimestamp });
|
|
|
|
if (found && !found.get('sentSync')) {
|
|
|
|
await found.commit();
|
|
|
|
found.set({ sent_at: encryptedAndWrapped.networkTimestamp });
|
|
|
|
}
|
|
|
|
await found.commit();
|
|
|
|
const isSyncedDeleteAfterReadMessage =
|
|
|
|
}
|
|
|
|
found &&
|
|
|
|
const isSyncedDeleteAfterReadMessage =
|
|
|
|
UserUtils.isUsFromCache(recipient.key) &&
|
|
|
|
found &&
|
|
|
|
found.getExpirationType() === 'deleteAfterRead' &&
|
|
|
|
UserUtils.isUsFromCache(recipient.key) &&
|
|
|
|
found.getExpireTimerSeconds() > 0 &&
|
|
|
|
found.getExpirationType() === 'deleteAfterRead' &&
|
|
|
|
encryptedAndWrapped.isSyncMessage;
|
|
|
|
found.getExpireTimerSeconds() > 0 &&
|
|
|
|
|
|
|
|
encryptedAndWrapped.isSyncMessage;
|
|
|
|
let overriddenTtl = encryptedAndWrapped.ttl;
|
|
|
|
|
|
|
|
if (isSyncedDeleteAfterReadMessage && found.getExpireTimerSeconds() > 0) {
|
|
|
|
let overriddenTtl = encryptedAndWrapped.ttl;
|
|
|
|
const asMs = found.getExpireTimerSeconds() * 1000;
|
|
|
|
if (isSyncedDeleteAfterReadMessage && found.getExpireTimerSeconds() > 0) {
|
|
|
|
window.log.debug(`overriding ttl for synced DaR message to ${asMs}`);
|
|
|
|
const asMs = found.getExpireTimerSeconds() * 1000;
|
|
|
|
overriddenTtl = asMs;
|
|
|
|
window.log.debug(`overriding ttl for synced DaR message to ${asMs}`);
|
|
|
|
}
|
|
|
|
overriddenTtl = asMs;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
const subRequests = await messagesToRequests({
|
|
|
|
|
|
|
|
encryptedAndWrappedArr: [{ ...encryptedAndWrapped, ttl: overriddenTtl }],
|
|
|
|
|
|
|
|
destination,
|
|
|
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
|
|
const subRequests = await messagesToRequests({
|
|
|
|
const targetNode = await SnodePool.getNodeFromSwarmOrThrow(destination);
|
|
|
|
encryptedAndWrappedArr: [{ ...encryptedAndWrapped, ttl: overriddenTtl }],
|
|
|
|
|
|
|
|
destination,
|
|
|
|
const batchResult = await BatchRequests.doUnsignedSnodeBatchRequestNoRetries({
|
|
|
|
});
|
|
|
|
unsignedSubRequests: subRequests,
|
|
|
|
|
|
|
|
targetNode,
|
|
|
|
const targetNode = await SnodePool.getNodeFromSwarmOrThrow(destination);
|
|
|
|
timeoutMs: 10 * DURATION.SECONDS,
|
|
|
|
|
|
|
|
associatedWith: destination,
|
|
|
|
const batchResult = await BatchRequests.doUnsignedSnodeBatchRequestNoRetries({
|
|
|
|
allow401s: false,
|
|
|
|
unsignedSubRequests: subRequests,
|
|
|
|
method: 'sequence',
|
|
|
|
targetNode,
|
|
|
|
abortSignal: null,
|
|
|
|
timeoutMs: 10 * DURATION.SECONDS,
|
|
|
|
});
|
|
|
|
associatedWith: destination,
|
|
|
|
|
|
|
|
allow401s: false,
|
|
|
|
await handleBatchResultWithSubRequests({ batchResult, subRequests, destination });
|
|
|
|
method: 'sequence',
|
|
|
|
return {
|
|
|
|
abortSignal: null,
|
|
|
|
wrappedEnvelope: encryptedAndWrapped.encryptedAndWrappedData,
|
|
|
|
});
|
|
|
|
effectiveTimestamp: encryptedAndWrapped.networkTimestamp,
|
|
|
|
|
|
|
|
};
|
|
|
|
await handleBatchResultWithSubRequests({ batchResult, subRequests, destination });
|
|
|
|
} catch (e) {
|
|
|
|
return {
|
|
|
|
if (e instanceof pRetry.AbortError && e.message === ERROR_421_HANDLED_RETRY_REQUEST) {
|
|
|
|
wrappedEnvelope: encryptedAndWrapped.encryptedAndWrappedData,
|
|
|
|
// sendSingleMessage handles fetching a new snode itself once 421 was handled, but a pRetry.AbortError thrown
|
|
|
|
effectiveTimestamp: encryptedAndWrapped.networkTimestamp,
|
|
|
|
// will stop the retry process. We need to catch this specific error and throw it again (as a normal error)
|
|
|
|
};
|
|
|
|
// to let pRetry retry the request.
|
|
|
|
|
|
|
|
throw new Error(e.message);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
throw e;
|
|
|
|
|
|
|
|
}
|
|
|
|
},
|
|
|
|
},
|
|
|
|
{
|
|
|
|
{
|
|
|
|
retries: Math.max(attempts - 1, 0),
|
|
|
|
retries: Math.max(attempts - 1, 0),
|
|
|
|