Browse Source

perf(elasticsearch): added optimization for each opirations during reindex and other operations

Eugene Nitsenko 2 years ago
parent
commit
719cea5882

+ 223 - 176
packages/elasticsearch-plugin/src/indexing/indexer.controller.ts

@@ -1,18 +1,18 @@
-import { Client } from '@elastic/elasticsearch';
-import { Inject, Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
+import type { Client } from '@elastic/elasticsearch';
+import type { OnModuleDestroy, OnModuleInit } from '@nestjs/common';
+import { Inject, Injectable } from '@nestjs/common';
 import { unique } from '@vendure/common/lib/unique';
 import { unique } from '@vendure/common/lib/unique';
 import {
 import {
     Asset,
     Asset,
-    asyncObservable,
-    AsyncQueue,
-    Channel,
     Collection,
     Collection,
-    ConfigService,
     EntityRelationPaths,
     EntityRelationPaths,
     FacetValue,
     FacetValue,
     ID,
     ID,
-    InternalServerError,
     LanguageCode,
     LanguageCode,
+    AsyncQueue,
+    Channel,
+    ConfigService,
+    InternalServerError,
     Logger,
     Logger,
     Product,
     Product,
     ProductPriceApplicator,
     ProductPriceApplicator,
@@ -27,9 +27,9 @@ import {
 import { Observable } from 'rxjs';
 import { Observable } from 'rxjs';
 import { In, IsNull } from 'typeorm';
 import { In, IsNull } from 'typeorm';
 
 
-import { ELASTIC_SEARCH_OPTIONS, loggerCtx, VARIANT_INDEX_NAME } from '../constants';
-import { ElasticsearchOptions } from '../options';
-import {
+import { ELASTIC_SEARCH_OPTIONS, VARIANT_INDEX_NAME, loggerCtx } from '../constants';
+import type { ElasticsearchOptions } from '../options';
+import type {
     BulkOperation,
     BulkOperation,
     BulkOperationDoc,
     BulkOperationDoc,
     BulkResponseBody,
     BulkResponseBody,
@@ -46,6 +46,7 @@ import {
 
 
 import { createIndices, getClient, getIndexNameByAlias } from './indexing-utils';
 import { createIndices, getClient, getIndexNameByAlias } from './indexing-utils';
 import { MutableRequestContext } from './mutable-request-context';
 import { MutableRequestContext } from './mutable-request-context';
+import { asyncObservable } from "@vendure/core/src";
 
 
 export const defaultProductRelations: Array<EntityRelationPaths<Product>> = [
 export const defaultProductRelations: Array<EntityRelationPaths<Product>> = [
     'featuredAsset',
     'featuredAsset',
@@ -111,7 +112,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
     /**
     /**
      * Updates the search index only for the affected product.
      * Updates the search index only for the affected product.
      */
      */
-    async updateProduct({ ctx: rawContext, productId }: UpdateProductMessageData): Promise<boolean> {
+    async updateProduct({ctx: rawContext, productId}: UpdateProductMessageData): Promise<boolean> {
         const ctx = MutableRequestContext.deserialize(rawContext);
         const ctx = MutableRequestContext.deserialize(rawContext);
         await this.updateProductsInternal(ctx, [productId]);
         await this.updateProductsInternal(ctx, [productId]);
         return true;
         return true;
@@ -120,12 +121,11 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
     /**
     /**
      * Updates the search index only for the affected product.
      * Updates the search index only for the affected product.
      */
      */
-    async deleteProduct({ ctx: rawContext, productId }: UpdateProductMessageData): Promise<boolean> {
-        const operations = await this.deleteProductOperations(
+    async deleteProduct({ctx: rawContext, productId}: UpdateProductMessageData): Promise<boolean> {
+        await this.deleteProductOperations(
             RequestContext.deserialize(rawContext),
             RequestContext.deserialize(rawContext),
             productId,
             productId,
         );
         );
-        await this.executeBulkOperations(operations);
         return true;
         return true;
     }
     }
 
 
@@ -180,7 +180,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
     /**
     /**
      * Updates the search index only for the affected entities.
      * Updates the search index only for the affected entities.
      */
      */
-    async updateVariants({ ctx: rawContext, variantIds }: UpdateVariantMessageData): Promise<boolean> {
+    async updateVariants({ctx: rawContext, variantIds}: UpdateVariantMessageData): Promise<boolean> {
         const ctx = MutableRequestContext.deserialize(rawContext);
         const ctx = MutableRequestContext.deserialize(rawContext);
         return this.asyncQueue.push(async () => {
         return this.asyncQueue.push(async () => {
             const productIds = await this.getProductIdsByVariantIds(variantIds);
             const productIds = await this.getProductIdsByVariantIds(variantIds);
@@ -189,7 +189,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
         });
         });
     }
     }
 
 
-    async deleteVariants({ ctx: rawContext, variantIds }: UpdateVariantMessageData): Promise<boolean> {
+    async deleteVariants({ctx: rawContext, variantIds}: UpdateVariantMessageData): Promise<boolean> {
         const ctx = MutableRequestContext.deserialize(rawContext);
         const ctx = MutableRequestContext.deserialize(rawContext);
         const productIds = await this.getProductIdsByVariantIds(variantIds);
         const productIds = await this.getProductIdsByVariantIds(variantIds);
         for (const productId of productIds) {
         for (const productId of productIds) {
@@ -236,9 +236,9 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
                 const ctx = MutableRequestContext.deserialize(rawContext);
                 const ctx = MutableRequestContext.deserialize(rawContext);
 
 
                 const reindexTempName = new Date().getTime();
                 const reindexTempName = new Date().getTime();
-                const variantIndexName = this.options.indexPrefix + VARIANT_INDEX_NAME;
-                const variantIndexNameForReindex = VARIANT_INDEX_NAME + `-reindex-${reindexTempName}`;
-                const reindexVariantAliasName = this.options.indexPrefix + variantIndexNameForReindex;
+                const variantIndexName = `${this.options.indexPrefix}${VARIANT_INDEX_NAME}`;
+                const variantIndexNameForReindex = `${VARIANT_INDEX_NAME}-reindex-${reindexTempName}`;
+                const reindexVariantAliasName = `${this.options.indexPrefix}${variantIndexNameForReindex}`;
                 try {
                 try {
                     await createIndices(
                     await createIndices(
                         this.client,
                         this.client,
@@ -267,8 +267,6 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
                 let skip = 0;
                 let skip = 0;
                 let finishedProductsCount = 0;
                 let finishedProductsCount = 0;
                 do {
                 do {
-                    let operations: BulkVariantOperation[] = [];
-
                     productIds = await this.connection
                     productIds = await this.connection
                         .rawConnection
                         .rawConnection
                         .getRepository(Product)
                         .getRepository(Product)
@@ -279,18 +277,8 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
                         .take(this.options.reindexProductsChunkSize)
                         .take(this.options.reindexProductsChunkSize)
                         .getMany();
                         .getMany();
 
 
-                    for (const { id: productId } of productIds) {
-                        operations.push(...(await this.updateProductsOperationsOnly(ctx, productId)));
-                        if (operations.length >= this.options.reindexBulkOperationSizeLimit) {
-                            // Because we can have a huge amount of variant for 1 product, we also chunk update operations
-                            await this.executeBulkOperationsByChunks(
-                                this.options.reindexBulkOperationSizeLimit,
-                                operations,
-                                variantIndexNameForReindex,
-                            );
-                            // Reset operations to avoid memory peaks with huge amount of operations
-                            operations = [];
-                        }
+                    for (const {id: productId} of productIds) {
+                        await this.updateProductsOperationsOnly(ctx, productId, variantIndexNameForReindex);
                         finishedProductsCount++;
                         finishedProductsCount++;
                         observer.next({
                         observer.next({
                             total: totalProductIds,
                             total: totalProductIds,
@@ -299,98 +287,13 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
                         });
                         });
                     }
                     }
 
 
-                    Logger.verbose(`Will execute ${operations.length} bulk update operations`, loggerCtx);
-
-                    // Because we can have a huge amount of variant for 1 product, we also chunk update operations
-                    await this.executeBulkOperationsByChunks(
-                        this.options.reindexBulkOperationSizeLimit,
-                        operations,
-                        variantIndexNameForReindex,
-                    );
-
                     skip += this.options.reindexProductsChunkSize;
                     skip += this.options.reindexProductsChunkSize;
 
 
                     Logger.verbose(`Done ${finishedProductsCount} / ${totalProductIds} products`);
                     Logger.verbose(`Done ${finishedProductsCount} / ${totalProductIds} products`);
-                } while (productIds.length >= this.options.reindexProductsChunkSize);
+                } while (productIds.length >= this.options.reindexProductsChunkSize)
 
 
                 // Switch the index to the new reindexed one
                 // Switch the index to the new reindexed one
-                try {
-                    const reindexVariantAliasExist = await this.client.indices.existsAlias({
-                        name: reindexVariantAliasName,
-                    });
-                    if (reindexVariantAliasExist) {
-                        const reindexVariantIndexName = await getIndexNameByAlias(
-                            this.client,
-                            reindexVariantAliasName,
-                        );
-                        const originalVariantAliasExist = await this.client.indices.existsAlias({
-                            name: variantIndexName,
-                        });
-                        const originalVariantIndexExist = await this.client.indices.exists({
-                            index: variantIndexName,
-                        });
-
-                        const originalVariantIndexName = await getIndexNameByAlias(
-                            this.client,
-                            variantIndexName,
-                        );
-
-                        const actions = [
-                            {
-                                remove: {
-                                    index: reindexVariantIndexName,
-                                    alias: reindexVariantAliasName,
-                                },
-                            },
-                            {
-                                add: {
-                                    index: reindexVariantIndexName,
-                                    alias: variantIndexName,
-                                },
-                            },
-                        ];
-
-                        if (originalVariantAliasExist.body) {
-                            actions.push({
-                                remove: {
-                                    index: originalVariantIndexName,
-                                    alias: variantIndexName,
-                                },
-                            });
-                        } else if (originalVariantIndexExist.body) {
-                            await this.client.indices.delete({
-                                index: [variantIndexName],
-                            });
-                        }
-
-                        await this.client.indices.updateAliases({
-                            body: {
-                                actions,
-                            },
-                        });
-
-                        if (originalVariantAliasExist.body) {
-                            await this.client.indices.delete({
-                                index: [originalVariantIndexName],
-                            });
-                        }
-                    }
-                } catch (e: any) {
-                    Logger.error('Could not switch indexes');
-                } finally {
-                    const reindexVariantAliasExist = await this.client.indices.existsAlias({
-                        name: reindexVariantAliasName,
-                    });
-                    if (reindexVariantAliasExist.body) {
-                        const reindexVariantAliasResult = await this.client.indices.getAlias({
-                            name: reindexVariantAliasName,
-                        });
-                        const reindexVariantIndexName = Object.keys(reindexVariantAliasResult.body)[0];
-                        await this.client.indices.delete({
-                            index: [reindexVariantIndexName],
-                        });
-                    }
-                }
+                await this.switchAlias(reindexVariantAliasName, variantIndexName);
 
 
                 Logger.verbose('Completed reindexing!', loggerCtx);
                 Logger.verbose('Completed reindexing!', loggerCtx);
 
 
@@ -400,7 +303,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
                     duration: +new Date() - timeStart,
                     duration: +new Date() - timeStart,
                 };
                 };
             });
             });
-        });
+        })
     }
     }
 
 
     async executeBulkOperationsByChunks(
     async executeBulkOperationsByChunks(
@@ -408,6 +311,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
         operations: BulkVariantOperation[],
         operations: BulkVariantOperation[],
         index = VARIANT_INDEX_NAME,
         index = VARIANT_INDEX_NAME,
     ): Promise<void> {
     ): Promise<void> {
+        Logger.verbose(`Will execute ${operations.length} bulk update operations with index ${index}`, loggerCtx);
         let i;
         let i;
         let j;
         let j;
         let processedOperation = 0;
         let processedOperation = 0;
@@ -441,7 +345,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
 
 
     private async updateAssetFocalPointForIndex(indexName: string, asset: Asset): Promise<boolean> {
     private async updateAssetFocalPointForIndex(indexName: string, asset: Asset): Promise<boolean> {
         const focalPoint = asset.focalPoint || null;
         const focalPoint = asset.focalPoint || null;
-        const params = { focalPoint };
+        const params = {focalPoint};
         return this.updateAssetForIndex(
         return this.updateAssetForIndex(
             indexName,
             indexName,
             asset,
             asset,
@@ -482,9 +386,9 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
                 },
                 },
             },
             },
         });
         });
-        for (const failure of result1.body.failures) {
+        for (const failure of result1.body.failures)
             Logger.error(`${failure.cause.type as string}: ${failure.cause.reason as string}`, loggerCtx);
             Logger.error(`${failure.cause.type as string}: ${failure.cause.reason as string}`, loggerCtx);
-        }
+
         const result2 = await this.client.update_by_query({
         const result2 = await this.client.update_by_query({
             index: this.options.indexPrefix + indexName,
             index: this.options.indexPrefix + indexName,
             body: {
             body: {
@@ -503,32 +407,117 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
     }
     }
 
 
     private async updateProductsInternal(ctx: MutableRequestContext, productIds: ID[]) {
     private async updateProductsInternal(ctx: MutableRequestContext, productIds: ID[]) {
-        const operations = await this.updateProductsOperations(ctx, productIds);
-        await this.executeBulkOperations(operations);
+        await this.updateProductsOperations(ctx, productIds);
+    }
+
+    private async switchAlias(
+        reindexVariantAliasName: string,
+        variantIndexName: string,
+    ): Promise<void> {
+        try {
+            const reindexVariantAliasExist = await this.client.indices.existsAlias({
+                name: reindexVariantAliasName,
+            });
+            if (reindexVariantAliasExist) {
+                const reindexVariantIndexName = await getIndexNameByAlias(
+                    this.client,
+                    reindexVariantAliasName,
+                );
+                const originalVariantAliasExist = await this.client.indices.existsAlias({
+                    name: variantIndexName,
+                });
+                const originalVariantIndexExist = await this.client.indices.exists({
+                    index: variantIndexName,
+                });
+
+                const originalVariantIndexName = await getIndexNameByAlias(
+                    this.client,
+                    variantIndexName,
+                );
+
+                const actions = [
+                    {
+                        remove: {
+                            index: reindexVariantIndexName,
+                            alias: reindexVariantAliasName,
+                        },
+                    },
+                    {
+                        add: {
+                            index: reindexVariantIndexName,
+                            alias: variantIndexName,
+                        },
+                    },
+                ];
+
+                if (originalVariantAliasExist.body) {
+                    actions.push({
+                        remove: {
+                            index: originalVariantIndexName,
+                            alias: variantIndexName,
+                        },
+                    });
+                } else if (originalVariantIndexExist.body) {
+                    await this.client.indices.delete({
+                        index: [variantIndexName],
+                    });
+                }
+
+                await this.client.indices.updateAliases({
+                    body: {
+                        actions,
+                    },
+                });
+
+                if (originalVariantAliasExist.body) {
+                    await this.client.indices.delete({
+                        index: [originalVariantIndexName],
+                    });
+                }
+            }
+        } catch (e: any) {
+            Logger.error('Could not switch indexes');
+        } finally {
+            const reindexVariantAliasExist = await this.client.indices.existsAlias({
+                name: reindexVariantAliasName,
+            });
+            if (reindexVariantAliasExist.body) {
+                const reindexVariantAliasResult = await this.client.indices.getAlias({
+                    name: reindexVariantAliasName,
+                });
+                const reindexVariantIndexName = Object.keys(reindexVariantAliasResult.body)[0]
+                await this.client.indices.delete({
+                    index: [reindexVariantIndexName],
+                });
+            }
+        }
     }
     }
 
 
     private async updateProductsOperationsOnly(
     private async updateProductsOperationsOnly(
         ctx: MutableRequestContext,
         ctx: MutableRequestContext,
         productId: ID,
         productId: ID,
-    ): Promise<BulkVariantOperation[]> {
-        const operations: BulkVariantOperation[] = [];
+        index = VARIANT_INDEX_NAME
+    ): Promise<void> {
+        let operations: BulkVariantOperation[] = [];
         let product: Product | undefined;
         let product: Product | undefined;
         try {
         try {
             product = await this.connection
             product = await this.connection
+                .rawConnection
                 .getRepository(Product)
                 .getRepository(Product)
-                .findOne({
-                    where: { id: productId, deletedAt: IsNull() },
+                .find({
+                    relationLoadStrategy: 'query',
+                    where: {id: productId, deletedAt: IsNull()},
                     relations: this.productRelations,
                     relations: this.productRelations,
                 })
                 })
-                .then(result => result ?? undefined);
+                .then(result => result[0] ?? undefined);
         } catch (e: any) {
         } catch (e: any) {
             Logger.error(e.message, loggerCtx, e.stack);
             Logger.error(e.message, loggerCtx, e.stack);
             throw e;
             throw e;
         }
         }
-        if (!product) {
-            return operations;
-        }
-        const updatedProductVariants = await this.connection.getRepository(ProductVariant).find({
+        if (!product)
+            return;
+
+        const updatedProductVariants = await this.connection.rawConnection.getRepository(ProductVariant).find({
             relations: this.variantRelations,
             relations: this.variantRelations,
             where: {
             where: {
                 productId,
                 productId,
@@ -537,27 +526,28 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
             order: {
             order: {
                 id: 'ASC',
                 id: 'ASC',
             },
             },
+            relationLoadStrategy: 'query',
         });
         });
-        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+
         updatedProductVariants.forEach(variant => (variant.product = product!));
         updatedProductVariants.forEach(variant => (variant.product = product!));
-        if (!product.enabled) {
+        if (!product.enabled)
             updatedProductVariants.forEach(v => (v.enabled = false));
             updatedProductVariants.forEach(v => (v.enabled = false));
-        }
-        Logger.debug(`Updating Product (${productId})`, loggerCtx);
-        const languageVariants: LanguageCode[] = [];
+
+        Logger.debug(`Updating Product (${productId})`, loggerCtx)
+        const languageVariants: LanguageCode[] = []
         languageVariants.push(...product.translations.map(t => t.languageCode));
         languageVariants.push(...product.translations.map(t => t.languageCode));
-        for (const variant of updatedProductVariants) {
+        for (const variant of updatedProductVariants)
             languageVariants.push(...variant.translations.map(t => t.languageCode));
             languageVariants.push(...variant.translations.map(t => t.languageCode));
-        }
-        const uniqueLanguageVariants = unique(languageVariants);
+
+        const uniqueLanguageVariants = unique(languageVariants)
         for (const channel of product.channels) {
         for (const channel of product.channels) {
             ctx.setChannel(channel);
             ctx.setChannel(channel);
             const variantsInChannel = updatedProductVariants.filter(v =>
             const variantsInChannel = updatedProductVariants.filter(v =>
                 v.channels.map(c => c.id).includes(ctx.channelId),
                 v.channels.map(c => c.id).includes(ctx.channelId),
             );
             );
-            for (const variant of variantsInChannel) {
+            for (const variant of variantsInChannel)
                 await this.productPriceApplicator.applyChannelPriceAndTax(variant, ctx);
                 await this.productPriceApplicator.applyChannelPriceAndTax(variant, ctx);
-            }
+
             for (const languageCode of uniqueLanguageVariants) {
             for (const languageCode of uniqueLanguageVariants) {
                 if (variantsInChannel.length) {
                 if (variantsInChannel.length) {
                     for (const variant of variantsInChannel) {
                     for (const variant of variantsInChannel) {
@@ -586,7 +576,17 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
                                     doc_as_upsert: true,
                                     doc_as_upsert: true,
                                 },
                                 },
                             },
                             },
-                        );
+                        )
+
+                        if (operations.length >= this.options.reindexBulkOperationSizeLimit) {
+                            // Because we can have a huge amount of variant for 1 product, we also chunk update operations
+                            await this.executeBulkOperationsByChunks(
+                                this.options.reindexBulkOperationSizeLimit,
+                                operations,
+                                index,
+                            );
+                            operations = []
+                        }
                     }
                     }
                 } else {
                 } else {
                     operations.push(
                     operations.push(
@@ -605,30 +605,44 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
                         {
                         {
                             index: VARIANT_INDEX_NAME,
                             index: VARIANT_INDEX_NAME,
                             operation: {
                             operation: {
-                                doc: this.createSyntheticProductIndexItem(product, ctx, languageCode),
+                                doc: await this.createSyntheticProductIndexItem(product, ctx, languageCode),
                                 doc_as_upsert: true,
                                 doc_as_upsert: true,
                             },
                             },
                         },
                         },
                     );
                     );
                 }
                 }
+                if (operations.length >= this.options.reindexBulkOperationSizeLimit) {
+                    // Because we can have a huge amount of variant for 1 product, we also chunk update operations
+                    await this.executeBulkOperationsByChunks(
+                        this.options.reindexBulkOperationSizeLimit,
+                        operations,
+                        index,
+                    );
+                    operations = []
+                }
             }
             }
         }
         }
 
 
-        return operations;
+        // Because we can have a huge amount of variant for 1 product, we also chunk update operations
+        await this.executeBulkOperationsByChunks(
+            this.options.reindexBulkOperationSizeLimit,
+            operations,
+            index,
+        );
+
+        return;
     }
     }
 
 
     private async updateProductsOperations(
     private async updateProductsOperations(
         ctx: MutableRequestContext,
         ctx: MutableRequestContext,
         productIds: ID[],
         productIds: ID[],
-    ): Promise<BulkVariantOperation[]> {
-        Logger.debug(`Updating ${productIds.length} Products`, loggerCtx);
-        const operations: BulkVariantOperation[] = [];
-
+    ): Promise<void> {
+        Logger.debug(`Updating ${productIds.length} Products`, loggerCtx)
         for (const productId of productIds) {
         for (const productId of productIds) {
-            operations.push(...(await this.deleteProductOperations(ctx, productId)));
-            operations.push(...(await this.updateProductsOperationsOnly(ctx, productId)));
+            await this.deleteProductOperations(ctx, productId);
+            await this.updateProductsOperationsOnly(ctx, productId);
         }
         }
-        return operations;
+        return;
     }
     }
 
 
     /**
     /**
@@ -649,7 +663,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
         for (const relation of hydratedRelations) {
         for (const relation of hydratedRelations) {
             let path = relation.split('.');
             let path = relation.split('.');
             if (path[0] === 'customFields') {
             if (path[0] === 'customFields') {
-                if (2 < path.length) {
+                if (path.length > 2) {
                     throw new InternalServerError(
                     throw new InternalServerError(
                         [
                         [
                             'hydrateProductRelations / hydrateProductVariantRelations does not currently support nested custom field relations',
                             'hydrateProductRelations / hydrateProductVariantRelations does not currently support nested custom field relations',
@@ -674,7 +688,8 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
     private async deleteProductOperations(
     private async deleteProductOperations(
         ctx: RequestContext,
         ctx: RequestContext,
         productId: ID,
         productId: ID,
-    ): Promise<BulkVariantOperation[]> {
+        index: string = VARIANT_INDEX_NAME,
+    ): Promise<void> {
         const channels = await this.requestContextCache.get(ctx, 'elastic-index-all-channels', () =>
         const channels = await this.requestContextCache.get(ctx, 'elastic-index-all-channels', () =>
             this.connection
             this.connection
                 .rawConnection
                 .rawConnection
@@ -701,20 +716,20 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
             .andWhere('channel.id = :channelId', { channelId: ctx.channelId })
             .andWhere('channel.id = :channelId', { channelId: ctx.channelId })
             .getOne();
             .getOne();
 
 
-        if (!product) {
-            return [];
-        }
+        if (!product)
+            return;
 
 
         Logger.debug(`Deleting 1 Product (id: ${productId})`, loggerCtx);
         Logger.debug(`Deleting 1 Product (id: ${productId})`, loggerCtx);
-        const operations: BulkVariantOperation[] = [];
+        let operations: BulkVariantOperation[] = [];
         const languageVariants: LanguageCode[] = [];
         const languageVariants: LanguageCode[] = [];
         languageVariants.push(...product.translations.map(t => t.languageCode));
         languageVariants.push(...product.translations.map(t => t.languageCode));
-        for (const variant of product.variants) {
+        for (const variant of product.variants)
             languageVariants.push(...variant.translations.map(t => t.languageCode));
             languageVariants.push(...variant.translations.map(t => t.languageCode));
-        }
+
         const uniqueLanguageVariants = unique(languageVariants);
         const uniqueLanguageVariants = unique(languageVariants);
 
 
-        for (const { id: channelId } of channels) {
+
+        for (const {id: channelId} of channels) {
             for (const languageCode of uniqueLanguageVariants) {
             for (const languageCode of uniqueLanguageVariants) {
                 operations.push({
                 operations.push({
                     index: VARIANT_INDEX_NAME,
                     index: VARIANT_INDEX_NAME,
@@ -724,25 +739,42 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
                         },
                         },
                     },
                     },
                 });
                 });
+                if (operations.length >= this.options.reindexBulkOperationSizeLimit) {
+                    // Because we can have a huge amount of variant for 1 product, we also chunk update operations
+                    await this.executeBulkOperationsByChunks(
+                        this.options.reindexBulkOperationSizeLimit,
+                        operations,
+                        index,
+                    );
+                    operations = []
+                }
             }
             }
         }
         }
-        operations.push(
-            ...(await this.deleteVariantsInternalOperations(
-                product.variants,
-                channels.map(c => c.id),
-                uniqueLanguageVariants,
-            )),
+        // Because we can have a huge amount of variant for 1 product, we also chunk update operations
+        await this.executeBulkOperationsByChunks(
+            this.options.reindexBulkOperationSizeLimit,
+            operations,
+            index,
+        );
+
+        await this.deleteVariantsInternalOperations(
+            product.variants,
+            channels.map(c => c.id),
+            uniqueLanguageVariants,
+            index,
         );
         );
-        return operations;
+
+        return;
     }
     }
 
 
     private async deleteVariantsInternalOperations(
     private async deleteVariantsInternalOperations(
         variants: ProductVariant[],
         variants: ProductVariant[],
         channelIds: ID[],
         channelIds: ID[],
         languageVariants: LanguageCode[],
         languageVariants: LanguageCode[],
-    ): Promise<BulkVariantOperation[]> {
-        Logger.debug(`Deleting ${variants.length} ProductVariants`, loggerCtx);
-        const operations: BulkVariantOperation[] = [];
+        index = VARIANT_INDEX_NAME,
+    ): Promise<void> {
+        Logger.debug(`Deleting ${variants.length} ProductVariants`, loggerCtx)
+        let operations: BulkVariantOperation[] = [];
         for (const variant of variants) {
         for (const variant of variants) {
             for (const channelId of channelIds) {
             for (const channelId of channelIds) {
                 for (const languageCode of languageVariants) {
                 for (const languageCode of languageVariants) {
@@ -757,11 +789,26 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
                                 ),
                                 ),
                             },
                             },
                         },
                         },
-                    });
+                    })
+                    if (operations.length >= this.options.reindexBulkOperationSizeLimit) {
+                        // Because we can have a huge amount of variant for 1 product, we also chunk update operations
+                        await this.executeBulkOperationsByChunks(
+                            this.options.reindexBulkOperationSizeLimit,
+                            operations,
+                            index,
+                        )
+                        operations = []
+                    }
                 }
                 }
             }
             }
         }
         }
-        return operations;
+        // Because we can have a huge amount of variant for 1 product, we also chunk update operations
+        await this.executeBulkOperationsByChunks(
+            this.options.reindexBulkOperationSizeLimit,
+            operations,
+            index,
+        )
+        return
     }
     }
 
 
     private async getProductIdsByVariantIds(variantIds: ID[]): Promise<ID[]> {
     private async getProductIdsByVariantIds(variantIds: ID[]): Promise<ID[]> {

+ 1 - 1
packages/elasticsearch-plugin/src/options.ts

@@ -712,7 +712,7 @@ export const defaultOptions: ElasticsearchRuntimeOptions = {
     indexPrefix: 'vendure-',
     indexPrefix: 'vendure-',
     indexSettings: {},
     indexSettings: {},
     indexMappingProperties: {},
     indexMappingProperties: {},
-    reindexProductsChunkSize: 500,
+    reindexProductsChunkSize: 2500,
     reindexBulkOperationSizeLimit: 3000,
     reindexBulkOperationSizeLimit: 3000,
     searchConfig: {
     searchConfig: {
         facetValueMaxSize: 50,
         facetValueMaxSize: 50,