فهرست منبع

fix(elasticsearch-plugin): Fix high memory usage on reindex

Fixes #1120
Thierry Tournie 4 سال پیش
والد
کامیت
bce86f6259
1 فایل تغییر یافته به همراه 222 افزوده شده و 177 حذف شده
  1. 222 177
      packages/elasticsearch-plugin/src/indexing/indexer.controller.ts

+ 222 - 177
packages/elasticsearch-plugin/src/indexing/indexer.controller.ts

@@ -45,6 +45,9 @@ import {
 import { createIndices, getClient, getIndexNameByAlias } from './indexing-utils';
 import { MutableRequestContext } from './mutable-request-context';
 
+const REINDEX_CHUNK_SIZE = 2500;
+const REINDEX_OPERATION_CHUNK_SIZE = 3000;
+
 export const defaultProductRelations: Array<EntityRelationPaths<Product>> = [
     'variants',
     'featuredAsset',
@@ -232,12 +235,12 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
         return asyncObservable(async observer => {
             return this.asyncQueue.push(async () => {
                 const timeStart = Date.now();
-                const operations: BulkVariantOperation[] = [];
                 const ctx = MutableRequestContext.deserialize(rawContext);
 
                 const reindexTempName = new Date().getTime();
                 const variantIndexName = this.options.indexPrefix + VARIANT_INDEX_NAME;
-                const reindexVariantAliasName = variantIndexName + `-reindex-${reindexTempName}`;
+                const variantIndexNameForReindex = VARIANT_INDEX_NAME + `-reindex-${reindexTempName}`;
+                const reindexVariantAliasName = this.options.indexPrefix + variantIndexNameForReindex;
                 try {
                     await createIndices(
                         this.client,
@@ -247,79 +250,123 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
                         true,
                         `-reindex-${reindexTempName}`,
                     );
+                } catch (e) {
+                    Logger.error(`Could not recreate indices.`, loggerCtx);
+                    Logger.error(JSON.stringify(e), loggerCtx);
+                    throw e;
+                }
 
-                    const reindexVariantIndexName = await getIndexNameByAlias(
-                        this.client,
-                        reindexVariantAliasName,
+                const totalProductIds = await this.connection
+                    .getRepository(Product)
+                    .createQueryBuilder('product')
+                    .where('product.deletedAt IS NULL')
+                    .getCount();
+
+                Logger.verbose(`Will reindex ${totalProductIds} products`, loggerCtx);
+
+                let productIds = [];
+                let skip = 0;
+                let finishedProductsCount = 0;
+                do {
+                    const operations: BulkVariantOperation[] = [];
+
+                    productIds = await this.connection
+                        .getRepository(Product)
+                        .createQueryBuilder('product')
+                        .select('product.id')
+                        .where('product.deletedAt IS NULL')
+                        .skip(skip)
+                        .take(REINDEX_CHUNK_SIZE)
+                        .getMany();
+
+                    for (const { id: productId } of productIds) {
+                        operations.push(...(await this.updateProductsOperationsOnly(ctx, productId)));
+                        finishedProductsCount++;
+                        observer.next({
+                            total: totalProductIds,
+                            completed: Math.min(finishedProductsCount, totalProductIds),
+                            duration: +new Date() - timeStart,
+                        });
+                    }
+
+                    Logger.verbose(`Will execute ${operations.length} bulk update operations`, loggerCtx);
+
+                    // Because we can have a huge amount of variant for 1 product, we also chunk update operations
+                    await this.executeBulkOperationsByChunks(
+                        REINDEX_OPERATION_CHUNK_SIZE,
+                        operations,
+                        variantIndexNameForReindex,
                     );
-                    const originalVariantAliasExist = await this.client.indices.existsAlias({
-                        name: variantIndexName,
-                    });
-                    const originalVariantIndexExist = await this.client.indices.exists({
-                        index: variantIndexName,
+
+                    skip += REINDEX_CHUNK_SIZE;
+
+                    Logger.verbose(`Done ${finishedProductsCount} / ${totalProductIds} products`);
+                } while (productIds.length >= REINDEX_CHUNK_SIZE);
+
+                // Switch the index to the new reindexed one
+                try {
+                    const reindexVariantAliasExist = await this.client.indices.existsAlias({
+                        name: reindexVariantAliasName,
                     });
+                    if (reindexVariantAliasExist) {
+                        const reindexVariantIndexName = await getIndexNameByAlias(
+                            this.client,
+                            reindexVariantAliasName,
+                        );
+                        const originalVariantAliasExist = await this.client.indices.existsAlias({
+                            name: variantIndexName,
+                        });
+                        const originalVariantIndexExist = await this.client.indices.exists({
+                            index: variantIndexName,
+                        });
 
-                    const originalVariantIndexName = await getIndexNameByAlias(this.client, variantIndexName);
+                        const originalVariantIndexName = await getIndexNameByAlias(
+                            this.client,
+                            variantIndexName,
+                        );
 
-                    if (originalVariantAliasExist.body || originalVariantIndexExist.body) {
-                        await this.client.reindex({
-                            refresh: true,
-                            body: {
-                                source: {
-                                    index: variantIndexName,
+                        const actions = [
+                            {
+                                remove: {
+                                    index: reindexVariantIndexName,
+                                    alias: reindexVariantAliasName,
                                 },
-                                dest: {
-                                    index: reindexVariantAliasName,
+                            },
+                            {
+                                add: {
+                                    index: reindexVariantIndexName,
+                                    alias: variantIndexName,
                                 },
                             },
-                        });
-                    }
+                        ];
 
-                    const actions = [
-                        {
-                            remove: {
-                                index: reindexVariantIndexName,
-                                alias: reindexVariantAliasName,
-                            },
-                        },
-                        {
-                            add: {
-                                index: reindexVariantIndexName,
-                                alias: variantIndexName,
-                            },
-                        },
-                    ];
+                        if (originalVariantAliasExist.body) {
+                            actions.push({
+                                remove: {
+                                    index: originalVariantIndexName,
+                                    alias: variantIndexName,
+                                },
+                            });
+                        } else if (originalVariantIndexExist.body) {
+                            await this.client.indices.delete({
+                                index: [variantIndexName],
+                            });
+                        }
 
-                    if (originalVariantAliasExist.body) {
-                        actions.push({
-                            remove: {
-                                index: originalVariantIndexName,
-                                alias: variantIndexName,
+                        await this.client.indices.updateAliases({
+                            body: {
+                                actions,
                             },
                         });
-                    } else if (originalVariantIndexExist.body) {
-                        await this.client.indices.delete({
-                            index: [variantIndexName],
-                        });
-                    }
 
-                    await this.client.indices.updateAliases({
-                        body: {
-                            actions,
-                        },
-                    });
-
-                    if (originalVariantAliasExist.body) {
-                        await this.client.indices.delete({
-                            index: [originalVariantIndexName],
-                        });
+                        if (originalVariantAliasExist.body) {
+                            await this.client.indices.delete({
+                                index: [originalVariantIndexName],
+                            });
+                        }
                     }
                 } catch (e) {
-                    Logger.warn(
-                        `Could not recreate indices. Reindexing continue with existing indices.`,
-                        loggerCtx,
-                    );
-                    Logger.warn(JSON.stringify(e), loggerCtx);
+                    Logger.error('Could not switch indexes');
                 } finally {
                     const reindexVariantAliasExist = await this.client.indices.existsAlias({
                         name: reindexVariantAliasName,
@@ -335,48 +382,37 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
                     }
                 }
 
-                const deletedProductIds = await this.connection
-                    .getRepository(Product)
-                    .createQueryBuilder('product')
-                    .select('product.id')
-                    .where('product.deletedAt IS NOT NULL')
-                    .getMany();
-
-                for (const { id: deletedProductId } of deletedProductIds) {
-                    operations.push(...(await this.deleteProductOperations(ctx, deletedProductId)));
-                }
-
-                const productIds = await this.connection
-                    .getRepository(Product)
-                    .createQueryBuilder('product')
-                    .select('product.id')
-                    .where('product.deletedAt IS NULL')
-                    .getMany();
-
-                Logger.verbose(`Reindexing ${productIds.length} Products`, loggerCtx);
-
-                let finishedProductsCount = 0;
-                for (const { id: productId } of productIds) {
-                    operations.push(...(await this.updateProductsOperations(ctx, [productId])));
-                    finishedProductsCount++;
-                    observer.next({
-                        total: productIds.length,
-                        completed: Math.min(finishedProductsCount, productIds.length),
-                        duration: +new Date() - timeStart,
-                    });
-                }
-                Logger.verbose(`Will execute ${operations.length} bulk update operations`, loggerCtx);
-                await this.executeBulkOperations(operations);
                 Logger.verbose(`Completed reindexing!`, loggerCtx);
+
                 return {
-                    total: productIds.length,
-                    completed: productIds.length,
+                    total: totalProductIds,
+                    completed: totalProductIds,
                     duration: +new Date() - timeStart,
                 };
             });
         });
     }
 
+    async executeBulkOperationsByChunks(
+        chunkSize: number,
+        operations: BulkVariantOperation[],
+        index = VARIANT_INDEX_NAME,
+    ): Promise<void> {
+        let i;
+        let j;
+        let processedOperation = 0;
+        for (i = 0, j = operations.length; i < j; i += chunkSize) {
+            const operationsChunks = operations.slice(i, i + chunkSize);
+            await this.executeBulkOperations(operationsChunks, index);
+            processedOperation += operationsChunks.length;
+
+            Logger.verbose(
+                `Executing operation chunks ${processedOperation}/${operations.length}`,
+                loggerCtx,
+            );
+        }
+    }
+
     async updateAsset(data: UpdateAssetMessageData): Promise<boolean> {
         const result = await this.updateAssetFocalPointForIndex(VARIANT_INDEX_NAME, data.asset);
         await this.client.indices.refresh({
@@ -461,101 +497,68 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
         await this.executeBulkOperations(operations);
     }
 
-    private async updateProductsOperations(
+    private async updateProductsOperationsOnly(
         ctx: MutableRequestContext,
-        productIds: ID[],
+        productId: ID,
     ): Promise<BulkVariantOperation[]> {
-        Logger.debug(`Updating ${productIds.length} Products`, loggerCtx);
         const operations: BulkVariantOperation[] = [];
-
-        for (const productId of productIds) {
-            operations.push(...(await this.deleteProductOperations(ctx, productId)));
-
-            let product: Product | undefined;
-            try {
-                product = await this.connection.getRepository(Product).findOne(productId, {
-                    relations: this.productRelations,
+        let product: Product | undefined;
+        try {
+            product = await this.connection.getRepository(Product).findOne(productId, {
+                relations: this.productRelations,
+                where: {
+                    deletedAt: null,
+                },
+            });
+        } catch (e) {
+            Logger.error(e.message, loggerCtx, e.stack);
+            throw e;
+        }
+        if (product) {
+            const updatedProductVariants = await this.connection.getRepository(ProductVariant).findByIds(
+                product.variants.map(v => v.id),
+                {
+                    relations: this.variantRelations,
                     where: {
                         deletedAt: null,
                     },
-                });
-            } catch (e) {
-                Logger.error(e.message, loggerCtx, e.stack);
-                throw e;
-            }
-            if (product) {
-                const updatedProductVariants = await this.connection.getRepository(ProductVariant).findByIds(
-                    product.variants.map(v => v.id),
-                    {
-                        relations: this.variantRelations,
-                        where: {
-                            deletedAt: null,
-                        },
-                        order: {
-                            id: 'ASC',
-                        },
+                    order: {
+                        id: 'ASC',
                     },
-                );
-                // tslint:disable-next-line:no-non-null-assertion
-                updatedProductVariants.forEach(variant => (variant.product = product!));
-                if (!product.enabled) {
-                    updatedProductVariants.forEach(v => (v.enabled = false));
-                }
-                Logger.debug(`Updating Product (${productId})`, loggerCtx);
-                const languageVariants: LanguageCode[] = [];
-                languageVariants.push(...product.translations.map(t => t.languageCode));
-                for (const variant of product.variants) {
-                    languageVariants.push(...variant.translations.map(t => t.languageCode));
-                }
-                const uniqueLanguageVariants = unique(languageVariants);
+                },
+            );
+            // tslint:disable-next-line:no-non-null-assertion
+            updatedProductVariants.forEach(variant => (variant.product = product!));
+            if (!product.enabled) {
+                updatedProductVariants.forEach(v => (v.enabled = false));
+            }
+            Logger.debug(`Updating Product (${productId})`, loggerCtx);
+            const languageVariants: LanguageCode[] = [];
+            languageVariants.push(...product.translations.map(t => t.languageCode));
+            for (const variant of product.variants) {
+                languageVariants.push(...variant.translations.map(t => t.languageCode));
+            }
+            const uniqueLanguageVariants = unique(languageVariants);
 
-                for (const channel of product.channels) {
-                    ctx.setChannel(channel);
+            for (const channel of product.channels) {
+                ctx.setChannel(channel);
 
-                    const variantsInChannel = updatedProductVariants.filter(v =>
-                        v.channels.map(c => c.id).includes(ctx.channelId),
-                    );
-                    for (const variant of variantsInChannel) {
-                        await this.productPriceApplicator.applyChannelPriceAndTax(variant, ctx);
-                    }
-                    for (const languageCode of uniqueLanguageVariants) {
-                        if (variantsInChannel.length) {
-                            for (const variant of variantsInChannel) {
-                                operations.push(
-                                    {
-                                        index: VARIANT_INDEX_NAME,
-                                        operation: {
-                                            update: {
-                                                _id: ElasticsearchIndexerController.getId(
-                                                    variant.id,
-                                                    ctx.channelId,
-                                                    languageCode,
-                                                ),
-                                            },
-                                        },
-                                    },
-                                    {
-                                        index: VARIANT_INDEX_NAME,
-                                        operation: {
-                                            doc: await this.createVariantIndexItem(
-                                                variant,
-                                                variantsInChannel,
-                                                ctx,
-                                                languageCode,
-                                            ),
-                                            doc_as_upsert: true,
-                                        },
-                                    },
-                                );
-                            }
-                        } else {
+                const variantsInChannel = updatedProductVariants.filter(v =>
+                    v.channels.map(c => c.id).includes(ctx.channelId),
+                );
+                for (const variant of variantsInChannel) {
+                    await this.productPriceApplicator.applyChannelPriceAndTax(variant, ctx);
+                }
+                for (const languageCode of uniqueLanguageVariants) {
+                    if (variantsInChannel.length) {
+                        for (const variant of variantsInChannel) {
                             operations.push(
                                 {
                                     index: VARIANT_INDEX_NAME,
                                     operation: {
                                         update: {
                                             _id: ElasticsearchIndexerController.getId(
-                                                -product.id,
+                                                variant.id,
                                                 ctx.channelId,
                                                 languageCode,
                                             ),
@@ -565,16 +568,58 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
                                 {
                                     index: VARIANT_INDEX_NAME,
                                     operation: {
-                                        doc: this.createSyntheticProductIndexItem(product, ctx, languageCode),
+                                        doc: await this.createVariantIndexItem(
+                                            variant,
+                                            variantsInChannel,
+                                            ctx,
+                                            languageCode,
+                                        ),
                                         doc_as_upsert: true,
                                     },
                                 },
                             );
                         }
+                    } else {
+                        operations.push(
+                            {
+                                index: VARIANT_INDEX_NAME,
+                                operation: {
+                                    update: {
+                                        _id: ElasticsearchIndexerController.getId(
+                                            -product.id,
+                                            ctx.channelId,
+                                            languageCode,
+                                        ),
+                                    },
+                                },
+                            },
+                            {
+                                index: VARIANT_INDEX_NAME,
+                                operation: {
+                                    doc: this.createSyntheticProductIndexItem(product, ctx, languageCode),
+                                    doc_as_upsert: true,
+                                },
+                            },
+                        );
                     }
                 }
             }
         }
+
+        return operations;
+    }
+
+    private async updateProductsOperations(
+        ctx: MutableRequestContext,
+        productIds: ID[],
+    ): Promise<BulkVariantOperation[]> {
+        Logger.debug(`Updating ${productIds.length} Products`, loggerCtx);
+        const operations: BulkVariantOperation[] = [];
+
+        for (const productId of productIds) {
+            operations.push(...(await this.deleteProductOperations(ctx, productId)));
+            operations.push(...(await this.updateProductsOperationsOnly(ctx, productId)));
+        }
         return operations;
     }
 
@@ -692,14 +737,14 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
         return unique(variants.map(v => v.product.id));
     }
 
-    private async executeBulkOperations(operations: BulkVariantOperation[]) {
+    private async executeBulkOperations(operations: BulkVariantOperation[], indexName = VARIANT_INDEX_NAME) {
         const variantOperations: Array<BulkOperation | BulkOperationDoc<VariantIndexItem>> = [];
 
         for (const operation of operations) {
             variantOperations.push(operation.operation);
         }
 
-        return Promise.all([this.runBulkOperationsOnIndex(VARIANT_INDEX_NAME, variantOperations)]);
+        return Promise.all([this.runBulkOperationsOnIndex(indexName, variantOperations)]);
     }
 
     private async runBulkOperationsOnIndex(