Browse Source

feat(core): Use batching when reindexing search index

On a test dataset of ~9k ProductVariants, the reindex was using an extra ~550MB. With this batching, the memory usage stays pretty flat around the idle level. The trade-off is that the operation takes around 20% longer, but during that time the server is still responsive (albeit with increased latency). Formerly, the server would not serve requests for the duration of the reindexing op.
Michael Bromley 6 years ago
parent
commit
40c594679f

+ 24 - 6
packages/core/src/plugin/default-search-plugin/fulltext-search.service.ts

@@ -9,6 +9,7 @@ import { FindOptionsUtils } from 'typeorm/find-options/FindOptionsUtils';
 
 import { RequestContext } from '../../api/common/request-context';
 import { InternalServerError } from '../../common/error/errors';
+import { Logger } from '../../config/logger/vendure-logger';
 import { FacetValue, Product, ProductVariant } from '../../entity';
 import { EventBus } from '../../event-bus/event-bus';
 import { translateDeep } from '../../service/helpers/utils/translate-entity';
@@ -92,21 +93,38 @@ export class FulltextSearchService implements SearchService {
      */
     async reindex(ctx: RequestContext): Promise<DefaultSearchReindexResponse> {
         const timeStart = Date.now();
+        const BATCH_SIZE = 100;
+        Logger.verbose('Reindexing search index...');
         const qb = await this.connection.getRepository(ProductVariant).createQueryBuilder('variants');
         FindOptionsUtils.applyFindManyOptionsOrConditionsToQueryBuilder(qb, {
             relations: this.variantRelations,
         });
         FindOptionsUtils.joinEagerRelations(qb, qb.alias, this.connection.getMetadata(ProductVariant));
-        const variants = await qb.where('variants__product.deletedAt IS NULL').getMany();
+        const count = await qb.where('variants__product.deletedAt IS NULL').getCount();
+        Logger.verbose(`Getting ${count} variants`);
+        const batches = Math.ceil(count / BATCH_SIZE);
 
-        await this.taskQueue.push(async () => {
-            await this.connection.getRepository(SearchIndexItem).delete({ languageCode: ctx.languageCode });
-            await this.saveSearchIndexItems(ctx, variants);
-        });
+        Logger.verbose('Deleting existing index items...');
+        await this.connection.getRepository(SearchIndexItem).delete({ languageCode: ctx.languageCode });
+        Logger.verbose('Deleted!');
+
+        for (let i = 0; i < batches; i++) {
+            Logger.verbose(`Processing batch ${i + 1} of ${batches}, heap used: `
+                + (process.memoryUsage().heapUsed / 1000 / 1000).toFixed(2) + 'MB');
+            const variants = await qb
+                .where('variants__product.deletedAt IS NULL')
+                .take(BATCH_SIZE)
+                .skip(i * BATCH_SIZE)
+                .getMany();
+            await this.taskQueue.push(async () => {
+                await this.saveSearchIndexItems(ctx, variants);
+            });
+        }
 
+        Logger.verbose(`Reindexing completed in ${Date.now() - timeStart}ms`);
         return {
             success: true,
-            indexedItemCount: variants.length,
+            indexedItemCount: count,
             timeTaken: Date.now() - timeStart,
         };
     }