ソースを参照

perf(elasticsearch-plugin): Optimize indexing using RequestContextCache

This perf optimization uses the RequestContextCacheService to cache the results of DB calls which
get performed many times during indexing, i.e. at least once for every ProductVariant being indexed.

In testing this cut down the time to index ~10k variants by around 20% (3 mins to 2.5 mins).

This commit also adjusts some logging to `debug` from `verbose` as it was too noisy even for
verbose.
Michael Bromley 4 年 前
コミット
75da3b39a3

+ 62 - 45
packages/elasticsearch-plugin/src/indexing/indexer.controller.ts

@@ -43,6 +43,7 @@ import {
 } from '../types';
 
 import { createIndices, getClient, getIndexNameByAlias } from './indexing-utils';
+import { MutableRequestContext } from './mutable-request-context';
 
 export const defaultProductRelations: Array<EntityRelationPaths<Product>> = [
     'variants',
@@ -110,7 +111,8 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
      * Updates the search index only for the affected product.
      */
     async updateProduct({ ctx: rawContext, productId }: UpdateProductMessageData): Promise<boolean> {
-        await this.updateProductsInternal([productId]);
+        const ctx = MutableRequestContext.deserialize(rawContext);
+        await this.updateProductsInternal(ctx, [productId]);
         return true;
     }
 
@@ -118,7 +120,10 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
      * Updates the search index only for the affected product.
      */
     async deleteProduct({ ctx: rawContext, productId }: UpdateProductMessageData): Promise<boolean> {
-        const operations = await this.deleteProductOperations(productId);
+        const operations = await this.deleteProductOperations(
+            RequestContext.deserialize(rawContext),
+            productId,
+        );
         await this.executeBulkOperations(operations);
         return true;
     }
@@ -131,7 +136,8 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
         productId,
         channelId,
     }: ProductChannelMessageData): Promise<boolean> {
-        await this.updateProductsInternal([productId]);
+        const ctx = MutableRequestContext.deserialize(rawContext);
+        await this.updateProductsInternal(ctx, [productId]);
         return true;
     }
 
@@ -143,7 +149,8 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
         productId,
         channelId,
     }: ProductChannelMessageData): Promise<boolean> {
-        await this.updateProductsInternal([productId]);
+        const ctx = MutableRequestContext.deserialize(rawContext);
+        await this.updateProductsInternal(ctx, [productId]);
         return true;
     }
 
@@ -153,7 +160,8 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
         channelId,
     }: VariantChannelMessageData): Promise<boolean> {
         const productIds = await this.getProductIdsByVariantIds([productVariantId]);
-        await this.updateProductsInternal(productIds);
+        const ctx = MutableRequestContext.deserialize(rawContext);
+        await this.updateProductsInternal(ctx, productIds);
         return true;
     }
 
@@ -163,7 +171,8 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
         channelId,
     }: VariantChannelMessageData): Promise<boolean> {
         const productIds = await this.getProductIdsByVariantIds([productVariantId]);
-        await this.updateProductsInternal(productIds);
+        const ctx = MutableRequestContext.deserialize(rawContext);
+        await this.updateProductsInternal(ctx, productIds);
         return true;
     }
 
@@ -171,17 +180,19 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
      * Updates the search index only for the affected entities.
      */
     async updateVariants({ ctx: rawContext, variantIds }: UpdateVariantMessageData): Promise<boolean> {
+        const ctx = MutableRequestContext.deserialize(rawContext);
         return this.asyncQueue.push(async () => {
             const productIds = await this.getProductIdsByVariantIds(variantIds);
-            await this.updateProductsInternal(productIds);
+            await this.updateProductsInternal(ctx, productIds);
             return true;
         });
     }
 
     async deleteVariants({ ctx: rawContext, variantIds }: UpdateVariantMessageData): Promise<boolean> {
+        const ctx = MutableRequestContext.deserialize(rawContext);
         const productIds = await this.getProductIdsByVariantIds(variantIds);
         for (const productId of productIds) {
-            await this.updateProductsInternal([productId]);
+            await this.updateProductsInternal(ctx, [productId]);
         }
         return true;
     }
@@ -190,6 +201,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
         ctx: rawContext,
         ids,
     }: UpdateVariantsByIdMessageData): Observable<ReindexMessageResponse> {
+        const ctx = MutableRequestContext.deserialize(rawContext);
         return asyncObservable(async observer => {
             return this.asyncQueue.push(async () => {
                 const timeStart = Date.now();
@@ -197,7 +209,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
                 if (productIds.length) {
                     let finishedProductsCount = 0;
                     for (const productId of productIds) {
-                        await this.updateProductsInternal([productId]);
+                        await this.updateProductsInternal(ctx, [productId]);
                         finishedProductsCount++;
                         observer.next({
                             total: productIds.length,
@@ -221,6 +233,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
             return this.asyncQueue.push(async () => {
                 const timeStart = Date.now();
                 const operations: BulkVariantOperation[] = [];
+                const ctx = MutableRequestContext.deserialize(rawContext);
 
                 const reindexTempName = new Date().getTime();
                 const variantIndexName = this.options.indexPrefix + VARIANT_INDEX_NAME;
@@ -330,7 +343,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
                     .getMany();
 
                 for (const { id: deletedProductId } of deletedProductIds) {
-                    operations.push(...(await this.deleteProductOperations(deletedProductId)));
+                    operations.push(...(await this.deleteProductOperations(ctx, deletedProductId)));
                 }
 
                 const productIds = await this.connection
@@ -344,7 +357,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
 
                 let finishedProductsCount = 0;
                 for (const { id: productId } of productIds) {
-                    operations.push(...(await this.updateProductsOperations([productId])));
+                    operations.push(...(await this.updateProductsOperations(ctx, [productId])));
                     finishedProductsCount++;
                     observer.next({
                         total: productIds.length,
@@ -443,17 +456,20 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
         return result1.body.failures.length === 0 && result2.body.failures === 0;
     }
 
-    private async updateProductsInternal(productIds: ID[]) {
-        const operations = await this.updateProductsOperations(productIds);
+    private async updateProductsInternal(ctx: MutableRequestContext, productIds: ID[]) {
+        const operations = await this.updateProductsOperations(ctx, productIds);
         await this.executeBulkOperations(operations);
     }
 
-    private async updateProductsOperations(productIds: ID[]): Promise<BulkVariantOperation[]> {
-        Logger.verbose(`Updating ${productIds.length} Products`, loggerCtx);
+    private async updateProductsOperations(
+        ctx: MutableRequestContext,
+        productIds: ID[],
+    ): Promise<BulkVariantOperation[]> {
+        Logger.debug(`Updating ${productIds.length} Products`, loggerCtx);
         const operations: BulkVariantOperation[] = [];
 
         for (const productId of productIds) {
-            operations.push(...(await this.deleteProductOperations(productId)));
+            operations.push(...(await this.deleteProductOperations(ctx, productId)));
 
             let product: Product | undefined;
             try {
@@ -485,7 +501,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
                 if (!product.enabled) {
                     updatedProductVariants.forEach(v => (v.enabled = false));
                 }
-                Logger.verbose(`Updating Product (${productId})`, loggerCtx);
+                Logger.debug(`Updating Product (${productId})`, loggerCtx);
                 const languageVariants: LanguageCode[] = [];
                 languageVariants.push(...product.translations.map(t => t.languageCode));
                 for (const variant of product.variants) {
@@ -494,19 +510,13 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
                 const uniqueLanguageVariants = unique(languageVariants);
 
                 for (const channel of product.channels) {
-                    const channelCtx = new RequestContext({
-                        channel,
-                        apiType: 'admin',
-                        authorizedAsOwnerOnly: false,
-                        isAuthorized: true,
-                        session: {} as any,
-                    });
+                    ctx.setChannel(channel);
 
                     const variantsInChannel = updatedProductVariants.filter(v =>
-                        v.channels.map(c => c.id).includes(channelCtx.channelId),
+                        v.channels.map(c => c.id).includes(ctx.channelId),
                     );
                     for (const variant of variantsInChannel) {
-                        await this.productPriceApplicator.applyChannelPriceAndTax(variant, channelCtx);
+                        await this.productPriceApplicator.applyChannelPriceAndTax(variant, ctx);
                     }
                     for (const languageCode of uniqueLanguageVariants) {
                         if (variantsInChannel.length) {
@@ -518,7 +528,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
                                             update: {
                                                 _id: ElasticsearchIndexerController.getId(
                                                     variant.id,
-                                                    channelCtx.channelId,
+                                                    ctx.channelId,
                                                     languageCode,
                                                 ),
                                             },
@@ -530,7 +540,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
                                             doc: await this.createVariantIndexItem(
                                                 variant,
                                                 variantsInChannel,
-                                                channelCtx,
+                                                ctx,
                                                 languageCode,
                                             ),
                                             doc_as_upsert: true,
@@ -546,7 +556,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
                                         update: {
                                             _id: ElasticsearchIndexerController.getId(
                                                 -product.id,
-                                                channelCtx.channelId,
+                                                ctx.channelId,
                                                 languageCode,
                                             ),
                                         },
@@ -555,11 +565,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
                                 {
                                     index: VARIANT_INDEX_NAME,
                                     operation: {
-                                        doc: this.createSyntheticProductIndexItem(
-                                            product,
-                                            channelCtx,
-                                            languageCode,
-                                        ),
+                                        doc: this.createSyntheticProductIndexItem(product, ctx, languageCode),
                                         doc_as_upsert: true,
                                     },
                                 },
@@ -601,12 +607,17 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
         return uniqueRelations;
     }
 
-    private async deleteProductOperations(productId: ID): Promise<BulkVariantOperation[]> {
-        const channels = await this.connection
-            .getRepository(Channel)
-            .createQueryBuilder('channel')
-            .select('channel.id')
-            .getMany();
+    private async deleteProductOperations(
+        ctx: RequestContext,
+        productId: ID,
+    ): Promise<BulkVariantOperation[]> {
+        const channels = await this.requestContextCache.get(ctx, `elastic-index-all-channels`, () =>
+            this.connection
+                .getRepository(Channel)
+                .createQueryBuilder('channel')
+                .select('channel.id')
+                .getMany(),
+        );
         const product = await this.connection.getRepository(Product).findOne(productId, {
             relations: ['variants'],
         });
@@ -614,7 +625,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
             return [];
         }
 
-        Logger.verbose(`Deleting 1 Product (id: ${productId})`, loggerCtx);
+        Logger.debug(`Deleting 1 Product (id: ${productId})`, loggerCtx);
         const operations: BulkVariantOperation[] = [];
         const languageVariants: LanguageCode[] = [];
         languageVariants.push(...product.translations.map(t => t.languageCode));
@@ -650,7 +661,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
         channelIds: ID[],
         languageVariants: LanguageCode[],
     ): Promise<BulkVariantOperation[]> {
-        Logger.verbose(`Deleting ${variants.length} ProductVariants`, loggerCtx);
+        Logger.debug(`Deleting ${variants.length} ProductVariants`, loggerCtx);
         const operations: BulkVariantOperation[] = [];
         for (const variant of variants) {
             for (const channelId of channelIds) {
@@ -820,10 +831,16 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
     }
 
     private async getProductInStockValue(ctx: RequestContext, variants: ProductVariant[]): Promise<boolean> {
-        const stockLevels = await Promise.all(
-            variants.map(variant => this.productVariantService.getSaleableStockLevel(ctx, variant)),
+        return this.requestContextCache.get(
+            ctx,
+            `elastic-index-product-in-stock-${variants.map(v => v.id).join(',')}`,
+            async () => {
+                const stockLevels = await Promise.all(
+                    variants.map(variant => this.productVariantService.getSaleableStockLevel(ctx, variant)),
+                );
+                return stockLevels.some(stockLevel => 0 < stockLevel);
+            },
         );
-        return stockLevels.some(stockLevel => 0 < stockLevel);
     }
 
     /**

+ 42 - 0
packages/elasticsearch-plugin/src/indexing/mutable-request-context.ts

@@ -0,0 +1,42 @@
+import { Channel, ID, RequestContext, SerializedRequestContext } from '@vendure/core';
+
+/**
+ * @description
+ * This is used during search index creation to allow us to use a single
+ * RequestContext, but mutate the Channel. In this way, we can take
+ * full advantage of the RequestContextCacheService, and _massively_ cut
+ * down on the number of DB calls being made during indexing.
+ */
+export class MutableRequestContext extends RequestContext {
+    constructor(options: ConstructorParameters<typeof RequestContext>[0]) {
+        super(options);
+    }
+    private mutatedChannel: Channel | undefined;
+
+    setChannel(channel: Channel) {
+        this.mutatedChannel = channel;
+    }
+
+    get channel(): Channel {
+        return this.mutatedChannel ?? super.channel;
+    }
+
+    get channelId(): ID {
+        return this.mutatedChannel?.id ?? super.channelId;
+    }
+
+    static deserialize(ctxObject: SerializedRequestContext): MutableRequestContext {
+        return new MutableRequestContext({
+            req: ctxObject._req as any,
+            apiType: ctxObject._apiType,
+            channel: new Channel(ctxObject._channel),
+            session: {
+                ...ctxObject._session,
+                expires: ctxObject._session?.expires && new Date(ctxObject._session.expires),
+            },
+            languageCode: ctxObject._languageCode,
+            isAuthorized: ctxObject._isAuthorized,
+            authorizedAsOwnerOnly: ctxObject._authorizedAsOwnerOnly,
+        });
+    }
+}