Browse Source

Merge branch 'master' into minor

Michael Bromley 1 year ago
parent
commit
cd80b7d7f9

+ 17 - 0
CHANGELOG.md

@@ -1,3 +1,20 @@
+## <small>2.1.7 (2024-02-06)</small>
+
+
+#### Fixes
+
+* **admin-ui** Add missing Ukrainian translation (#2638) ([67c8c90](https://github.com/vendure-ecommerce/vendure/commit/67c8c90)), closes [#2638](https://github.com/vendure-ecommerce/vendure/issues/2638)
+* **admin-ui** Improve German translation (#2639) ([30f5e91](https://github.com/vendure-ecommerce/vendure/commit/30f5e91)), closes [#2639](https://github.com/vendure-ecommerce/vendure/issues/2639)
+* **admin-ui** Reset page to 1 on viewing collection contents from list ([daa4731](https://github.com/vendure-ecommerce/vendure/commit/daa4731))
+* **core** Add product translation to product variant entity resolver (#2644) ([9289a1c](https://github.com/vendure-ecommerce/vendure/commit/9289a1c)), closes [#2644](https://github.com/vendure-ecommerce/vendure/issues/2644)
+* **core** Ensure deterministic sorting in case of duplicates in DefaultSearchPlugin search query (#2632) ([81b4607](https://github.com/vendure-ecommerce/vendure/commit/81b4607)), closes [#2632](https://github.com/vendure-ecommerce/vendure/issues/2632)
+* **core** Fix undefined reference error in product variant resolver ([4cceb70](https://github.com/vendure-ecommerce/vendure/commit/4cceb70))
+* **core** Improve handling of active jobs on worker shutdown ([e1e0987](https://github.com/vendure-ecommerce/vendure/commit/e1e0987))
+* **core** Improved resolution of Administrator.user ([c2a4685](https://github.com/vendure-ecommerce/vendure/commit/c2a4685)), closes [#1489](https://github.com/vendure-ecommerce/vendure/issues/1489)
+* **core** Update translations parent `updatedAt` column when updating translation (#2630) ([44fc828](https://github.com/vendure-ecommerce/vendure/commit/44fc828)), closes [#2630](https://github.com/vendure-ecommerce/vendure/issues/2630)
+* **elasticsearch-plugin** Optimize memory usage when indexing ([#2327](https://github.com/vendure-ecommerce/vendure/pull/2327))
+* **payments-plugin** Don't handle mollie webhook for any state after PaymentSettled (#2657) ([754da02](https://github.com/vendure-ecommerce/vendure/commit/754da02)), closes [#2657](https://github.com/vendure-ecommerce/vendure/issues/2657)
+
 ## <small>2.1.6 (2024-01-16)</small>
 
 

+ 4 - 5
packages/core/src/job-queue/polling-job-queue-strategy.ts

@@ -150,10 +150,10 @@ class ActiveQueue<Data extends JobData<Data> = object> {
         void runNextJobs();
     }
 
-    async stop(): Promise<void> {
+    async stop(stopActiveQueueTimeout = 20_000): Promise<void> {
         this.running = false;
         clearTimeout(this.timer);
-        await this.awaitRunningJobsOrTimeout();
+        await this.awaitRunningJobsOrTimeout(stopActiveQueueTimeout);
         Logger.info(`Stopped queue: ${this.queueName}`);
         this.subscription.unsubscribe();
         // Allow any job status changes to be persisted
@@ -163,9 +163,8 @@ class ActiveQueue<Data extends JobData<Data> = object> {
         await new Promise(resolve => setTimeout(resolve, 1000));
     }
 
-    private awaitRunningJobsOrTimeout(): Promise<void> {
+    private awaitRunningJobsOrTimeout(stopActiveQueueTimeout = 20_000): Promise<void> {
         const start = +new Date();
-        const stopActiveQueueTimeout = 20_000;
         let timeout: ReturnType<typeof setTimeout>;
         return new Promise(resolve => {
             let lastStatusUpdate = +new Date();
@@ -234,7 +233,7 @@ export abstract class PollingJobQueueStrategy extends InjectableJobQueueStrategy
     public setRetries: (queueName: string, job: Job) => number;
     public backOffStrategy?: BackoffStrategy;
 
-    private activeQueues = new QueueNameProcessStorage<ActiveQueue<any>>();
+    protected activeQueues = new QueueNameProcessStorage<ActiveQueue<any>>();
 
     constructor(config?: PollingJobQueueStrategyConfig);
     constructor(concurrency?: number, pollInterval?: number);

+ 12 - 0
packages/core/src/job-queue/testing-job-queue-strategy.ts

@@ -1,5 +1,6 @@
 import { InMemoryJobQueueStrategy } from './in-memory-job-queue-strategy';
 import { Job } from './job';
+import { JobData } from './types';
 
 /**
  * @description
@@ -11,4 +12,15 @@ export class TestingJobQueueStrategy extends InMemoryJobQueueStrategy {
             await this.add(job);
         }
     }
+
+    override async stop<Data extends JobData<Data> = object>(
+        queueName: string,
+        process: (job: Job<Data>) => Promise<any>,
+    ) {
+        const active = this.activeQueues.getAndDelete(queueName, process);
+        if (!active) {
+            return;
+        }
+        await active.stop(1_000);
+    }
 }

+ 204 - 157
packages/elasticsearch-plugin/src/indexing/indexer.controller.ts

@@ -30,7 +30,7 @@ import {
 import { Observable } from 'rxjs';
 import { In, IsNull } from 'typeorm';
 
-import { ELASTIC_SEARCH_OPTIONS, loggerCtx, VARIANT_INDEX_NAME } from '../constants';
+import { ELASTIC_SEARCH_OPTIONS, VARIANT_INDEX_NAME, loggerCtx } from '../constants';
 import { ElasticsearchOptions } from '../options';
 import {
     BulkOperation,
@@ -49,9 +49,6 @@ import {
 
 import { createIndices, getClient, getIndexNameByAlias } from './indexing-utils';
 
-const REINDEX_CHUNK_SIZE = 2500;
-const REINDEX_OPERATION_CHUNK_SIZE = 3000;
-
 export const defaultProductRelations: Array<EntityRelationPaths<Product>> = [
     'featuredAsset',
     'facetValues',
@@ -129,11 +126,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
      * Updates the search index only for the affected product.
      */
     async deleteProduct({ ctx: rawContext, productId }: UpdateProductMessageData): Promise<boolean> {
-        const operations = await this.deleteProductOperations(
-            RequestContext.deserialize(rawContext),
-            productId,
-        );
-        await this.executeBulkOperations(operations);
+        await this.deleteProductOperations(RequestContext.deserialize(rawContext), productId);
         return true;
     }
 
@@ -244,9 +237,9 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
                 const ctx = MutableRequestContext.deserialize(rawContext);
 
                 const reindexTempName = new Date().getTime();
-                const variantIndexName = this.options.indexPrefix + VARIANT_INDEX_NAME;
-                const variantIndexNameForReindex = VARIANT_INDEX_NAME + `-reindex-${reindexTempName}`;
-                const reindexVariantAliasName = this.options.indexPrefix + variantIndexNameForReindex;
+                const variantIndexName = `${this.options.indexPrefix}${VARIANT_INDEX_NAME}`;
+                const variantIndexNameForReindex = `${VARIANT_INDEX_NAME}-reindex-${reindexTempName}`;
+                const reindexVariantAliasName = `${this.options.indexPrefix}${variantIndexNameForReindex}`;
                 try {
                     await createIndices(
                         this.client,
@@ -262,7 +255,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
                     throw e;
                 }
 
-                const totalProductIds = await this.connection
+                const totalProductIds = await this.connection.rawConnection
                     .getRepository(Product)
                     .createQueryBuilder('product')
                     .where('product.deletedAt IS NULL')
@@ -274,19 +267,17 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
                 let skip = 0;
                 let finishedProductsCount = 0;
                 do {
-                    const operations: BulkVariantOperation[] = [];
-
-                    productIds = await this.connection
+                    productIds = await this.connection.rawConnection
                         .getRepository(Product)
                         .createQueryBuilder('product')
                         .select('product.id')
                         .where('product.deletedAt IS NULL')
                         .skip(skip)
-                        .take(REINDEX_CHUNK_SIZE)
+                        .take(this.options.reindexProductsChunkSize)
                         .getMany();
 
                     for (const { id: productId } of productIds) {
-                        operations.push(...(await this.updateProductsOperationsOnly(ctx, productId)));
+                        await this.updateProductsOperationsOnly(ctx, productId, variantIndexNameForReindex);
                         finishedProductsCount++;
                         observer.next({
                             total: totalProductIds,
@@ -295,98 +286,13 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
                         });
                     }
 
-                    Logger.verbose(`Will execute ${operations.length} bulk update operations`, loggerCtx);
-
-                    // Because we can have a huge amount of variant for 1 product, we also chunk update operations
-                    await this.executeBulkOperationsByChunks(
-                        REINDEX_OPERATION_CHUNK_SIZE,
-                        operations,
-                        variantIndexNameForReindex,
-                    );
-
-                    skip += REINDEX_CHUNK_SIZE;
+                    skip += this.options.reindexProductsChunkSize;
 
                     Logger.verbose(`Done ${finishedProductsCount} / ${totalProductIds} products`);
-                } while (productIds.length >= REINDEX_CHUNK_SIZE);
+                } while (productIds.length >= this.options.reindexProductsChunkSize);
 
                 // Switch the index to the new reindexed one
-                try {
-                    const reindexVariantAliasExist = await this.client.indices.existsAlias({
-                        name: reindexVariantAliasName,
-                    });
-                    if (reindexVariantAliasExist) {
-                        const reindexVariantIndexName = await getIndexNameByAlias(
-                            this.client,
-                            reindexVariantAliasName,
-                        );
-                        const originalVariantAliasExist = await this.client.indices.existsAlias({
-                            name: variantIndexName,
-                        });
-                        const originalVariantIndexExist = await this.client.indices.exists({
-                            index: variantIndexName,
-                        });
-
-                        const originalVariantIndexName = await getIndexNameByAlias(
-                            this.client,
-                            variantIndexName,
-                        );
-
-                        const actions = [
-                            {
-                                remove: {
-                                    index: reindexVariantIndexName,
-                                    alias: reindexVariantAliasName,
-                                },
-                            },
-                            {
-                                add: {
-                                    index: reindexVariantIndexName,
-                                    alias: variantIndexName,
-                                },
-                            },
-                        ];
-
-                        if (originalVariantAliasExist.body) {
-                            actions.push({
-                                remove: {
-                                    index: originalVariantIndexName,
-                                    alias: variantIndexName,
-                                },
-                            });
-                        } else if (originalVariantIndexExist.body) {
-                            await this.client.indices.delete({
-                                index: [variantIndexName],
-                            });
-                        }
-
-                        await this.client.indices.updateAliases({
-                            body: {
-                                actions,
-                            },
-                        });
-
-                        if (originalVariantAliasExist.body) {
-                            await this.client.indices.delete({
-                                index: [originalVariantIndexName],
-                            });
-                        }
-                    }
-                } catch (e: any) {
-                    Logger.error('Could not switch indexes');
-                } finally {
-                    const reindexVariantAliasExist = await this.client.indices.existsAlias({
-                        name: reindexVariantAliasName,
-                    });
-                    if (reindexVariantAliasExist.body) {
-                        const reindexVariantAliasResult = await this.client.indices.getAlias({
-                            name: reindexVariantAliasName,
-                        });
-                        const reindexVariantIndexName = Object.keys(reindexVariantAliasResult.body)[0];
-                        await this.client.indices.delete({
-                            index: [reindexVariantIndexName],
-                        });
-                    }
-                }
+                await this.switchAlias(reindexVariantAliasName, variantIndexName);
 
                 Logger.verbose('Completed reindexing!', loggerCtx);
 
@@ -404,6 +310,10 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
         operations: BulkVariantOperation[],
         index = VARIANT_INDEX_NAME,
     ): Promise<void> {
+        Logger.verbose(
+            `Will execute ${operations.length} bulk update operations with index ${index}`,
+            loggerCtx,
+        );
         let i;
         let j;
         let processedOperation = 0;
@@ -499,61 +409,146 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
     }
 
     private async updateProductsInternal(ctx: MutableRequestContext, productIds: ID[]) {
-        const operations = await this.updateProductsOperations(ctx, productIds);
-        await this.executeBulkOperations(operations);
+        await this.updateProductsOperations(ctx, productIds);
+    }
+
+    private async switchAlias(reindexVariantAliasName: string, variantIndexName: string): Promise<void> {
+        try {
+            const reindexVariantAliasExist = await this.client.indices.existsAlias({
+                name: reindexVariantAliasName,
+            });
+            if (reindexVariantAliasExist) {
+                const reindexVariantIndexName = await getIndexNameByAlias(
+                    this.client,
+                    reindexVariantAliasName,
+                );
+                const originalVariantAliasExist = await this.client.indices.existsAlias({
+                    name: variantIndexName,
+                });
+                const originalVariantIndexExist = await this.client.indices.exists({
+                    index: variantIndexName,
+                });
+
+                const originalVariantIndexName = await getIndexNameByAlias(this.client, variantIndexName);
+
+                const actions = [
+                    {
+                        remove: {
+                            index: reindexVariantIndexName,
+                            alias: reindexVariantAliasName,
+                        },
+                    },
+                    {
+                        add: {
+                            index: reindexVariantIndexName,
+                            alias: variantIndexName,
+                        },
+                    },
+                ];
+
+                if (originalVariantAliasExist.body) {
+                    actions.push({
+                        remove: {
+                            index: originalVariantIndexName,
+                            alias: variantIndexName,
+                        },
+                    });
+                } else if (originalVariantIndexExist.body) {
+                    await this.client.indices.delete({
+                        index: [variantIndexName],
+                    });
+                }
+
+                await this.client.indices.updateAliases({
+                    body: {
+                        actions,
+                    },
+                });
+
+                if (originalVariantAliasExist.body) {
+                    await this.client.indices.delete({
+                        index: [originalVariantIndexName],
+                    });
+                }
+            }
+        } catch (e: any) {
+            Logger.error('Could not switch indexes');
+        } finally {
+            const reindexVariantAliasExist = await this.client.indices.existsAlias({
+                name: reindexVariantAliasName,
+            });
+            if (reindexVariantAliasExist.body) {
+                const reindexVariantAliasResult = await this.client.indices.getAlias({
+                    name: reindexVariantAliasName,
+                });
+                const reindexVariantIndexName = Object.keys(reindexVariantAliasResult.body)[0];
+                await this.client.indices.delete({
+                    index: [reindexVariantIndexName],
+                });
+            }
+        }
     }
 
     private async updateProductsOperationsOnly(
         ctx: MutableRequestContext,
         productId: ID,
-    ): Promise<BulkVariantOperation[]> {
-        const operations: BulkVariantOperation[] = [];
+        index = VARIANT_INDEX_NAME,
+    ): Promise<void> {
+        let operations: BulkVariantOperation[] = [];
         let product: Product | undefined;
         try {
             product = await this.connection
-                .getRepository(Product)
-                .findOne({
+                .getRepository(ctx, Product)
+                .find({
                     where: { id: productId, deletedAt: IsNull() },
                     relations: this.productRelations,
                 })
-                .then(result => result ?? undefined);
+                .then(result => result[0] ?? undefined);
         } catch (e: any) {
             Logger.error(e.message, loggerCtx, e.stack);
             throw e;
         }
         if (!product) {
-            return operations;
+            return;
         }
-        const updatedProductVariants = await this.connection.getRepository(ProductVariant).find({
-            relations: this.variantRelations,
-            where: {
-                productId,
-                deletedAt: IsNull(),
-            },
-            order: {
-                id: 'ASC',
-            },
-        });
+
+        let updatedProductVariants: ProductVariant[] = [];
+        try {
+            updatedProductVariants = await this.connection.rawConnection.getRepository(ProductVariant).find({
+                relations: this.variantRelations,
+                where: {
+                    productId,
+                    deletedAt: IsNull(),
+                },
+                order: {
+                    id: 'ASC',
+                },
+            });
+        } catch (e: any) {
+            Logger.error(e.message, loggerCtx, e.stack);
+        }
+
         // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
         updatedProductVariants.forEach(variant => (variant.product = product!));
         if (!product.enabled) {
             updatedProductVariants.forEach(v => (v.enabled = false));
         }
+
         Logger.debug(`Updating Product (${productId})`, loggerCtx);
         const languageVariants: LanguageCode[] = [];
         languageVariants.push(...product.translations.map(t => t.languageCode));
-        for (const variant of updatedProductVariants) {
+        for (const variant of updatedProductVariants)
             languageVariants.push(...variant.translations.map(t => t.languageCode));
-        }
+
         const uniqueLanguageVariants = unique(languageVariants);
         for (const channel of product.channels) {
             ctx.setChannel(channel);
             const variantsInChannel = updatedProductVariants.filter(v =>
                 v.channels.map(c => c.id).includes(ctx.channelId),
             );
-            for (const variant of variantsInChannel) {
+            for (const variant of variantsInChannel)
                 await this.productPriceApplicator.applyChannelPriceAndTax(variant, ctx);
-            }
+
             for (const languageCode of uniqueLanguageVariants) {
                 if (variantsInChannel.length) {
                     for (const variant of variantsInChannel) {
@@ -583,6 +578,16 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
                                 },
                             },
                         );
+
+                        if (operations.length >= this.options.reindexBulkOperationSizeLimit) {
+                            // Because we can have a huge amount of variant for 1 product, we also chunk update operations
+                            await this.executeBulkOperationsByChunks(
+                                this.options.reindexBulkOperationSizeLimit,
+                                operations,
+                                index,
+                            );
+                            operations = [];
+                        }
                     }
                 } else {
                     operations.push(
@@ -607,24 +612,35 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
                         },
                     );
                 }
+                if (operations.length >= this.options.reindexBulkOperationSizeLimit) {
+                    // Because we can have a huge amount of variant for 1 product, we also chunk update operations
+                    await this.executeBulkOperationsByChunks(
+                        this.options.reindexBulkOperationSizeLimit,
+                        operations,
+                        index,
+                    );
+                    operations = [];
+                }
             }
         }
 
-        return operations;
+        // Because we can have a huge amount of variant for 1 product, we also chunk update operations
+        await this.executeBulkOperationsByChunks(
+            this.options.reindexBulkOperationSizeLimit,
+            operations,
+            index,
+        );
+
+        return;
     }
 
-    private async updateProductsOperations(
-        ctx: MutableRequestContext,
-        productIds: ID[],
-    ): Promise<BulkVariantOperation[]> {
+    private async updateProductsOperations(ctx: MutableRequestContext, productIds: ID[]): Promise<void> {
         Logger.debug(`Updating ${productIds.length} Products`, loggerCtx);
-        const operations: BulkVariantOperation[] = [];
-
         for (const productId of productIds) {
-            operations.push(...(await this.deleteProductOperations(ctx, productId)));
-            operations.push(...(await this.updateProductsOperationsOnly(ctx, productId)));
+            await this.deleteProductOperations(ctx, productId);
+            await this.updateProductsOperationsOnly(ctx, productId);
         }
-        return operations;
+        return;
     }
 
     /**
@@ -645,7 +661,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
         for (const relation of hydratedRelations) {
             let path = relation.split('.');
             if (path[0] === 'customFields') {
-                if (2 < path.length) {
+                if (path.length > 2) {
                     throw new InternalServerError(
                         [
                             'hydrateProductRelations / hydrateProductVariantRelations does not currently support nested custom field relations',
@@ -670,9 +686,10 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
     private async deleteProductOperations(
         ctx: RequestContext,
         productId: ID,
-    ): Promise<BulkVariantOperation[]> {
+        index: string = VARIANT_INDEX_NAME,
+    ): Promise<void> {
         const channels = await this.requestContextCache.get(ctx, 'elastic-index-all-channels', () =>
-            this.connection
+            this.connection.rawConnection
                 .getRepository(Channel)
                 .createQueryBuilder('channel')
                 .select('channel.id')
@@ -696,17 +713,15 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
             .andWhere('channel.id = :channelId', { channelId: ctx.channelId })
             .getOne();
 
-        if (!product) {
-            return [];
-        }
+        if (!product) return;
 
         Logger.debug(`Deleting 1 Product (id: ${productId})`, loggerCtx);
-        const operations: BulkVariantOperation[] = [];
+        let operations: BulkVariantOperation[] = [];
         const languageVariants: LanguageCode[] = [];
         languageVariants.push(...product.translations.map(t => t.languageCode));
-        for (const variant of product.variants) {
+        for (const variant of product.variants)
             languageVariants.push(...variant.translations.map(t => t.languageCode));
-        }
+
         const uniqueLanguageVariants = unique(languageVariants);
 
         for (const { id: channelId } of channels) {
@@ -719,25 +734,42 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
                         },
                     },
                 });
+                if (operations.length >= this.options.reindexBulkOperationSizeLimit) {
+                    // Because we can have a huge amount of variant for 1 product, we also chunk update operations
+                    await this.executeBulkOperationsByChunks(
+                        this.options.reindexBulkOperationSizeLimit,
+                        operations,
+                        index,
+                    );
+                    operations = [];
+                }
             }
         }
-        operations.push(
-            ...(await this.deleteVariantsInternalOperations(
-                product.variants,
-                channels.map(c => c.id),
-                uniqueLanguageVariants,
-            )),
+        // Because we can have a huge amount of variant for 1 product, we also chunk update operations
+        await this.executeBulkOperationsByChunks(
+            this.options.reindexBulkOperationSizeLimit,
+            operations,
+            index,
+        );
+
+        await this.deleteVariantsInternalOperations(
+            product.variants,
+            channels.map(c => c.id),
+            uniqueLanguageVariants,
+            index,
         );
-        return operations;
+
+        return;
     }
 
     private async deleteVariantsInternalOperations(
         variants: ProductVariant[],
         channelIds: ID[],
         languageVariants: LanguageCode[],
-    ): Promise<BulkVariantOperation[]> {
+        index = VARIANT_INDEX_NAME,
+    ): Promise<void> {
         Logger.debug(`Deleting ${variants.length} ProductVariants`, loggerCtx);
-        const operations: BulkVariantOperation[] = [];
+        let operations: BulkVariantOperation[] = [];
         for (const variant of variants) {
             for (const channelId of channelIds) {
                 for (const languageCode of languageVariants) {
@@ -753,10 +785,25 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
                             },
                         },
                     });
+                    if (operations.length >= this.options.reindexBulkOperationSizeLimit) {
+                        // Because we can have a huge amount of variant for 1 product, we also chunk update operations
+                        await this.executeBulkOperationsByChunks(
+                            this.options.reindexBulkOperationSizeLimit,
+                            operations,
+                            index,
+                        );
+                        operations = [];
+                    }
                 }
             }
         }
-        return operations;
+        // Because we can have a huge amount of variant for 1 product, we also chunk update operations
+        await this.executeBulkOperationsByChunks(
+            this.options.reindexBulkOperationSizeLimit,
+            operations,
+            index,
+        );
+        return;
     }
 
     private async getProductIdsByVariantIds(variantIds: ID[]): Promise<ID[]> {

+ 16 - 6
packages/elasticsearch-plugin/src/options.ts

@@ -16,7 +16,6 @@ import {
     CustomScriptMapping,
     ElasticSearchInput,
     ElasticSearchSortInput,
-    ElasticSearchSortParameter,
     GraphQlPrimitive,
     PrimitiveTypeVariations,
 } from './types';
@@ -159,12 +158,22 @@ export interface ElasticsearchOptions {
     };
     /**
      * @description
-     * Batch size for bulk operations (e.g. when rebuilding the indices).
+     * Products limit chunk size for each loop iteration when indexing products.
      *
-     * @default
-     * 2000
+     * @default 2500
+     * @since 2.1.7
+     */
+    reindexProductsChunkSize?: number;
+    /**
+     * @description
+     * Index operations are performed in bulk, with each bulk operation containing a number of individual
+     * index operations. This option sets the maximum number of operations in the memory buffer before a
+     * bulk operation is executed.
+     *
+     * @default 3000
+     * @since 2.1.7
      */
-    batchSize?: number;
+    reindexBulkOperationSizeLimit?: number;
     /**
      * @description
      * Configuration of the internal Elasticsearch query.
@@ -711,7 +720,8 @@ export const defaultOptions: ElasticsearchRuntimeOptions = {
     indexPrefix: 'vendure-',
     indexSettings: {},
     indexMappingProperties: {},
-    batchSize: 2000,
+    reindexProductsChunkSize: 2500,
+    reindexBulkOperationSizeLimit: 3000,
     searchConfig: {
         facetValueMaxSize: 50,
         collectionMaxSize: 50,