You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
	
	
		
			713 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			JavaScript
		
	
			
		
		
	
	
			713 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			JavaScript
		
	
| const path = require('path');
 | |
| const mkdirp = require('mkdirp');
 | |
| const rimraf = require('rimraf');
 | |
| const sql = require('@journeyapps/sqlcipher');
 | |
| const pify = require('pify');
 | |
| const uuidv4 = require('uuid/v4');
 | |
| const { map, isString } = require('lodash');
 | |
| 
 | |
// Switch the sqlite bindings into verbose mode so that we get long stack traces:
//   https://github.com/mapbox/node-sqlite3/wiki/API#sqlite3verbose
sql.verbose();
 | |
| 
 | |
| module.exports = {
 | |
|   initialize,
 | |
|   close,
 | |
|   removeDB,
 | |
| 
 | |
|   saveMessage,
 | |
|   saveMessages,
 | |
|   removeMessage,
 | |
|   getUnreadByConversation,
 | |
|   getMessageBySender,
 | |
|   getMessageById,
 | |
|   getAllMessageIds,
 | |
|   getMessagesBySentAt,
 | |
|   getExpiredMessages,
 | |
|   getNextExpiringMessage,
 | |
|   getMessagesByConversation,
 | |
| 
 | |
|   getAllUnprocessed,
 | |
|   saveUnprocessed,
 | |
|   getUnprocessedById,
 | |
|   saveUnprocesseds,
 | |
|   removeUnprocessed,
 | |
|   removeAllUnprocessed,
 | |
| 
 | |
|   removeAll,
 | |
| 
 | |
|   getMessagesNeedingUpgrade,
 | |
|   getMessagesWithVisualMediaAttachments,
 | |
|   getMessagesWithFileAttachments,
 | |
| };
 | |
| 
 | |
// Returns a new identifier produced by the uuid/v4 generator.
function generateUUID() {
  const freshId = uuidv4();
  return freshId;
}
 | |
| 
 | |
// Serializes a value to the JSON text stored in the `json` columns.
function objectToJSON(data) {
  const serialized = JSON.stringify(data);
  return serialized;
}
 | |
// Parses JSON text (as read from a `json` column) back into a value.
// Throws whatever JSON.parse throws on malformed input.
function jsonToObject(json) {
  const parsed = JSON.parse(json);
  return parsed;
}
 | |
| 
 | |
// Opens the database file at `filePath`. The returned promise resolves with the
// sql.Database handle once its open callback reports no error, and rejects with
// the reported error otherwise.
async function openDatabase(filePath) {
  return new Promise((resolve, reject) => {
    const instance = new sql.Database(filePath, error => {
      if (error) {
        reject(error);
      } else {
        resolve(instance);
      }
    });
  });
}
 | |
| 
 | |
// Replaces the listed callback-style methods on `rawInstance` with pify-wrapped
// versions bound to the instance. The object is mutated in place and returned.
function promisify(rawInstance) {
  const methodNames = ['close', 'run', 'get', 'all', 'each', 'exec', 'prepare'];

  methodNames.forEach(name => {
    // eslint-disable-next-line no-param-reassign
    rawInstance[name] = pify(rawInstance[name].bind(rawInstance));
  });

  return rawInstance;
}
 | |
| 
 | |
// Asks the database for sqlite_version() and returns that column of the result row.
async function getSQLiteVersion(instance) {
  const { sqlite_version: version } = await instance.get(
    'select sqlite_version() AS sqlite_version'
  );
  return version;
}
 | |
| 
 | |
// Reads `PRAGMA schema_version` and returns the schema_version column of the row.
// NOTE(review): SQLite's docs describe schema_version as a counter SQLite maintains
// itself and offer user_version for application use - confirm this is the pragma we
// want for tracking migrations.
async function getSchemaVersion(instance) {
  const { schema_version: version } = await instance.get(
    'PRAGMA schema_version;'
  );
  return version;
}
 | |
| 
 | |
// Reads `PRAGMA cipher_version` and returns the cipher_version column, or null when
// the pragma produced no row.
async function getSQLCipherVersion(instance) {
  const row = await instance.get('PRAGMA cipher_version;');

  // Check for the missing row explicitly instead of relying on a caught TypeError.
  if (row === undefined || row === null) {
    return null;
  }

  return row.cipher_version;
}
 | |
| 
 | |
// Matches any character that is not a hexadecimal digit.
const INVALID_KEY = /[^0-9A-Fa-f]/;

// Hands the encryption key to the database via `PRAGMA key`, using the raw-hex
// `x'...'` form. The key is interpolated into the statement, so it is first checked
// to contain nothing but hex digits.
//
// Throws an Error when the key contains any non-hex character.
async function setupSQLCipher(instance, { key }) {
  if (INVALID_KEY.test(key)) {
    // The rejected value is deliberately left out of the message: it may be (a
    // mangled copy of) the real key, and error messages tend to end up in logs.
    throw new Error(
      'setupSQLCipher: key is not valid; only hex characters are allowed'
    );
  }

  // https://www.zetetic.net/sqlcipher/sqlcipher-api/#key
  await instance.run(`PRAGMA key = "x'${key}'";`);
}
 | |
| 
 | |
// Schema migration #1: creates the `messages` and `unprocessed` tables with their
// indexes, then sets schema_version to 1. Does nothing when `currentVersion` is
// already 1 or higher.
//
// All statements run inside one transaction. If any of them fails the transaction
// is rolled back (so the connection is not left with an open transaction) and the
// original error is rethrown.
async function updateToSchemaVersion1(currentVersion, instance) {
  if (currentVersion >= 1) {
    return;
  }

  console.log('updateToSchemaVersion1: starting...');

  // BEGIN stays outside the try block: if it fails there is nothing to roll back.
  await instance.run('BEGIN TRANSACTION;');

  try {
    await instance.run(
      `CREATE TABLE messages(
        id STRING PRIMARY KEY ASC,
        json TEXT,

        unread INTEGER,
        expires_at INTEGER,
        sent_at INTEGER,
        schemaVersion INTEGER,
        conversationId STRING,
        received_at INTEGER,
        source STRING,
        sourceDevice STRING,
        hasAttachments INTEGER,
        hasFileAttachments INTEGER,
        hasVisualMediaAttachments INTEGER
      );`
    );

    await instance.run(`CREATE INDEX messages_unread ON messages (
      unread
    );`);
    await instance.run(`CREATE INDEX messages_expires_at ON messages (
      expires_at
    );`);
    await instance.run(`CREATE INDEX messages_receipt ON messages (
      sent_at
    );`);
    await instance.run(`CREATE INDEX messages_schemaVersion ON messages (
      schemaVersion
    );`);

    await instance.run(`CREATE INDEX messages_conversation ON messages (
      conversationId,
      received_at
    );`);

    await instance.run(`CREATE INDEX messages_duplicate_check ON messages (
      source,
      sourceDevice,
      sent_at
    );`);
    await instance.run(`CREATE INDEX messages_hasAttachments ON messages (
      conversationId,
      hasAttachments,
      received_at
    );`);
    await instance.run(`CREATE INDEX messages_hasFileAttachments ON messages (
      conversationId,
      hasFileAttachments,
      received_at
    );`);
    await instance.run(`CREATE INDEX messages_hasVisualMediaAttachments ON messages (
      conversationId,
      hasVisualMediaAttachments,
      received_at
    );`);

    await instance.run(`CREATE TABLE unprocessed(
    id STRING,
    timestamp INTEGER,
    json TEXT
  );`);
    await instance.run(`CREATE INDEX unprocessed_id ON unprocessed (
    id
  );`);
    await instance.run(`CREATE INDEX unprocessed_timestamp ON unprocessed (
    timestamp
  );`);

    await instance.run('PRAGMA schema_version = 1;');
    await instance.run('COMMIT TRANSACTION;');
  } catch (error) {
    try {
      await instance.run('ROLLBACK;');
    } catch (rollbackError) {
      // Log and carry on so the original failure is the one the caller sees.
      console.log('updateToSchemaVersion1: rollback failed', rollbackError);
    }

    throw error;
  }

  console.log('updateToSchemaVersion1: success!');
}
 | |
| 
 | |
// Ordered list of migration steps; its length is reported as the most recent schema
// version.
const SCHEMA_VERSIONS = [updateToSchemaVersion1];

// Logs the current SQLite / SQLCipher / schema versions, then runs every migration
// step in order. Each step receives the schema version read at the start and decides
// for itself whether it has anything to do.
async function updateSchema(instance) {
  const sqliteVersion = await getSQLiteVersion(instance);
  const schemaVersion = await getSchemaVersion(instance);
  const cipherVersion = await getSQLCipherVersion(instance);

  console.log(
    'updateSchema:',
    `Current schema version: ${schemaVersion};`,
    `Most recent schema version: ${SCHEMA_VERSIONS.length};`,
    `SQLite version: ${sqliteVersion};`,
    `SQLCipher version: ${cipherVersion};`
  );

  // The steps must run one after another, so awaiting inside the loop is intended.
  // eslint-disable-next-line no-restricted-syntax
  for (const runSchemaUpdate of SCHEMA_VERSIONS) {
    // eslint-disable-next-line no-await-in-loop
    await runSchemaUpdate(schemaVersion, instance);
  }
}
 | |
| 
 | |
// The open, promisified database handle; unset until initialize() succeeds.
let db;
// Path of the database file, assigned by initialize() and used by removeDB().
let filePath;

// Opens (creating the directory if needed) the database at
// <configDir>/sql/db.sqlite, applies the SQLCipher key, runs the schema migrations
// and finally publishes the handle in `db`.
//
// Throws when called while a database is already open, when `configDir` or `key` is
// not a string, or when opening / keying / migrating fails. If keying or migrating
// fails, the freshly opened handle is closed again before the error is rethrown so
// it does not leak.
async function initialize({ configDir, key }) {
  if (db) {
    throw new Error('Cannot initialize more than once!');
  }

  if (!isString(configDir)) {
    throw new Error('initialize: configDir is required!');
  }
  if (!isString(key)) {
    throw new Error('initialize: key is required!');
  }

  const dbDir = path.join(configDir, 'sql');
  mkdirp.sync(dbDir);

  filePath = path.join(dbDir, 'db.sqlite');
  const sqlInstance = await openDatabase(filePath);
  const promisified = promisify(sqlInstance);

  // promisified.on('trace', statement => console._log(statement));

  try {
    await setupSQLCipher(promisified, { key });
    await updateSchema(promisified);
  } catch (error) {
    try {
      await promisified.close();
    } catch (closeError) {
      // Log and carry on so the setup failure is the error the caller sees.
      console.log('initialize: failed to close database after error', closeError);
    }

    throw error;
  }

  // Only publish the handle once it is fully set up.
  db = promisified;
}
 | |
| 
 | |
// Closes the open database, if there is one. `db` is cleared before the close is
// awaited, so the module counts as closed even while the close is still in flight
// or if it fails. Calling this with no open database is a no-op.
async function close() {
  if (!db) {
    return;
  }

  const dbRef = db;
  db = null;
  await dbRef.close();
}
 | |
| 
 | |
// Deletes the database file at `filePath`. Refuses (throws) while `db` is still set.
async function removeDB() {
  const isOpen = Boolean(db);
  if (isOpen) {
    throw new Error('removeDB: Cannot erase database when it is open!');
  }

  rimraf.sync(filePath);
}
 | |
| 
 | |
// Writes one message and returns its id.
//
// - When the message has an id and `forceSave` is not set, the existing row with
//   that id is UPDATEd.
// - Otherwise a row is INSERTed, using the message's id or a freshly generated one;
//   the generated id is also included in the stored JSON.
//
// Alongside the JSON blob, a fixed set of fields is copied into dedicated columns.
async function saveMessage(data, { forceSave } = {}) {
  const { id } = data;

  // Column bindings shared by the UPDATE and the INSERT statement.
  const columns = {
    $conversationId: data.conversationId,
    $expires_at: data.expires_at,
    $hasAttachments: data.hasAttachments,
    $hasFileAttachments: data.hasFileAttachments,
    $hasVisualMediaAttachments: data.hasVisualMediaAttachments,
    $received_at: data.received_at,
    $schemaVersion: data.schemaVersion,
    $sent_at: data.sent_at,
    $source: data.source,
    $sourceDevice: data.sourceDevice,
    $unread: data.unread,
  };

  const shouldUpdate = Boolean(id) && !forceSave;
  if (shouldUpdate) {
    await db.run(
      `UPDATE messages SET
        json = $json,
        conversationId = $conversationId,
        expires_at = $expires_at,
        hasAttachments = $hasAttachments,
        hasFileAttachments = $hasFileAttachments,
        hasVisualMediaAttachments = $hasVisualMediaAttachments,
        id = $id,
        received_at = $received_at,
        schemaVersion = $schemaVersion,
        sent_at = $sent_at,
        source = $source,
        sourceDevice = $sourceDevice,
        unread = $unread
      WHERE id = $id;`,
      { $id: id, $json: objectToJSON(data), ...columns }
    );

    return id;
  }

  const toCreate = { ...data, id: id || generateUUID() };

  await db.run(
    `INSERT INTO messages (
    id,
    json,

    conversationId,
    expires_at,
    hasAttachments,
    hasFileAttachments,
    hasVisualMediaAttachments,
    received_at,
    schemaVersion,
    sent_at,
    source,
    sourceDevice,
    unread
  ) values (
    $id,
    $json,

    $conversationId,
    $expires_at,
    $hasAttachments,
    $hasFileAttachments,
    $hasVisualMediaAttachments,
    $received_at,
    $schemaVersion,
    $sent_at,
    $source,
    $sourceDevice,
    $unread
  );`,
    { $id: toCreate.id, $json: objectToJSON(toCreate), ...columns }
  );

  return toCreate.id;
}
 | |
| 
 | |
// Saves a batch of messages between BEGIN and COMMIT. All statements are issued
// synchronously, in order, from inside the db.serialize() callback, and the function
// then waits for every one of them to settle successfully.
// NOTE(review): relies on db.serialize() running the statements queued in its
// callback in order - confirm against the sqlite driver docs. Nothing here rolls
// back if one of the saves rejects.
async function saveMessages(arrayOfMessages, { forceSave } = {}) {
  let pending;

  db.serialize(() => {
    const begin = db.run('BEGIN TRANSACTION;');
    const saves = map(arrayOfMessages, message =>
      saveMessage(message, { forceSave })
    );
    const commit = db.run('COMMIT TRANSACTION;');

    pending = Promise.all([begin, ...saves, commit]);
  });

  await pending;
}
 | |
| 
 | |
// Deletes one message (when `id` is a single id) or several (when `id` is an array
// of ids). Throws when given an empty array.
async function removeMessage(id) {
  if (!Array.isArray(id)) {
    await db.run('DELETE FROM messages WHERE id = $id;', { $id: id });
    return;
  }

  if (!id.length) {
    throw new Error('removeMessage: No ids to delete!');
  }

  // Our node interface doesn't seem to allow you to replace one single ? with an
  // array, so emit one placeholder per id.
  const placeholders = id.map(() => '?').join(', ');
  await db.run(`DELETE FROM messages WHERE id IN ( ${placeholders} );`, id);
}
 | |
| 
 | |
// Looks up one message by id and returns it parsed from its stored JSON, or null
// when no row matches.
async function getMessageById(id) {
  // Only the json column is read below, so that is all we ask for.
  const row = await db.get('SELECT json FROM messages WHERE id = $id;', {
    $id: id,
  });

  if (!row) {
    return null;
  }

  return jsonToObject(row.json);
}
 | |
| 
 | |
// Returns the id of every row in `messages`, in ascending id order.
async function getAllMessageIds() {
  const idRows = await db.all('SELECT id FROM messages ORDER BY id ASC;');
  return map(idRows, ({ id }) => id);
}
 | |
| 
 | |
// Returns every message matching the given source, sourceDevice and sent_at, each
// parsed from its stored JSON.
// eslint-disable-next-line camelcase
async function getMessageBySender({ source, sourceDevice, sent_at }) {
  // The query must be awaited: mapping over the pending promise would silently
  // produce an empty result.
  const rows = await db.all(
    `SELECT json FROM messages WHERE
      source = $source AND
      sourceDevice = $sourceDevice AND
      sent_at = $sent_at;`,
    {
      $source: source,
      $sourceDevice: sourceDevice,
      $sent_at: sent_at,
    }
  );

  return map(rows, row => jsonToObject(row.json));
}
 | |
| 
 | |
// Returns the messages of one conversation whose `unread` column is 1, newest
// (highest received_at) first, each parsed from its stored JSON. Returns null if the
// query yields no result object at all.
async function getUnreadByConversation(conversationId) {
  const bindings = {
    $conversationId: conversationId,
    $unread: 1,
  };

  const unreadRows = await db.all(
    `SELECT json FROM messages WHERE
      conversationId = $conversationId AND
      unread = $unread
     ORDER BY received_at DESC;`,
    bindings
  );

  return unreadRows ? map(unreadRows, ({ json }) => jsonToObject(json)) : null;
}
 | |
| 
 | |
// Returns up to `limit` (default 100) messages of one conversation whose received_at
// is strictly below `receivedAt` (default Number.MAX_VALUE), newest first, each
// parsed from its stored JSON. Returns null if the query yields no result object.
async function getMessagesByConversation(
  conversationId,
  { limit = 100, receivedAt = Number.MAX_VALUE } = {}
) {
  const bindings = {
    $conversationId: conversationId,
    $received_at: receivedAt,
    $limit: limit,
  };

  const page = await db.all(
    `SELECT json FROM messages WHERE
       conversationId = $conversationId AND
       received_at < $received_at
     ORDER BY received_at DESC
     LIMIT $limit;`,
    bindings
  );

  return page ? map(page, ({ json }) => jsonToObject(json)) : null;
}
 | |
| 
 | |
// Returns every message whose sent_at equals `sentAt`, newest (highest received_at)
// first, each parsed from its stored JSON. Returns null if the query yields no
// result object at all.
async function getMessagesBySentAt(sentAt) {
  // Only the json column is read below, so that is all we ask for.
  const rows = await db.all(
    `SELECT json FROM messages
     WHERE sent_at = $sent_at
     ORDER BY received_at DESC;`,
    {
      $sent_at: sentAt,
    }
  );

  if (!rows) {
    return null;
  }

  return map(rows, row => jsonToObject(row.json));
}
 | |
| 
 | |
// Returns every message whose expires_at is set and not later than the current
// time (Date.now()), soonest-expiring first, each parsed from its stored JSON.
// Returns null if the query yields no result object at all.
async function getExpiredMessages() {
  const bindings = { $expires_at: Date.now() };

  const expired = await db.all(
    `SELECT json FROM messages WHERE
      expires_at IS NOT NULL AND
      expires_at <= $expires_at
     ORDER BY expires_at ASC;`,
    bindings
  );

  return expired ? map(expired, ({ json }) => jsonToObject(json)) : null;
}
 | |
| 
 | |
// Fetches the message with the smallest non-null expires_at. The result is an array
// holding at most one parsed message (the query is LIMIT 1), or null if the query
// yields no result object at all.
async function getNextExpiringMessage() {
  const soonest = await db.all(`
    SELECT json FROM messages
    WHERE expires_at IS NOT NULL
    ORDER BY expires_at ASC
    LIMIT 1;
  `);

  return soonest ? map(soonest, ({ json }) => jsonToObject(json)) : null;
}
 | |
| 
 | |
// Writes one unprocessed item and returns its id. With `forceSave` a new row is
// INSERTed; without it the existing row with that id is UPDATEd.
async function saveUnprocessed(data, { forceSave } = {}) {
  const { id, timestamp } = data;

  // Both statements bind exactly the same three values.
  const bindings = {
    $id: id,
    $timestamp: timestamp,
    $json: objectToJSON(data),
  };

  const insertStatement = `INSERT INTO unprocessed (
        id,
        timestamp,
        json
      ) values (
        $id,
        $timestamp,
        $json
      );`;
  const updateStatement = `UPDATE unprocessed SET
      json = $json,
      timestamp = $timestamp
    WHERE id = $id;`;

  await db.run(forceSave ? insertStatement : updateStatement, bindings);

  return id;
}
 | |
| 
 | |
// Saves a batch of unprocessed items between BEGIN and COMMIT. All statements are
// issued synchronously, in order, from inside the db.serialize() callback, and the
// function then waits for every one of them to settle successfully.
// NOTE(review): relies on db.serialize() running the statements queued in its
// callback in order - confirm against the sqlite driver docs. Nothing here rolls
// back if one of the saves rejects.
async function saveUnprocesseds(arrayOfUnprocessed, { forceSave } = {}) {
  let pending;

  db.serialize(() => {
    const begin = db.run('BEGIN TRANSACTION;');
    const saves = map(arrayOfUnprocessed, unprocessed =>
      saveUnprocessed(unprocessed, { forceSave })
    );
    const commit = db.run('COMMIT TRANSACTION;');

    pending = Promise.all([begin, ...saves, commit]);
  });

  await pending;
}
 | |
| 
 | |
// Looks up one unprocessed item by id and returns it parsed from its stored JSON,
// or null when no row matches.
async function getUnprocessedById(id) {
  const match = await db.get('SELECT json FROM unprocessed WHERE id = $id;', {
    $id: id,
  });

  return match ? jsonToObject(match.json) : null;
}
 | |
| 
 | |
// Returns every unprocessed item, oldest timestamp first, each parsed from its
// stored JSON. Returns null if the query yields no result object at all.
async function getAllUnprocessed() {
  const queued = await db.all(
    'SELECT json FROM unprocessed ORDER BY timestamp ASC;'
  );

  return queued ? map(queued, ({ json }) => jsonToObject(json)) : null;
}
 | |
| 
 | |
// Deletes one unprocessed item (when `id` is a single id) or several (when `id` is
// an array of ids). Throws when given an empty array.
async function removeUnprocessed(id) {
  if (Array.isArray(id)) {
    if (id.length === 0) {
      throw new Error('removeUnprocessed: No ids to delete!');
    }

    // Our node interface doesn't seem to allow you to replace one single ? with an
    // array, so emit one placeholder per id.
    const placeholders = id.map(() => '?').join(', ');
    await db.run(`DELETE FROM unprocessed WHERE id IN ( ${placeholders} );`, id);
    return;
  }

  await db.run('DELETE FROM unprocessed WHERE id = $id;', { $id: id });
}
 | |
| 
 | |
// Deletes every row from the `unprocessed` table.
async function removeAllUnprocessed() {
  const statement = 'DELETE FROM unprocessed;';
  await db.run(statement);
}
 | |
| 
 | |
// Empties both the `messages` and the `unprocessed` table between BEGIN and COMMIT.
// The four statements are issued in order from inside the db.serialize() callback,
// and the function then waits for all of them to settle successfully.
// NOTE(review): relies on db.serialize() running the statements queued in its
// callback in order - confirm against the sqlite driver docs.
async function removeAll() {
  let pending;

  db.serialize(() => {
    const statements = [
      'BEGIN TRANSACTION;',
      'DELETE FROM messages;',
      'DELETE FROM unprocessed;',
      'COMMIT TRANSACTION;',
    ];

    pending = Promise.all(statements.map(statement => db.run(statement)));
  });

  await pending;
}
 | |
| 
 | |
// Returns up to `limit` messages whose schemaVersion column differs from
// `maxVersion` (compared with IS NOT), each parsed from its stored JSON. Returns
// null if the query yields no result object at all.
async function getMessagesNeedingUpgrade(limit, { maxVersion }) {
  const bindings = {
    $maxVersion: maxVersion,
    $limit: limit,
  };

  const outdated = await db.all(
    `SELECT json FROM messages
     WHERE schemaVersion IS NOT $maxVersion
     LIMIT $limit;`,
    bindings
  );

  return outdated ? map(outdated, ({ json }) => jsonToObject(json)) : null;
}
 | |
| 
 | |
// Returns up to `limit` messages of one conversation whose hasVisualMediaAttachments
// column is 1, newest (highest received_at) first, each parsed from its stored JSON.
// Returns null if the query yields no result object at all.
async function getMessagesWithVisualMediaAttachments(
  conversationId,
  { limit }
) {
  const bindings = {
    $conversationId: conversationId,
    $limit: limit,
  };

  const withMedia = await db.all(
    `SELECT json FROM messages WHERE
      conversationId = $conversationId AND
      hasVisualMediaAttachments = 1
     ORDER BY received_at DESC
     LIMIT $limit;`,
    bindings
  );

  return withMedia ? map(withMedia, ({ json }) => jsonToObject(json)) : null;
}
 | |
| 
 | |
// Returns up to `limit` messages of one conversation whose hasFileAttachments column
// is 1, newest (highest received_at) first, each parsed from its stored JSON.
// Returns null if the query yields no result object at all.
async function getMessagesWithFileAttachments(conversationId, { limit }) {
  const bindings = {
    $conversationId: conversationId,
    $limit: limit,
  };

  const withFiles = await db.all(
    `SELECT json FROM messages WHERE
      conversationId = $conversationId AND
      hasFileAttachments = 1
     ORDER BY received_at DESC
     LIMIT $limit;`,
    bindings
  );

  return withFiles ? map(withFiles, ({ json }) => jsonToObject(json)) : null;
}
 |