fix async problem with receive forEach, implement most of the annotation processing for multidevice

pull/539/head
Ryan Tharp 6 years ago
parent 7b22806bf0
commit 87474d48b5

@ -1,5 +1,5 @@
/* global log, textsecure, libloki, Signal, Whisper, Headers, ConversationController, /* global log, textsecure, libloki, Signal, Whisper, Headers, ConversationController,
clearTimeout, MessageController, libsignal, StringView, window, _ */ clearTimeout, MessageController, libsignal, StringView, window, _, lokiFileServerAPI */
const EventEmitter = require('events'); const EventEmitter = require('events');
const nodeFetch = require('node-fetch'); const nodeFetch = require('node-fetch');
const { URL, URLSearchParams } = require('url'); const { URL, URLSearchParams } = require('url');
@ -383,8 +383,7 @@ class LokiAppDotNetServerAPI {
if (pubKeys.length > 200) { if (pubKeys.length > 200) {
log.warn('Too many pubKeys given to getUsersAnnotations!'); log.warn('Too many pubKeys given to getUsersAnnotations!');
} }
console.log('getUsersAnnotations', pubKeys) const res = await this.serverRequest('users', {
const res = await this.serverRequest(`users`, {
method: 'GET', method: 'GET',
params: { params: {
ids: pubKeys.join(','), ids: pubKeys.join(','),
@ -445,6 +444,10 @@ class LokiPublicChannelAPI {
// Cache for duplicate checking // Cache for duplicate checking
this.lastMessagesCache = []; this.lastMessagesCache = [];
// Multidevice states
this.slavePrimaryMap = {};
this.primaryUserProfileName = {};
// end properties // end properties
log.info(`registered LokiPublicChannel ${channelId}`); log.info(`registered LokiPublicChannel ${channelId}`);
@ -750,158 +753,290 @@ class LokiPublicChannelAPI {
if (!res.err && res.response) { if (!res.err && res.response) {
let receivedAt = new Date().getTime(); let receivedAt = new Date().getTime();
const pubKeys = [] const pubKeys = [];
const pendingMessages = [] let pendingMessages = [];
res.response.data.reverse().forEach(async adnMessage => { pendingMessages = await Promise.all(
// still update our last received if deleted, not signed or not valid res.response.data.reverse().map(async adnMessage => {
this.lastGot = !this.lastGot // still update our last received if deleted, not signed or not valid
? adnMessage.id this.lastGot = !this.lastGot
: Math.max(this.lastGot, adnMessage.id); ? adnMessage.id
: Math.max(this.lastGot, adnMessage.id);
if (
!adnMessage.id || if (
!adnMessage.user || !adnMessage.id ||
!adnMessage.user.username || // pubKey lives in the username field !adnMessage.user ||
!adnMessage.user.name || // profileName lives in the name field !adnMessage.user.username || // pubKey lives in the username field
!adnMessage.text || !adnMessage.user.name || // profileName lives in the name field
adnMessage.is_deleted !adnMessage.text ||
) { adnMessage.is_deleted
return; // Invalid or delete message ) {
} return; // Invalid or delete message
}
const messengerData = await this.getMessengerData(adnMessage);
if (messengerData === false) {
return;
}
const { timestamp, quote } = messengerData; const messengerData = await this.getMessengerData(adnMessage);
if (!timestamp) { if (messengerData === false) {
return; // Invalid message return;
} }
// Duplicate check const { timestamp, quote } = messengerData;
const isDuplicate = message => { if (!timestamp) {
// The username in this case is the users pubKey return; // Invalid message
const sameUsername = message.username === adnMessage.user.username; }
const sameText = message.text === adnMessage.text;
// Don't filter out messages that are too far apart from each other
const timestampsSimilar =
Math.abs(message.timestamp - timestamp) <=
PUBLICCHAT_MIN_TIME_BETWEEN_DUPLICATE_MESSAGES;
return sameUsername && sameText && timestampsSimilar;
};
// Filter out any messages that we got previously // Duplicate check
if (this.lastMessagesCache.some(isDuplicate)) { const isDuplicate = message => {
return; // Duplicate message // The username in this case is the users pubKey
} const sameUsername = message.username === adnMessage.user.username;
const sameText = message.text === adnMessage.text;
// Don't filter out messages that are too far apart from each other
const timestampsSimilar =
Math.abs(message.timestamp - timestamp) <=
PUBLICCHAT_MIN_TIME_BETWEEN_DUPLICATE_MESSAGES;
return sameUsername && sameText && timestampsSimilar;
};
// FIXME: maybe move after the de-multidev-decode // Filter out any messages that we got previously
// Add the message to the lastMessage cache and keep the last 5 recent messages if (this.lastMessagesCache.some(isDuplicate)) {
this.lastMessagesCache = [ return; // Duplicate message
...this.lastMessagesCache, }
{
username: adnMessage.user.username,
text: adnMessage.text,
timestamp,
},
].splice(-5);
const from = adnMessage.user.name; // profileName
console.log('checking', adnMessage.user.username, 'in', pubKeys)
if (pubKeys.indexOf(adnMessage.user.username) == -1) {
console.log('pushing pubkey', adnMessage.user.username)
pubKeys.push(adnMessage.user.username)
console.log('pubKeys', pubKeys)
}
const messageData = { // FIXME: maybe move after the de-multidev-decode
serverId: adnMessage.id, // Add the message to the lastMessage cache and keep the last 5 recent messages
clientVerified: true, this.lastMessagesCache = [
friendRequest: false, ...this.lastMessagesCache,
source: adnMessage.user.username, {
sourceDevice: 1, username: adnMessage.user.username,
timestamp, text: adnMessage.text,
serverTimestamp: timestamp, timestamp,
receivedAt,
isPublic: true,
message: {
body: adnMessage.text,
attachments: [],
group: {
id: this.conversationId,
type: textsecure.protobuf.GroupContext.Type.DELIVER,
}, },
flags: 0, ].splice(-5);
expireTimer: 0,
profileKey: null, const from = adnMessage.user.name; // profileName
if (pubKeys.indexOf(`@${adnMessage.user.username}`) === -1) {
pubKeys.push(`@${adnMessage.user.username}`);
}
const messageData = {
serverId: adnMessage.id,
clientVerified: true,
friendRequest: false,
source: adnMessage.user.username,
sourceDevice: 1,
timestamp, timestamp,
received_at: receivedAt, serverTimestamp: timestamp,
sent_at: timestamp, receivedAt,
quote, isPublic: true,
contact: [], message: {
preview: [], body: adnMessage.text,
profile: { attachments: [],
displayName: from, group: {
id: this.conversationId,
type: textsecure.protobuf.GroupContext.Type.DELIVER,
},
flags: 0,
expireTimer: 0,
profileKey: null,
timestamp,
received_at: receivedAt,
sent_at: timestamp,
quote,
contact: [],
preview: [],
profile: {
displayName: from,
},
}, },
}, };
}; receivedAt += 1; // Ensure different arrival times
receivedAt += 1; // Ensure different arrival times
/*
this.serverAPI.chatAPI.emit('publicMessage', {
message: messageData,
});
*/
console.log('pushing back message from', adnMessage.user.username)
pendingMessages.push(messageData)
// now process any user meta data updates // now process any user meta data updates
// - update their conversation with a potentially new avatar // - update their conversation with a potentially new avatar
}); return messageData;
})
);
this.conversation.setLastRetrievedMessage(this.lastGot); this.conversation.setLastRetrievedMessage(this.lastGot);
//
console.log('pendingMessages', pendingMessages.length)
if (pendingMessages.length) { if (pendingMessages.length) {
const slavePrimaryMap = {}; this.slavePrimaryMap = {};
console.log('premultiDeviceResults', pubKeys) // console.log('premultiDeviceResults', pubKeys)
if (pubKeys.length) { if (pubKeys.length) {
const multiDeviceResults = await lokiFileServerAPI.getDeviceMappingForUsers(pubKeys); const multiDeviceResults = await lokiFileServerAPI.getDeviceMappingForUsers(
console.log('multiDeviceResults', multiDeviceResults) pubKeys
// no user or isPrimary means not multidevice, send event now );
// console.log('multiDeviceResults', multiDeviceResults);
multiDeviceResults.forEach(user => {
if (user.annotations) {
user.annotations.forEach(note => {
if (note.type === 'network.loki.messenger.devicemapping') {
// console.log('devmap note', note);
// is slave?
if (note.value.isPrimary === '0') {
const { authorisations } = note.value;
if (Array.isArray(authorisations)) {
authorisations.forEach(auth => {
// console.log('auth', auth);
// FIXME: verify secondary sig
if (1) {
// add map to slavePrimaryMap
if (
this.slavePrimaryMap[user.username] &&
this.slavePrimaryMap[user.username] !==
auth.primaryDevicePubKey
) {
log.warn(
`file server user annotation primaryKey mismatch, had ${
this.slavePrimaryMap[user.username]
} now ${auth.primaryDevicePubKey} for ${
user.username
}`
);
return;
}
this.slavePrimaryMap[user.username] =
auth.primaryDevicePubKey;
}
});
}
}
}
});
}
});
log.info('updated slavePrimaryMap', this.slavePrimaryMap);
const primaryPubKeys = []; const primaryPubKeys = [];
const slaveMessages = {};
// go through multiDeviceResults and get primary Pubkey // go through multiDeviceResults and get primary Pubkey
// verify secondary sig pendingMessages.forEach(messageData => {
// add map to slavePrimaryMap // why am I getting these?
if (messageData === undefined) {
log.warn('invalid pendingMessages');
return;
}
if (this.slavePrimaryMap[messageData.source]) {
const primaryPubKey = this.slavePrimaryMap[messageData.source];
// add to lookup (if we don't already have it)
if (primaryPubKeys.indexOf(`@${primaryPubKey}`) === -1) {
primaryPubKeys.push(`@${primaryPubKey}`);
}
// delay sending the message
if (slaveMessages[messageData.source] === undefined) {
slaveMessages[messageData.source] = [messageData];
} else {
slaveMessages[messageData.source].push(messageData);
}
} else {
// no user or isPrimary means not multidevice, send event now
this.serverAPI.chatAPI.emit('publicMessage', {
message: messageData,
});
}
});
pendingMessages = []; // free memory
const verifiedPrimaryPKs = []; const verifiedPrimaryPKs = [];
// get a list of all of primary pubKeys to verify the secondaryDevice // get a list of all of primary pubKeys to verify the secondaryDevice
const primaryDeviceResults = await lokiFileServerAPI.getDeviceMappingForUsers(primaryPubKeys); const primaryDeviceResults = await lokiFileServerAPI.getDeviceMappingForUsers(
// go through primaryDeviceResults and verify primary sig primaryPubKeys
// if not verified remove from slavePrimaryMap );
// go through primaryDeviceResults
primaryDeviceResults.forEach(user => {
if (user.annotations) {
user.annotations.forEach(note => {
if (note.type === 'network.loki.messenger.devicemapping') {
if (note.value.isPrimary) {
const { authorisations } = note.value;
let found = false;
if (Array.isArray(authorisations)) {
authorisations.forEach(auth => {
// FIXME: verify primary sig
if (
verifiedPrimaryPKs.indexOf(
`@${auth.primaryDevicePubKey}`
) === -1
) {
verifiedPrimaryPKs.push(
`@${auth.primaryDevicePubKey}`
);
}
found = true;
});
}
if (found) {
return;
}
// if not verified remove from slavePrimaryMap
}
// not primary or verified
/*
Object.keys(slavePrimaryMap).forEach(slaveKey => {
if (slavePrimaryMap[slaveKey] ==
})
*/
}
});
}
});
// get final list of verified chat server profile names // get final list of verified chat server profile names
const verifiedDeviceResults = await this.getDeviceMappingForUsers(verifiedPrimaryPKs); const verifiedDeviceResults = await this.serverAPI.getUsersAnnotations(
pubKeys
);
// console.log('verifiedDeviceResults', verifiedDeviceResults)
// go through verifiedDeviceResults // go through verifiedDeviceResults
// find messages for original slave key using slavePrimaryMap this.primaryUserProfileName = {};
verifiedDeviceResults.forEach(user => {
this.primaryUserProfileName[user.username] = user.name;
});
Object.keys(slaveMessages).forEach(slaveKey => {
const primaryPubKey = this.slavePrimaryMap[slaveKey];
slaveMessages[slaveKey].forEach(messageDataP => {
const messageData = messageDataP; // for linter
if (this.slavePrimaryMap[messageData.source]) {
// rewrite source, profile
messageData.source = primaryPubKey;
messageData.message.profile.displayName = this.primaryUserProfileName[
primaryPubKey
];
}
// console.log('messageData', messageData)
this.serverAPI.chatAPI.emit('publicMessage', {
message: messageData,
});
});
});
} }
pendingMessages.forEach(messageData => { // console.log('pendingMessages len', pendingMessages.length);
if (slavePrimaryMap[messageData.source]) { // console.log('pendingMessages', pendingMessages);
// rewrite source, profile // find messages for original slave key using slavePrimaryMap
messageData.source = slavePrimaryMap.username if (pendingMessages.length) {
messageData.message.profile.displayName = slavePrimaryMap.name pendingMessages.forEach(messageDataP => {
} const messageData = messageDataP; // for linter
this.serverAPI.chatAPI.emit('publicMessage', { // why am I getting these?
message: messageData, if (messageData === undefined) {
log.warn(`invalid pendingMessages ${pendingMessages}`);
return;
}
if (this.slavePrimaryMap[messageData.source]) {
// rewrite source, profile
const primaryPubKey = this.slavePrimaryMap[messageData.source];
log.info('Rewriting', messageData.source, 'to', primaryPubKey);
messageData.source = primaryPubKey;
messageData.message.profile.displayName = this.primaryUserProfileName[
primaryPubKey
];
}
this.serverAPI.chatAPI.emit('publicMessage', {
message: messageData,
});
}); });
}) }
pendingMessages = [] pendingMessages = [];
} }
} }
} }

Loading…
Cancel
Save