Bläddra i källkod

fix(core): Update DefaultSearchPlugin indexer controller to avoid TypeORM memory leak (#2883)

Balazs Gallay 1 år sedan
förälder
incheckning
ee2c177b9e
1 ändrade filer med 99 tillägg och 148 borttagningar
  1. 99 148
      packages/core/src/plugin/default-search-plugin/indexer/indexer.controller.ts

+ 99 - 148
packages/core/src/plugin/default-search-plugin/indexer/indexer.controller.ts

@@ -4,7 +4,7 @@ import { ID } from '@vendure/common/lib/shared-types';
 import { notNullOrUndefined } from '@vendure/common/lib/shared-utils';
 import { unique } from '@vendure/common/lib/unique';
 import { Observable } from 'rxjs';
-import { FindManyOptions, In } from 'typeorm';
+import { Equal, FindManyOptions, FindOptionsWhere, In, IsNull } from 'typeorm';
 
 import { RequestContext } from '../../../api/common/request-context';
 import { RequestContextCacheService } from '../../../cache/request-context-cache.service';
@@ -39,17 +39,31 @@ import {
 import { MutableRequestContext } from './mutable-request-context';
 
 export const BATCH_SIZE = 1000;
-export const productRelations = ['featuredAsset', 'facetValues', 'facetValues.facet', 'channels'];
-export const variantRelations = [
+export const productRelations = [
+    'translations',
     'featuredAsset',
     'facetValues',
     'facetValues.facet',
-    'collections',
+    'variants',
+    'channels',
+];
+export const variantRelations = [
+    'translations',
     'taxCategory',
+    'featuredAsset',
+    'productVariantPrices',
     'channels',
     'channels.defaultTaxZone',
+    'facetValues',
+    'facetValues.facet',
+    'product',
+    'product.translations',
+    'product.channels',
+    'product.facetValues',
+    'product.facetValues.facet',
+    'collections',
+    'collections.translations',
 ];
-
 export const workerLoggerCtx = 'DefaultSearchPlugin Worker';
 
 @Injectable()
@@ -72,8 +86,7 @@ export class IndexerController {
         return asyncObservable(async observer => {
             const timeStart = Date.now();
             const channel = ctx.channel ?? (await this.loadChannel(ctx, ctx.channelId));
-            const qb = this.getSearchIndexQueryBuilder(ctx, channel);
-            const count = await qb.getCount();
+            const { count } = await this.getSearchIndexQueryBuilder(ctx, { channels: [channel] });
             Logger.verbose(`Reindexing ${count} variants for channel ${ctx.channel.code}`, workerLoggerCtx);
             const batches = Math.ceil(count / BATCH_SIZE);
 
@@ -85,10 +98,11 @@ export class IndexerController {
                     throw new Error('reindex job was cancelled');
                 }
                 Logger.verbose(`Processing batch ${i + 1} of ${batches}`, workerLoggerCtx);
-                const variants = await qb
-                    .take(BATCH_SIZE)
-                    .skip(i * BATCH_SIZE)
-                    .getMany();
+                const { variants } = await this.getSearchIndexQueryBuilder(ctx, {
+                    channels: [channel],
+                    take: BATCH_SIZE,
+                    skip: i * BATCH_SIZE,
+                });
                 await this.saveVariants(ctx, variants);
                 observer.next({
                     total: count,
@@ -124,12 +138,10 @@ export class IndexerController {
                     const end = begin + BATCH_SIZE;
                     Logger.verbose(`Updating ids from index ${begin} to ${end}`);
                     const batchIds = ids.slice(begin, end);
-                    const batch = await this.getSearchIndexQueryBuilder(
-                        ctx,
-                        ...(await this.getAllChannels(ctx)),
-                    )
-                        .where('variants.deletedAt IS NULL AND variants.id IN (:...ids)', { ids: batchIds })
-                        .getMany();
+                    const { variants: batch } = await this.getSearchIndexQueryBuilder(ctx, {
+                        channels: await this.getAllChannels(ctx),
+                        productVariantIds: batchIds,
+                    });
 
                     await this.saveVariants(ctx, batch);
                     observer.next({
@@ -245,19 +257,18 @@ export class IndexerController {
     ): Promise<boolean> {
         const channel = await this.loadChannel(ctx, channelId);
         ctx.setChannel(channel);
-        const product = await this.getProductInChannelQueryBuilder(ctx, productId, channel).getOneOrFail();
+        const product = await this.getProductInChannelQueryBuilder(ctx, productId, channel);
+
         if (product) {
             const affectedChannels = await this.getAllChannels(ctx, {
                 where: {
                     availableLanguageCodes: In(product.translations.map(t => t.languageCode)),
                 },
             });
-            const updatedVariants = await this.getSearchIndexQueryBuilder(
-                ctx,
-                ...unique(affectedChannels.concat(channel)),
-            )
-                .andWhere('variants.productId = :productId', { productId })
-                .getMany();
+            const { variants: updatedVariants } = await this.getSearchIndexQueryBuilder(ctx, {
+                channels: unique(affectedChannels.concat(channel)),
+                productId,
+            });
             if (updatedVariants.length === 0) {
                 const clone = new Product({ id: product.id });
                 await this.entityHydrator.hydrate(ctx, clone, { relations: ['translations' as never] });
@@ -287,9 +298,10 @@ export class IndexerController {
     ): Promise<boolean> {
         const channel = await this.loadChannel(ctx, channelId);
         ctx.setChannel(channel);
-        const variants = await this.getSearchIndexQueryBuilder(ctx, channel)
-            .andWhere('variants.deletedAt IS NULL AND variants.id IN (:...variantIds)', { variantIds })
-            .getMany();
+        const { variants } = await this.getSearchIndexQueryBuilder(ctx, {
+            channels: [channel],
+            productVariantIds: variantIds,
+        });
 
         if (variants) {
             Logger.verbose(`Updating ${variants.length} variants`, workerLoggerCtx);
@@ -304,10 +316,7 @@ export class IndexerController {
         channelIds: ID[],
     ): Promise<boolean> {
         const channels = await Promise.all(channelIds.map(channelId => this.loadChannel(ctx, channelId)));
-        const product = await this.getProductInChannelQueryBuilder(ctx, productId, ...channels)
-            .leftJoinAndSelect('product.variants', 'variants')
-            .leftJoinAndSelect('variants.translations', 'variants_translations')
-            .getOne();
+        const product = await this.getProductInChannelQueryBuilder(ctx, productId, ...channels);
 
         if (product) {
             const removedVariantIds = product.variants.map(v => v.id);
@@ -326,125 +335,68 @@ export class IndexerController {
         ctx: RequestContext,
         options?: FindManyOptions<Channel> | undefined,
     ): Promise<Channel[]> {
-        return await this.connection.getRepository(ctx, Channel).find(options);
+        return await this.connection
+            .getRepository(ctx, Channel)
+            .find({ ...options, relationLoadStrategy: 'query' });
     }
 
-    private getSearchIndexQueryBuilder(ctx: RequestContext, ...channels: Channel[]) {
-        const channelLanguages = unique(
-            channels.flatMap(c => c.availableLanguageCodes).concat(this.configService.defaultLanguageCode),
-        );
-        const qb = this.connection
-            .getRepository(ctx, ProductVariant)
-            .createQueryBuilder('variants')
-            .setFindOptions({
-                loadEagerRelations: false,
-            })
-            .leftJoinAndSelect(
-                'variants.channels',
-                'variant_channels',
-                'variant_channels.id IN (:...channelId)',
-                {
-                    channelId: channels.map(x => x.id),
-                },
-            )
-            .leftJoinAndSelect('variant_channels.defaultTaxZone', 'variant_channel_tax_zone')
-            .leftJoinAndSelect('variants.taxCategory', 'variant_tax_category')
-            .leftJoinAndSelect(
-                'variants.productVariantPrices',
-                'product_variant_prices',
-                'product_variant_prices.channelId IN (:...channelId)',
-                { channelId: channels.map(x => x.id) },
-            )
-            .leftJoinAndSelect(
-                'variants.translations',
-                'product_variant_translation',
-                'product_variant_translation.baseId = variants.id AND product_variant_translation.languageCode IN (:...channelLanguages)',
-                {
-                    channelLanguages,
-                },
-            )
-            .leftJoin('variants.product', 'product')
-            .leftJoinAndSelect('variants.facetValues', 'variant_facet_values')
-            .leftJoinAndSelect(
-                'variant_facet_values.translations',
-                'variant_facet_value_translations',
-                'variant_facet_value_translations.languageCode IN (:...channelLanguages)',
-                {
-                    channelLanguages,
-                },
-            )
-            .leftJoinAndSelect('variant_facet_values.facet', 'facet_values_facet')
-            .leftJoinAndSelect(
-                'facet_values_facet.translations',
-                'facet_values_facet_translations',
-                'facet_values_facet_translations.languageCode IN (:...channelLanguages)',
-                {
-                    channelLanguages,
-                },
-            )
-            .leftJoinAndSelect('variants.collections', 'collections')
-            .leftJoinAndSelect(
-                'collections.channels',
-                'collection_channels',
-                'collection_channels.id IN (:...channelId)',
-                { channelId: channels.map(x => x.id) },
-            )
-            .leftJoinAndSelect(
-                'collections.translations',
-                'collection_translations',
-                'collection_translations.languageCode IN (:...channelLanguages)',
-                {
-                    channelLanguages,
-                },
-            )
-            .leftJoin('product.channels', 'channel')
-            .where('channel.id IN (:...channelId)', { channelId: channels.map(x => x.id) })
-            .andWhere('product.deletedAt IS NULL')
-            .andWhere('variants.deletedAt IS NULL');
-        return qb;
+    private async getSearchIndexQueryBuilder(
+        ctx: RequestContext,
+        options?: {
+            productId?: ID;
+            productVariantIds?: ID[];
+            channels?: Channel[];
+            take?: number;
+            skip?: number;
+        },
+    ): Promise<{ variants: ProductVariant[]; count: number }> {
+        const {
+            productId = undefined,
+            productVariantIds = undefined,
+            channels = [],
+            take = 50,
+            skip = 0,
+        } = options ?? {};
+        const where: FindOptionsWhere<ProductVariant> = {
+            deletedAt: IsNull(),
+            product: {
+                deletedAt: IsNull(),
+            },
+        };
+        if (productId) {
+            where.productId = productId;
+        }
+        if (productVariantIds && productVariantIds.length > 0) {
+            where.id = In(productVariantIds);
+        }
+        where.channels = { id: In(channels.map(c => c.id)) };
+        const [variants, count] = await this.connection.getRepository(ctx, ProductVariant).findAndCount({
+            loadEagerRelations: false,
+            relations: variantRelations,
+            where,
+            take,
+            skip,
+            relationLoadStrategy: 'query',
+        });
+        return { variants, count };
     }
 
-    private getProductInChannelQueryBuilder(ctx: RequestContext, productId: ID, ...channels: Channel[]) {
+    private async getProductInChannelQueryBuilder(
+        ctx: RequestContext,
+        productId: ID,
+        ...channels: Channel[]
+    ): Promise<Product | undefined> {
         const channelLanguages = unique(
             channels.flatMap(c => c.availableLanguageCodes).concat(this.configService.defaultLanguageCode),
         );
-        return this.connection
-            .getRepository(ctx, Product)
-            .createQueryBuilder('product')
-            .setFindOptions({
-                loadEagerRelations: false,
-            })
-            .leftJoinAndSelect(
-                'product.translations',
-                'translations',
-                'translations.languageCode IN (:...channelLanguages)',
-                {
-                    channelLanguages,
-                },
-            )
-            .leftJoinAndSelect('product.featuredAsset', 'product_featured_asset')
-            .leftJoinAndSelect('product.facetValues', 'product_facet_values')
-            .leftJoinAndSelect(
-                'product_facet_values.translations',
-                'product_facet_value_translations',
-                'product_facet_value_translations.languageCode IN (:...channelLanguages)',
-                {
-                    channelLanguages,
-                },
-            )
-            .leftJoinAndSelect('product_facet_values.facet', 'product_facet')
-            .leftJoinAndSelect(
-                'product_facet.translations',
-                'product_facet_translations',
-                'product_facet_translations.languageCode IN (:...channelLanguages)',
-                {
-                    channelLanguages,
-                },
-            )
-            .leftJoinAndSelect('product.channels', 'channel', 'channel.id IN (:...channelId)', {
-                channelId: channels.map(x => x.id),
-            })
-            .where('product.id = :productId', { productId });
+
+        const product = await this.connection.getRepository(ctx, Product).findOne({
+            loadEagerRelations: false,
+            relations: productRelations,
+            relationLoadStrategy: 'query',
+            where: { id: Equal(productId), channels: { id: In(channels.map(x => x.id)) } },
+        });
+        return product ?? undefined;
     }
 
     private async saveVariants(ctx: MutableRequestContext, variants: ProductVariant[]) {
@@ -456,11 +408,10 @@ export class IndexerController {
         for (const variant of variants) {
             let product = productMap.get(variant.productId);
             if (!product) {
-                product = await this.getProductInChannelQueryBuilder(
-                    ctx,
-                    variant.productId,
-                    ctx.channel,
-                ).getOneOrFail();
+                product = await this.getProductInChannelQueryBuilder(ctx, variant.productId, ctx.channel);
+                if (!product) {
+                    throw new Error('Product not found for variant!');
+                }
                 productMap.set(variant.productId, product);
             }
             const availableLanguageCodes = unique(ctx.channel.availableLanguageCodes);