Browse Source

refactor(elasticsearch-plugin): Execute all bulk index updates at once

Previously, Elasticsearch bulk update operations were scattered throughout the execution of a method
e.g. `updateProductsInternal` would bulk delete products, bulk update variants, bulk insert
products. Between each of these bulk operations there was async DB queries. This intervening time
allowed for race conditions to creep in which manifested as non-deterministically failing e2e tests.

In real-world usage it may not have been noticeable, but in any case this commit groups _all_ batch
operations together, and only at the very end of the method, all batch operations are passed to
ES at once.
Michael Bromley 4 years ago
parent
commit
06da6d5b93

+ 1 - 0
packages/elasticsearch-plugin/e2e/elasticsearch-plugin.e2e-spec.ts

@@ -403,6 +403,7 @@ describe('Elasticsearch plugin', () => {
                     },
                     },
                 );
                 );
 
 
+                await awaitRunningJobs(adminClient);
                 await awaitRunningJobs(adminClient);
                 await awaitRunningJobs(adminClient);
                 const { search: search2 } = await doAdminSearchQuery(adminClient, {
                 const { search: search2 } = await doAdminSearchQuery(adminClient, {
                     term: 'drive',
                     term: 'drive',

+ 138 - 64
packages/elasticsearch-plugin/src/indexer.controller.ts

@@ -65,6 +65,15 @@ export interface ReindexMessageResponse {
     duration: number;
     duration: number;
 }
 }
 
 
+type BulkProductOperation = {
+    index: typeof PRODUCT_INDEX_NAME;
+    operation: BulkOperation | BulkOperationDoc<ProductIndexItem>;
+};
+type BulkVariantOperation = {
+    index: typeof VARIANT_INDEX_NAME;
+    operation: BulkOperation | BulkOperationDoc<VariantIndexItem>;
+};
+
 @Injectable()
 @Injectable()
 export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDestroy {
 export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDestroy {
     private client: Client;
     private client: Client;
@@ -100,7 +109,8 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
      * Updates the search index only for the affected product.
      * Updates the search index only for the affected product.
      */
      */
     async deleteProduct({ ctx: rawContext, productId }: UpdateProductMessageData): Promise<boolean> {
     async deleteProduct({ ctx: rawContext, productId }: UpdateProductMessageData): Promise<boolean> {
-        await this.deleteProductInternal(productId);
+        const operations = await this.deleteProductOperations(productId);
+        await this.executeBulkOperations(operations);
         return true;
         return true;
     }
     }
 
 
@@ -201,6 +211,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
         return asyncObservable(async observer => {
         return asyncObservable(async observer => {
             return this.asyncQueue.push(async () => {
             return this.asyncQueue.push(async () => {
                 const timeStart = Date.now();
                 const timeStart = Date.now();
+                const operations: Array<BulkProductOperation | BulkVariantOperation> = [];
 
 
                 if (dropIndices) {
                 if (dropIndices) {
                     await deleteIndices(this.client, this.options.indexPrefix);
                     await deleteIndices(this.client, this.options.indexPrefix);
@@ -219,7 +230,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
                     .getMany();
                     .getMany();
 
 
                 for (const { id: deletedProductId } of deletedProductIds) {
                 for (const { id: deletedProductId } of deletedProductIds) {
-                    await this.deleteProductInternal(deletedProductId);
+                    operations.push(...(await this.deleteProductOperations(deletedProductId)));
                 }
                 }
 
 
                 const productIds = await this.connection
                 const productIds = await this.connection
@@ -233,7 +244,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
 
 
                 let finishedProductsCount = 0;
                 let finishedProductsCount = 0;
                 for (const { id: productId } of productIds) {
                 for (const { id: productId } of productIds) {
-                    await this.updateProductsInternal([productId]);
+                    operations.push(...(await this.updateProductsOperations([productId])));
                     finishedProductsCount++;
                     finishedProductsCount++;
                     observer.next({
                     observer.next({
                         total: productIds.length,
                         total: productIds.length,
@@ -241,6 +252,8 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
                         duration: +new Date() - timeStart,
                         duration: +new Date() - timeStart,
                     });
                     });
                 }
                 }
+                Logger.verbose(`Will execute ${operations.length} bulk update operations`, loggerCtx);
+                await this.executeBulkOperations(operations);
                 Logger.verbose(`Completed reindexing!`, loggerCtx);
                 Logger.verbose(`Completed reindexing!`, loggerCtx);
                 return {
                 return {
                     total: productIds.length,
                     total: productIds.length,
@@ -338,42 +351,19 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
         return result1.body.failures.length === 0 && result2.body.failures === 0;
         return result1.body.failures.length === 0 && result2.body.failures === 0;
     }
     }
 
 
-    private async updateVariantsInternal(productVariants: ProductVariant[]) {
-        if (productVariants.length) {
-            const operations: Array<BulkOperation | BulkOperationDoc<VariantIndexItem>> = [];
-            for (const variant of productVariants) {
-                const languageVariants = variant.translations.map(t => t.languageCode);
-                for (const channel of variant.channels) {
-                    const channelCtx = new RequestContext({
-                        channel,
-                        apiType: 'admin',
-                        authorizedAsOwnerOnly: false,
-                        isAuthorized: true,
-                        session: {} as any,
-                    });
-                    await this.productVariantService.applyChannelPriceAndTax(variant, channelCtx);
-                    for (const languageCode of languageVariants) {
-                        operations.push(
-                            { update: { _id: this.getId(variant.id, channelCtx.channelId, languageCode) } },
-                            {
-                                doc: this.createVariantIndexItem(variant, channelCtx.channelId, languageCode),
-                                doc_as_upsert: true,
-                            },
-                        );
-                    }
-                }
-            }
-            Logger.verbose(`Updating ${productVariants.length} ProductVariants`, loggerCtx);
-            await this.executeBulkOperations(VARIANT_INDEX_NAME, operations);
-        }
+    private async updateProductsInternal(productIds: ID[]) {
+        const operations = await this.updateProductsOperations(productIds);
+        await this.executeBulkOperations(operations);
     }
     }
 
 
-    private async updateProductsInternal(productIds: ID[]) {
+    private async updateProductsOperations(
+        productIds: ID[],
+    ): Promise<Array<BulkProductOperation | BulkVariantOperation>> {
         Logger.verbose(`Updating ${productIds.length} Products`, loggerCtx);
         Logger.verbose(`Updating ${productIds.length} Products`, loggerCtx);
-        const operations: Array<BulkOperation | BulkOperationDoc<ProductIndexItem>> = [];
+        const operations: Array<BulkProductOperation | BulkVariantOperation> = [];
 
 
         for (const productId of productIds) {
         for (const productId of productIds) {
-            await this.deleteProductInternal(productId);
+            operations.push(...(await this.deleteProductOperations(productId)));
             const product = await this.connection.getRepository(Product).findOne(productId, {
             const product = await this.connection.getRepository(Product).findOne(productId, {
                 relations: productRelations,
                 relations: productRelations,
                 where: {
                 where: {
@@ -399,7 +389,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
                 }
                 }
                 Logger.verbose(`Updating Product (${productId})`, loggerCtx);
                 Logger.verbose(`Updating Product (${productId})`, loggerCtx);
                 if (updatedProductVariants.length) {
                 if (updatedProductVariants.length) {
-                    await this.updateVariantsInternal(updatedProductVariants);
+                    operations.push(...(await this.updateVariantsOperations(updatedProductVariants)));
                 }
                 }
 
 
                 const languageVariants = product.translations.map(t => t.languageCode);
                 const languageVariants = product.translations.map(t => t.languageCode);
@@ -422,29 +412,82 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
                     for (const languageCode of languageVariants) {
                     for (const languageCode of languageVariants) {
                         operations.push(
                         operations.push(
                             {
                             {
-                                update: {
-                                    _id: this.getId(product.id, channelCtx.channelId, languageCode),
+                                index: PRODUCT_INDEX_NAME,
+                                operation: {
+                                    update: {
+                                        _id: this.getId(product.id, channelCtx.channelId, languageCode),
+                                    },
                                 },
                                 },
                             },
                             },
                             {
                             {
-                                doc: variantsInChannel.length
-                                    ? this.createProductIndexItem(
-                                          variantsInChannel,
-                                          channelCtx.channelId,
-                                          languageCode,
-                                      )
-                                    : this.createSyntheticProductIndexItem(channelCtx, product, languageCode),
-                                doc_as_upsert: true,
+                                index: PRODUCT_INDEX_NAME,
+                                operation: {
+                                    doc: variantsInChannel.length
+                                        ? this.createProductIndexItem(
+                                              variantsInChannel,
+                                              channelCtx.channelId,
+                                              languageCode,
+                                          )
+                                        : this.createSyntheticProductIndexItem(
+                                              channelCtx,
+                                              product,
+                                              languageCode,
+                                          ),
+                                    doc_as_upsert: true,
+                                },
                             },
                             },
                         );
                         );
                     }
                     }
                 }
                 }
             }
             }
         }
         }
-        await this.executeBulkOperations(PRODUCT_INDEX_NAME, operations);
+        return operations;
     }
     }
 
 
-    private async deleteProductInternal(productId: ID) {
+    private async updateVariantsOperations(
+        productVariants: ProductVariant[],
+    ): Promise<BulkVariantOperation[]> {
+        if (productVariants.length === 0) {
+            return [];
+        }
+        const operations: BulkVariantOperation[] = [];
+        for (const variant of productVariants) {
+            const languageVariants = variant.translations.map(t => t.languageCode);
+            for (const channel of variant.channels) {
+                const channelCtx = new RequestContext({
+                    channel,
+                    apiType: 'admin',
+                    authorizedAsOwnerOnly: false,
+                    isAuthorized: true,
+                    session: {} as any,
+                });
+                await this.productVariantService.applyChannelPriceAndTax(variant, channelCtx);
+                for (const languageCode of languageVariants) {
+                    operations.push(
+                        {
+                            index: VARIANT_INDEX_NAME,
+                            operation: {
+                                update: { _id: this.getId(variant.id, channelCtx.channelId, languageCode) },
+                            },
+                        },
+                        {
+                            index: VARIANT_INDEX_NAME,
+                            operation: {
+                                doc: this.createVariantIndexItem(variant, channelCtx.channelId, languageCode),
+                                doc_as_upsert: true,
+                            },
+                        },
+                    );
+                }
+            }
+        }
+        Logger.verbose(`Updating ${productVariants.length} ProductVariants`, loggerCtx);
+        return operations;
+    }
+
+    private async deleteProductOperations(
+        productId: ID,
+    ): Promise<Array<BulkProductOperation | BulkVariantOperation>> {
         const channels = await this.connection
         const channels = await this.connection
             .getRepository(Channel)
             .getRepository(Channel)
             .createQueryBuilder('channel')
             .createQueryBuilder('channel')
@@ -453,37 +496,50 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
         const product = await this.connection.getRepository(Product).findOne(productId, {
         const product = await this.connection.getRepository(Product).findOne(productId, {
             relations: ['variants'],
             relations: ['variants'],
         });
         });
-        if (product) {
-            Logger.verbose(`Deleting 1 Product (id: ${productId})`, loggerCtx);
-            const operations: BulkOperation[] = [];
-            for (const { id: channelId } of channels) {
-                const languageVariants = product.translations.map(t => t.languageCode);
-                for (const languageCode of languageVariants) {
-                    operations.push({ delete: { _id: this.getId(product.id, channelId, languageCode) } });
-                }
+        if (!product) {
+            return [];
+        }
+
+        Logger.verbose(`Deleting 1 Product (id: ${productId})`, loggerCtx);
+        const operations: Array<BulkProductOperation | BulkVariantOperation> = [];
+        for (const { id: channelId } of channels) {
+            const languageVariants = product.translations.map(t => t.languageCode);
+            for (const languageCode of languageVariants) {
+                operations.push({
+                    index: PRODUCT_INDEX_NAME,
+                    operation: { delete: { _id: this.getId(product.id, channelId, languageCode) } },
+                });
             }
             }
-            await this.deleteVariantsInternal(
+        }
+        operations.push(
+            ...(await this.deleteVariantsInternalOperations(
                 product.variants,
                 product.variants,
                 channels.map(c => c.id),
                 channels.map(c => c.id),
-            );
-            await this.executeBulkOperations(PRODUCT_INDEX_NAME, operations);
-        }
+            )),
+        );
+        return operations;
     }
     }
 
 
-    private async deleteVariantsInternal(variants: ProductVariant[], channelIds: ID[]) {
+    private async deleteVariantsInternalOperations(
+        variants: ProductVariant[],
+        channelIds: ID[],
+    ): Promise<BulkVariantOperation[]> {
         Logger.verbose(`Deleting ${variants.length} ProductVariants`, loggerCtx);
         Logger.verbose(`Deleting ${variants.length} ProductVariants`, loggerCtx);
-        const operations: BulkOperation[] = [];
+        const operations: BulkVariantOperation[] = [];
         for (const variant of variants) {
         for (const variant of variants) {
             for (const channelId of channelIds) {
             for (const channelId of channelIds) {
                 const languageVariants = variant.translations.map(t => t.languageCode);
                 const languageVariants = variant.translations.map(t => t.languageCode);
                 for (const languageCode of languageVariants) {
                 for (const languageCode of languageVariants) {
                     operations.push({
                     operations.push({
-                        delete: { _id: this.getId(variant.id, channelId, languageCode) },
+                        index: VARIANT_INDEX_NAME,
+                        operation: {
+                            delete: { _id: this.getId(variant.id, channelId, languageCode) },
+                        },
                     });
                     });
                 }
                 }
             }
             }
         }
         }
-        await this.executeBulkOperations(VARIANT_INDEX_NAME, operations);
+        return operations;
     }
     }
 
 
     private async getProductIdsByVariantIds(variantIds: ID[]): Promise<ID[]> {
     private async getProductIdsByVariantIds(variantIds: ID[]): Promise<ID[]> {
@@ -494,7 +550,25 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
         return unique(variants.map(v => v.product.id));
         return unique(variants.map(v => v.product.id));
     }
     }
 
 
-    private async executeBulkOperations(
+    private async executeBulkOperations(operations: Array<BulkProductOperation | BulkVariantOperation>) {
+        const productOperations: Array<BulkOperation | BulkOperationDoc<ProductIndexItem>> = [];
+        const variantOperations: Array<BulkOperation | BulkOperationDoc<VariantIndexItem>> = [];
+
+        for (const operation of operations) {
+            if (operation.index === PRODUCT_INDEX_NAME) {
+                productOperations.push(operation.operation);
+            } else {
+                variantOperations.push(operation.operation);
+            }
+        }
+
+        return Promise.all([
+            this.runBulkOperationsOnIndex(PRODUCT_INDEX_NAME, productOperations),
+            this.runBulkOperationsOnIndex(VARIANT_INDEX_NAME, variantOperations),
+        ]);
+    }
+
+    private async runBulkOperationsOnIndex(
         indexName: string,
         indexName: string,
         operations: Array<BulkOperation | BulkOperationDoc<VariantIndexItem | ProductIndexItem>>,
         operations: Array<BulkOperation | BulkOperationDoc<VariantIndexItem | ProductIndexItem>>,
     ) {
     ) {