|
|
@@ -258,6 +258,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
}
|
|
|
|
|
|
const totalProductIds = await this.connection
|
|
|
+ .rawConnection
|
|
|
.getRepository(Product)
|
|
|
.createQueryBuilder('product')
|
|
|
.where('product.deletedAt IS NULL')
|
|
|
@@ -269,7 +270,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
let skip = 0;
|
|
|
let finishedProductsCount = 0;
|
|
|
do {
|
|
|
- const operations: BulkVariantOperation[] = [];
|
|
|
+ let operations: BulkVariantOperation[] = [];
|
|
|
|
|
|
productIds = await this.connection
|
|
|
.getRepository(Product)
|
|
|
@@ -277,11 +278,21 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
.select('product.id')
|
|
|
.where('product.deletedAt IS NULL')
|
|
|
.skip(skip)
|
|
|
- .take(REINDEX_CHUNK_SIZE)
|
|
|
+ .take(this.options.reindexProductsChunkSize)
|
|
|
.getMany();
|
|
|
|
|
|
for (const { id: productId } of productIds) {
|
|
|
operations.push(...(await this.updateProductsOperationsOnly(ctx, productId)));
|
|
|
+ if (operations.length >= this.options.reindexBulkOperationSizeLimit) {
|
|
|
+ // Because we can have a huge amount of variant for 1 product, we also chunk update operations
|
|
|
+ await this.executeBulkOperationsByChunks(
|
|
|
+ this.options.reindexBulkOperationSizeLimit,
|
|
|
+ operations,
|
|
|
+ variantIndexNameForReindex,
|
|
|
+ );
|
|
|
+ // Reset operations to avoid memory peaks with huge amount of operations
|
|
|
+ operations = [];
|
|
|
+ }
|
|
|
finishedProductsCount++;
|
|
|
observer.next({
|
|
|
total: totalProductIds,
|
|
|
@@ -294,15 +305,15 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
|
|
|
// Because we can have a huge amount of variant for 1 product, we also chunk update operations
|
|
|
await this.executeBulkOperationsByChunks(
|
|
|
- REINDEX_OPERATION_CHUNK_SIZE,
|
|
|
+ this.options.reindexBulkOperationSizeLimit,
|
|
|
operations,
|
|
|
variantIndexNameForReindex,
|
|
|
);
|
|
|
|
|
|
- skip += REINDEX_CHUNK_SIZE;
|
|
|
+ skip += this.options.reindexProductsChunkSize;
|
|
|
|
|
|
Logger.verbose(`Done ${finishedProductsCount} / ${totalProductIds} products`);
|
|
|
- } while (productIds.length >= REINDEX_CHUNK_SIZE);
|
|
|
+ } while (productIds.length >= this.options.reindexProductsChunkSize);
|
|
|
|
|
|
// Switch the index to the new reindexed one
|
|
|
try {
|