Browse Source

perf(core): Improve perf of DefaultSearchPlugin reindex job

This optimization makes use of a mutable RequestContext that is shared by the entire job,
allowing the RequestContextCache optimizations to work as intended.

In tests this lead to a 50% speed increase when indexing ~12k variants, from ~150 seconds to
~99 seconds.
Michael Bromley 4 years ago
parent
commit
bfc72f2557

+ 20 - 24
packages/core/src/plugin/default-search-plugin/indexer/indexer.controller.ts

@@ -32,6 +32,8 @@ import {
     VariantChannelMessageData,
 } from '../types';
 
+import { MutableRequestContext } from './mutable-request-context';
+
 export const BATCH_SIZE = 1000;
 export const variantRelations = [
     'product',
@@ -64,7 +66,7 @@ export class IndexerController {
     ) {}
 
     reindex({ ctx: rawContext }: ReindexMessageData): Observable<ReindexMessageResponse> {
-        const ctx = RequestContext.deserialize(rawContext);
+        const ctx = MutableRequestContext.deserialize(rawContext);
         return asyncObservable(async observer => {
             const timeStart = Date.now();
             const qb = this.getSearchIndexQueryBuilder(ctx.channelId);
@@ -85,7 +87,7 @@ export class IndexerController {
                     .take(BATCH_SIZE)
                     .skip(i * BATCH_SIZE)
                     .getMany();
-                await this.saveVariants(variants);
+                await this.saveVariants(ctx, variants);
                 observer.next({
                     total: count,
                     completed: Math.min((i + 1) * BATCH_SIZE, count),
@@ -105,7 +107,7 @@ export class IndexerController {
         ctx: rawContext,
         ids,
     }: UpdateVariantsByIdMessageData): Observable<ReindexMessageResponse> {
-        const ctx = RequestContext.deserialize(rawContext);
+        const ctx = MutableRequestContext.deserialize(rawContext);
 
         return asyncObservable(async observer => {
             const timeStart = Date.now();
@@ -122,7 +124,7 @@ export class IndexerController {
                         relations: variantRelations,
                         where: { deletedAt: null },
                     });
-                    await this.saveVariants(batch);
+                    await this.saveVariants(ctx, batch);
                     observer.next({
                         total: ids.length,
                         completed: Math.min((i + 1) * BATCH_SIZE, ids.length),
@@ -140,22 +142,22 @@ export class IndexerController {
     }
 
     async updateProduct(data: UpdateProductMessageData): Promise<boolean> {
-        const ctx = RequestContext.deserialize(data.ctx);
+        const ctx = MutableRequestContext.deserialize(data.ctx);
         return this.updateProductInChannel(ctx, data.productId, ctx.channelId);
     }
 
     async updateVariants(data: UpdateVariantMessageData): Promise<boolean> {
-        const ctx = RequestContext.deserialize(data.ctx);
+        const ctx = MutableRequestContext.deserialize(data.ctx);
         return this.updateVariantsInChannel(ctx, data.variantIds, ctx.channelId);
     }
 
     async deleteProduct(data: UpdateProductMessageData): Promise<boolean> {
-        const ctx = RequestContext.deserialize(data.ctx);
+        const ctx = MutableRequestContext.deserialize(data.ctx);
         return this.deleteProductInChannel(ctx, data.productId, ctx.channelId);
     }
 
     async deleteVariant(data: UpdateVariantMessageData): Promise<boolean> {
-        const ctx = RequestContext.deserialize(data.ctx);
+        const ctx = MutableRequestContext.deserialize(data.ctx);
         const variants = await this.connection.getRepository(ProductVariant).findByIds(data.variantIds);
         if (variants.length) {
             const languageVariants = unique([
@@ -173,22 +175,22 @@ export class IndexerController {
     }
 
     async assignProductToChannel(data: ProductChannelMessageData): Promise<boolean> {
-        const ctx = RequestContext.deserialize(data.ctx);
+        const ctx = MutableRequestContext.deserialize(data.ctx);
         return this.updateProductInChannel(ctx, data.productId, data.channelId);
     }
 
     async removeProductFromChannel(data: ProductChannelMessageData): Promise<boolean> {
-        const ctx = RequestContext.deserialize(data.ctx);
+        const ctx = MutableRequestContext.deserialize(data.ctx);
         return this.deleteProductInChannel(ctx, data.productId, data.channelId);
     }
 
     async assignVariantToChannel(data: VariantChannelMessageData): Promise<boolean> {
-        const ctx = RequestContext.deserialize(data.ctx);
+        const ctx = MutableRequestContext.deserialize(data.ctx);
         return this.updateVariantsInChannel(ctx, [data.productVariantId], data.channelId);
     }
 
     async removeVariantFromChannel(data: VariantChannelMessageData): Promise<boolean> {
-        const ctx = RequestContext.deserialize(data.ctx);
+        const ctx = MutableRequestContext.deserialize(data.ctx);
         const variant = await this.connection.getRepository(ProductVariant).findOne(data.productVariantId);
         const languageVariants = variant?.translations.map(t => t.languageCode) ?? [];
         await this.removeSearchIndexItems(data.channelId, [data.productVariantId], languageVariants);
@@ -224,7 +226,7 @@ export class IndexerController {
     }
 
     private async updateProductInChannel(
-        ctx: RequestContext,
+        ctx: MutableRequestContext,
         productId: ID,
         channelId: ID,
     ): Promise<boolean> {
@@ -250,7 +252,7 @@ export class IndexerController {
                 );
                 Logger.verbose(`Updating ${variantsInCurrentChannel.length} variants`, workerLoggerCtx);
                 if (variantsInCurrentChannel.length) {
-                    await this.saveVariants(variantsInCurrentChannel);
+                    await this.saveVariants(ctx, variantsInCurrentChannel);
                 }
             }
         }
@@ -258,7 +260,7 @@ export class IndexerController {
     }
 
     private async updateVariantsInChannel(
-        ctx: RequestContext,
+        ctx: MutableRequestContext,
         variantIds: ID[],
         channelId: ID,
     ): Promise<boolean> {
@@ -268,7 +270,7 @@ export class IndexerController {
         });
         if (variants) {
             Logger.verbose(`Updating ${variants.length} variants`, workerLoggerCtx);
-            await this.saveVariants(variants);
+            await this.saveVariants(ctx, variants);
         }
         return true;
     }
@@ -315,7 +317,7 @@ export class IndexerController {
         return qb;
     }
 
-    private async saveVariants(variants: ProductVariant[]) {
+    private async saveVariants(ctx: MutableRequestContext, variants: ProductVariant[]) {
         const items: SearchIndexItem[] = [];
 
         await this.removeSyntheticVariants(variants);
@@ -333,13 +335,7 @@ export class IndexerController {
                 );
 
                 for (const channel of variant.channels) {
-                    const ctx = new RequestContext({
-                        channel,
-                        apiType: 'admin',
-                        authorizedAsOwnerOnly: false,
-                        isAuthorized: true,
-                        session: {} as any,
-                    });
+                    ctx.setChannel(channel);
                     await this.productPriceApplicator.applyChannelPriceAndTax(variant, ctx);
                     const item = new SearchIndexItem({
                         channelId: channel.id,

+ 45 - 0
packages/core/src/plugin/default-search-plugin/indexer/mutable-request-context.ts

@@ -0,0 +1,45 @@
+import { ID } from '@vendure/common/lib/shared-types';
+
+import { RequestContext, SerializedRequestContext } from '../../../api/common/request-context';
+import { Channel } from '../../../entity/channel/channel.entity';
+
+/**
+ * @description
+ * This is used during search index creation to allow us to use a single
+ * RequestContext, but mutate the Channel. In this way, we can take
+ * full advantage of the RequestContextCacheService, and _massively_ cut
+ * down on the number of DB calls being made during indexing.
+ */
+export class MutableRequestContext extends RequestContext {
+    constructor(options: ConstructorParameters<typeof RequestContext>[0]) {
+        super(options);
+    }
+    private mutatedChannel: Channel | undefined;
+
+    setChannel(channel: Channel) {
+        this.mutatedChannel = channel;
+    }
+
+    get channel(): Channel {
+        return this.mutatedChannel ?? super.channel;
+    }
+
+    get channelId(): ID {
+        return this.mutatedChannel?.id ?? super.channelId;
+    }
+
+    static deserialize(ctxObject: SerializedRequestContext): MutableRequestContext {
+        return new MutableRequestContext({
+            req: ctxObject._req as any,
+            apiType: ctxObject._apiType,
+            channel: new Channel(ctxObject._channel),
+            session: {
+                ...ctxObject._session,
+                expires: ctxObject._session?.expires && new Date(ctxObject._session.expires),
+            },
+            languageCode: ctxObject._languageCode,
+            isAuthorized: ctxObject._isAuthorized,
+            authorizedAsOwnerOnly: ctxObject._authorizedAsOwnerOnly,
+        });
+    }
+}