Rename `count` to `numMessagesPerBatch`

pull/1/head
Daniel Gasienica 7 years ago
parent 38ac9972e8
commit 12cdeee7ec

@ -90,12 +90,13 @@
storage.fetch(); storage.fetch();
const idleDetector = new IdleDetector(); const idleDetector = new IdleDetector();
idleDetector.on('idle', async () => { idleDetector.on('idle', async () => {
const NUM_MESSAGES_PER_BATCH = 1;
const database = Migrations0DatabaseWithAttachmentData.getDatabase(); const database = Migrations0DatabaseWithAttachmentData.getDatabase();
const batch = await MessageDataMigrator.processNextBatchWithoutIndex({ const batch = await MessageDataMigrator.processNextBatchWithoutIndex({
databaseName: database.name, databaseName: database.name,
minDatabaseVersion: database.version, minDatabaseVersion: database.version,
numMessagesPerBatch: NUM_MESSAGES_PER_BATCH,
upgradeMessageSchema, upgradeMessageSchema,
}); });
console.log('Upgrade message schema:', batch); console.log('Upgrade message schema:', batch);

@ -21,12 +21,11 @@ const { deferredToPromise } = require('./deferred_to_promise');
const MESSAGES_STORE_NAME = 'messages'; const MESSAGES_STORE_NAME = 'messages';
const NUM_MESSAGES_PER_BATCH = 1;
exports.processNext = async ({ exports.processNext = async ({
BackboneMessage, BackboneMessage,
BackboneMessageCollection, BackboneMessageCollection,
count, numMessagesPerBatch,
upgradeMessageSchema, upgradeMessageSchema,
} = {}) => { } = {}) => {
if (!isFunction(BackboneMessage)) { if (!isFunction(BackboneMessage)) {
@ -38,8 +37,8 @@ exports.processNext = async ({
' constructor is required'); ' constructor is required');
} }
if (!isNumber(count)) { if (!isNumber(numMessagesPerBatch)) {
throw new TypeError('"count" is required'); throw new TypeError('"numMessagesPerBatch" is required');
} }
if (!isFunction(upgradeMessageSchema)) { if (!isFunction(upgradeMessageSchema)) {
@ -50,7 +49,10 @@ exports.processNext = async ({
const fetchStartTime = Date.now(); const fetchStartTime = Date.now();
const messagesRequiringSchemaUpgrade = const messagesRequiringSchemaUpgrade =
await _fetchMessagesRequiringSchemaUpgrade({ BackboneMessageCollection, count }); await _fetchMessagesRequiringSchemaUpgrade({
BackboneMessageCollection,
count: numMessagesPerBatch,
});
const fetchDuration = Date.now() - fetchStartTime; const fetchDuration = Date.now() - fetchStartTime;
const upgradeStartTime = Date.now(); const upgradeStartTime = Date.now();
@ -65,7 +67,7 @@ exports.processNext = async ({
const totalDuration = Date.now() - startTime; const totalDuration = Date.now() - startTime;
const numProcessed = messagesRequiringSchemaUpgrade.length; const numProcessed = messagesRequiringSchemaUpgrade.length;
const done = numProcessed < count; const done = numProcessed < numMessagesPerBatch;
return { return {
done, done,
numProcessed, numProcessed,
@ -79,6 +81,7 @@ exports.processNext = async ({
exports.dangerouslyProcessAllWithoutIndex = async ({ exports.dangerouslyProcessAllWithoutIndex = async ({
databaseName, databaseName,
minDatabaseVersion, minDatabaseVersion,
numMessagesPerBatch,
upgradeMessageSchema, upgradeMessageSchema,
} = {}) => { } = {}) => {
if (!isString(databaseName)) { if (!isString(databaseName)) {
@ -89,6 +92,10 @@ exports.dangerouslyProcessAllWithoutIndex = async ({
throw new TypeError('"minDatabaseVersion" must be a number'); throw new TypeError('"minDatabaseVersion" must be a number');
} }
if (!isNumber(numMessagesPerBatch)) {
throw new TypeError('"numMessagesPerBatch" must be a number');
}
if (!isFunction(upgradeMessageSchema)) { if (!isFunction(upgradeMessageSchema)) {
throw new TypeError('"upgradeMessageSchema" is required'); throw new TypeError('"upgradeMessageSchema" is required');
} }
@ -116,7 +123,11 @@ exports.dangerouslyProcessAllWithoutIndex = async ({
// eslint-disable-next-line no-constant-condition // eslint-disable-next-line no-constant-condition
while (true) { while (true) {
// eslint-disable-next-line no-await-in-loop // eslint-disable-next-line no-await-in-loop
const status = await _processBatch({ connection, upgradeMessageSchema }); const status = await _processBatch({
connection,
numMessagesPerBatch,
upgradeMessageSchema,
});
if (status.done) { if (status.done) {
break; break;
} }
@ -140,6 +151,7 @@ exports.dangerouslyProcessAllWithoutIndex = async ({
exports.processNextBatchWithoutIndex = async ({ exports.processNextBatchWithoutIndex = async ({
databaseName, databaseName,
minDatabaseVersion, minDatabaseVersion,
numMessagesPerBatch,
upgradeMessageSchema, upgradeMessageSchema,
} = {}) => { } = {}) => {
if (!isFunction(upgradeMessageSchema)) { if (!isFunction(upgradeMessageSchema)) {
@ -147,7 +159,11 @@ exports.processNextBatchWithoutIndex = async ({
} }
const connection = await _getConnection({ databaseName, minDatabaseVersion }); const connection = await _getConnection({ databaseName, minDatabaseVersion });
const batch = await _processBatch({ connection, upgradeMessageSchema }); const batch = await _processBatch({
connection,
numMessagesPerBatch,
upgradeMessageSchema,
});
return batch; return batch;
}; };
@ -172,7 +188,11 @@ const _getConnection = async ({ databaseName, minDatabaseVersion }) => {
return connection; return connection;
}; };
const _processBatch = async ({ connection, upgradeMessageSchema } = {}) => { const _processBatch = async ({
connection,
numMessagesPerBatch,
upgradeMessageSchema,
} = {}) => {
if (!isObject(connection)) { if (!isObject(connection)) {
throw new TypeError('"connection" must be a string'); throw new TypeError('"connection" must be a string');
} }
@ -181,6 +201,10 @@ const _processBatch = async ({ connection, upgradeMessageSchema } = {}) => {
throw new TypeError('"upgradeMessageSchema" is required'); throw new TypeError('"upgradeMessageSchema" is required');
} }
if (!isNumber(numMessagesPerBatch)) {
throw new TypeError('"numMessagesPerBatch" is required');
}
const isAttachmentMigrationComplete = const isAttachmentMigrationComplete =
await settings.isAttachmentMigrationComplete(connection); await settings.isAttachmentMigrationComplete(connection);
if (isAttachmentMigrationComplete) { if (isAttachmentMigrationComplete) {
@ -192,12 +216,11 @@ const _processBatch = async ({ connection, upgradeMessageSchema } = {}) => {
const lastProcessedIndex = const lastProcessedIndex =
await settings.getAttachmentMigrationLastProcessedIndex(connection); await settings.getAttachmentMigrationLastProcessedIndex(connection);
const count = NUM_MESSAGES_PER_BATCH;
const fetchUnprocessedMessagesStartTime = Date.now(); const fetchUnprocessedMessagesStartTime = Date.now();
const unprocessedMessages = const unprocessedMessages =
await _dangerouslyFetchMessagesRequiringSchemaUpgradeWithoutIndex({ await _dangerouslyFetchMessagesRequiringSchemaUpgradeWithoutIndex({
connection, connection,
count, count: numMessagesPerBatch,
lastIndex: lastProcessedIndex, lastIndex: lastProcessedIndex,
}); });
const fetchDuration = Date.now() - fetchUnprocessedMessagesStartTime; const fetchDuration = Date.now() - fetchUnprocessedMessagesStartTime;
@ -215,7 +238,7 @@ const _processBatch = async ({ connection, upgradeMessageSchema } = {}) => {
const saveDuration = Date.now() - saveMessagesStartTime; const saveDuration = Date.now() - saveMessagesStartTime;
const numMessagesProcessed = upgradedMessages.length; const numMessagesProcessed = upgradedMessages.length;
const done = numMessagesProcessed < count; const done = numMessagesProcessed < numMessagesPerBatch;
const lastMessage = last(upgradedMessages); const lastMessage = last(upgradedMessages);
const newLastProcessedIndex = lastMessage ? lastMessage.id : null; const newLastProcessedIndex = lastMessage ? lastMessage.id : null;
if (!done) { if (!done) {

Loading…
Cancel
Save