Просмотр исходного кода

feat(elastic): improve indexing performance

hendrikdepauw 4 лет назад
Родитель
Сommit
6488ce236c

+ 3 - 3
packages/core/src/service/services/facet-value.service.ts

@@ -53,9 +53,9 @@ export class FacetValueService {
     }
 
     findByIds(ctx: RequestContext, ids: ID[]): Promise<Array<Translated<FacetValue>>> {
-        const facetValues = this.connection
-            .getRepository(ctx, FacetValue)
-            .findByIds(ids, { relations: ['facet'] });
+        const facetValues = this.connection.findByIdsInChannel(ctx, FacetValue, ids, ctx.channelId, {
+            relations: ['facet'],
+        });
         return facetValues.then(values =>
             values.map(facetValue => translateDeep(facetValue, ctx.languageCode, ['facet'])),
         );

+ 2 - 1
packages/elasticsearch-plugin/package.json

@@ -19,7 +19,8 @@
   },
   "dependencies": {
     "@elastic/elasticsearch": "^7.9.1",
-    "deepmerge": "^4.2.2"
+    "deepmerge": "^4.2.2",
+    "lodash": "^4.17.21"
   },
   "devDependencies": {
     "@vendure/common": "^1.0.0-beta.8",

+ 20 - 21
packages/elasticsearch-plugin/src/elasticsearch-resolver.ts

@@ -3,17 +3,17 @@ import {
     Job as GraphQLJob,
     Permission,
     QuerySearchArgs,
-    SearchInput,
     SearchResponse,
 } from '@vendure/common/lib/generated-types';
 import { Omit } from '@vendure/common/lib/omit';
-import { Allow, Ctx, FacetValue, RequestContext, SearchResolver } from '@vendure/core';
+import { Allow, Ctx, FacetValue, FacetValueService, RequestContext, SearchResolver } from '@vendure/core';
+import { countBy, uniq } from 'lodash';
 
 import { ElasticsearchService } from './elasticsearch.service';
 import { ElasticSearchInput, SearchPriceData } from './types';
 
 @Resolver('SearchResponse')
-export class ShopElasticSearchResolver implements Omit<SearchResolver, 'reindex'> {
+export class ShopElasticSearchResolver implements Omit<SearchResolver, 'facetValues' | 'reindex'> {
     constructor(private elasticsearchService: ElasticsearchService) {}
 
     @Query()
@@ -28,15 +28,6 @@ export class ShopElasticSearchResolver implements Omit<SearchResolver, 'reindex'
         return result;
     }
 
-    @ResolveField()
-    async facetValues(
-        @Ctx() ctx: RequestContext,
-        @Parent() parent: { input: ElasticSearchInput },
-    ): Promise<Array<{ facetValue: FacetValue; count: number }>> {
-        const facetValues = await this.elasticsearchService.facetValues(ctx, parent.input, true);
-        return facetValues.filter((i) => !i.facetValue.facet.isPrivate);
-    }
-
     @ResolveField()
     async prices(
         @Ctx() ctx: RequestContext,
@@ -47,7 +38,7 @@ export class ShopElasticSearchResolver implements Omit<SearchResolver, 'reindex'
 }
 
 @Resolver('SearchResponse')
-export class AdminElasticSearchResolver implements SearchResolver {
+export class AdminElasticSearchResolver implements Omit<SearchResolver, 'facetValues'> {
     constructor(private elasticsearchService: ElasticsearchService) {}
 
     @Query()
@@ -62,17 +53,25 @@ export class AdminElasticSearchResolver implements SearchResolver {
         return result;
     }
 
-    @ResolveField()
-    async facetValues(
-        @Ctx() ctx: RequestContext,
-        @Parent() parent: { input: SearchInput },
-    ): Promise<Array<{ facetValue: FacetValue; count: number }>> {
-        return this.elasticsearchService.facetValues(ctx, parent.input, false);
-    }
-
     @Mutation()
     @Allow(Permission.UpdateCatalog)
     async reindex(@Ctx() ctx: RequestContext): Promise<GraphQLJob> {
         return (this.elasticsearchService.reindex(ctx, false) as unknown) as GraphQLJob;
     }
 }
+
+@Resolver('SearchResponse')
+export class EntityElasticSearchResolver implements Pick<SearchResolver, 'facetValues'> {
+    constructor(private facetValueService: FacetValueService) {}
+
+    @ResolveField()
+    async facetValues(
+        @Ctx() ctx: RequestContext,
+        @Parent() parent: Omit<SearchResponse, 'facetValues'>,
+    ): Promise<Array<{ facetValue: FacetValue; count: number }>> {
+        const facetValueIds = parent.items.map(item => item.facetValueIds).flat();
+        const facetValueCounts = countBy(facetValueIds);
+        const facetValues = await this.facetValueService.findByIds(ctx, uniq(facetValueIds));
+        return facetValues.map(facetValue => ({ facetValue, count: facetValueCounts[facetValue.id] }));
+    }
+}

+ 25 - 59
packages/elasticsearch-plugin/src/elasticsearch.service.ts

@@ -1,11 +1,9 @@
-import { Client, ClientOptions } from '@elastic/elasticsearch';
+import { Client } from '@elastic/elasticsearch';
 import { Inject, Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
 import { SearchResult, SearchResultAsset } from '@vendure/common/lib/generated-types';
 import {
     ConfigService,
     DeepRequired,
-    FacetValue,
-    FacetValueService,
     InternalServerError,
     Job,
     Logger,
@@ -37,7 +35,6 @@ export class ElasticsearchService implements OnModuleInit, OnModuleDestroy {
         @Inject(ELASTIC_SEARCH_OPTIONS) private options: DeepRequired<ElasticsearchOptions>,
         private searchService: SearchService,
         private elasticsearchIndexService: ElasticsearchIndexService,
-        private facetValueService: FacetValueService,
         private configService: ConfigService,
     ) {
         searchService.adopt(this);
@@ -58,8 +55,30 @@ export class ElasticsearchService implements OnModuleInit, OnModuleDestroy {
         return this.client.close();
     }
 
-    checkConnection() {
-        return this.client.ping({}, { requestTimeout: 1000 });
+    async checkConnection(): Promise<void> {
+        await new Promise<void>(async (resolve, reject) => {
+            const { connectionAttempts, connectionAttemptInterval } = this.options;
+            let attempts = 0;
+            Logger.verbose('Pinging Elasticsearch...', loggerCtx);
+            while (attempts < connectionAttempts) {
+                attempts++;
+                try {
+                    const pingResult = await this.client.ping({}, { requestTimeout: 1000 });
+                    if (pingResult.body) {
+                        Logger.verbose(`Ping to Elasticsearch successful`, loggerCtx);
+                        return resolve();
+                    }
+                } catch (e) {
+                    Logger.verbose(`Ping to Elasticsearch failed with error "${e.message}"`, loggerCtx);
+                }
+                Logger.verbose(
+                    `Connection to Elasticsearch could not be made, trying again after ${connectionAttemptInterval}ms (attempt ${attempts} of ${connectionAttempts})`,
+                    loggerCtx,
+                );
+                await new Promise(resolve1 => setTimeout(resolve1, connectionAttemptInterval));
+            }
+            reject(`Could not connection to Elasticsearch. Aborting bootstrap.`);
+        });
     }
 
     async createIndicesIfNotExists() {
@@ -133,59 +152,6 @@ export class ElasticsearchService implements OnModuleInit, OnModuleDestroy {
         }
     }
 
-    /**
-     * Return a list of all FacetValues which appear in the result set.
-     */
-    async facetValues(
-        ctx: RequestContext,
-        input: ElasticSearchInput,
-        enabledOnly: boolean = false,
-    ): Promise<Array<{ facetValue: FacetValue; count: number }>> {
-        const { indexPrefix } = this.options;
-        const elasticSearchBody = buildElasticBody(
-            input,
-            this.options.searchConfig,
-            ctx.channelId,
-            ctx.languageCode,
-            enabledOnly,
-        );
-        elasticSearchBody.from = 0;
-        elasticSearchBody.size = 0;
-        elasticSearchBody.aggs = {
-            facetValue: {
-                terms: {
-                    field: 'facetValueIds',
-                    size: this.options.searchConfig.facetValueMaxSize,
-                },
-            },
-        };
-        let body: SearchResponseBody<VariantIndexItem>;
-        try {
-            const result = await this.client.search<SearchResponseBody<VariantIndexItem>>({
-                index: indexPrefix + (input.groupByProduct ? PRODUCT_INDEX_NAME : VARIANT_INDEX_NAME),
-                body: elasticSearchBody,
-            });
-            body = result.body;
-        } catch (e) {
-            Logger.error(e.message, loggerCtx, e.stack);
-            throw e;
-        }
-
-        const buckets = body.aggregations ? body.aggregations.facetValue.buckets : [];
-
-        const facetValues = await this.facetValueService.findByIds(
-            ctx,
-            buckets.map(b => b.key),
-        );
-        return facetValues.map((facetValue, index) => {
-            const bucket = buckets.find(b => b.key.toString() === facetValue.id.toString());
-            return {
-                facetValue,
-                count: bucket ? bucket.doc_count : 0,
-            };
-        });
-    }
-
     async priceRange(ctx: RequestContext, input: ElasticSearchInput): Promise<SearchPriceData> {
         const { indexPrefix, searchConfig } = this.options;
         const { groupByProduct } = input;

+ 143 - 337
packages/elasticsearch-plugin/src/indexer.controller.ts

@@ -5,11 +5,11 @@ import {
     Asset,
     asyncObservable,
     AsyncQueue,
+    Channel,
     Collection,
     ConfigService,
     FacetValue,
     ID,
-    idsAreEqual,
     LanguageCode,
     Logger,
     Product,
@@ -18,15 +18,12 @@ import {
     RequestContext,
     TransactionalConnection,
     Translatable,
-    translateDeep,
     Translation,
 } from '@vendure/core';
 import { Observable } from 'rxjs';
-import { SelectQueryBuilder } from 'typeorm';
-import { FindOptionsUtils } from 'typeorm/find-options/FindOptionsUtils';
 
 import { ELASTIC_SEARCH_OPTIONS, loggerCtx, PRODUCT_INDEX_NAME, VARIANT_INDEX_NAME } from './constants';
-import { createIndices, deleteByChannel, deleteIndices } from './indexing-utils';
+import { createIndices, deleteIndices } from './indexing-utils';
 import { ElasticsearchOptions } from './options';
 import {
     BulkOperation,
@@ -43,12 +40,16 @@ import {
     VariantIndexItem,
 } from './types';
 
+export const productRelations = [
+    'variants',
+    'featuredAsset',
+    'facetValues',
+    'facetValues.facet',
+    'channels',
+    'channels.defaultTaxZone',
+];
+
 export const variantRelations = [
-    'product',
-    'product.featuredAsset',
-    'product.facetValues',
-    'product.facetValues.facet',
-    'product.channels',
     'featuredAsset',
     'facetValues',
     'facetValues.facet',
@@ -91,8 +92,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
      * Updates the search index only for the affected product.
      */
     async updateProduct({ ctx: rawContext, productId }: UpdateProductMessageData): Promise<boolean> {
-        const ctx = RequestContext.deserialize(rawContext);
-        await this.updateProductInternal(ctx, productId);
+        await this.updateProductsInternal([productId]);
         return true;
     }
 
@@ -100,17 +100,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
      * Updates the search index only for the affected product.
      */
     async deleteProduct({ ctx: rawContext, productId }: UpdateProductMessageData): Promise<boolean> {
-        const ctx = RequestContext.deserialize(rawContext);
-        const product = await this.connection.getRepository(Product).findOne(productId, {
-            relations: ['channels'],
-        });
-        if (!product) {
-            return false;
-        }
-        const channelIds = product.channels.map(c => c.id);
-        await this.deleteProductInternal(product, channelIds);
-        const { items: variants } = await this.productVariantService.getVariantsByProductId(ctx, productId);
-        await this.deleteVariantsInternal(variants, channelIds);
+        await this.deleteProductInternal(productId);
         return true;
     }
 
@@ -122,14 +112,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
         productId,
         channelId,
     }: ProductChannelMessageData): Promise<boolean> {
-        const ctx = RequestContext.deserialize(rawContext);
-        await this.updateProductInternal(ctx, productId);
-        const { items: variants } = await this.productVariantService.getVariantsByProductId(ctx, productId);
-        await this.updateVariantsInternal(
-            ctx,
-            variants.map(v => v.id),
-            channelId,
-        );
+        await this.updateProductsInternal([productId]);
         return true;
     }
 
@@ -141,14 +124,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
         productId,
         channelId,
     }: ProductChannelMessageData): Promise<boolean> {
-        const ctx = RequestContext.deserialize(rawContext);
-        const product = await this.connection.getRepository(Product).findOne(productId);
-        if (!product) {
-            return false;
-        }
-        await this.deleteProductInternal(product, [channelId]);
-        const { items: variants } = await this.productVariantService.getVariantsByProductId(ctx, productId);
-        await this.deleteVariantsInternal(variants, [channelId]);
+        await this.updateProductsInternal([productId]);
         return true;
     }
 
@@ -157,8 +133,8 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
         productVariantId,
         channelId,
     }: VariantChannelMessageData): Promise<boolean> {
-        const ctx = RequestContext.deserialize(rawContext);
-        await this.updateVariantsInternal(ctx, [productVariantId], channelId);
+        const productIds = await this.getProductIdsByVariantIds([productVariantId]);
+        await this.updateProductsInternal(productIds);
         return true;
     }
 
@@ -167,14 +143,8 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
         productVariantId,
         channelId,
     }: VariantChannelMessageData): Promise<boolean> {
-        const ctx = RequestContext.deserialize(rawContext);
-        const productVariant = await this.connection.getEntityOrThrow(ctx, ProductVariant, productVariantId, {
-            relations: ['product', 'product.channels'],
-        });
-        await this.deleteVariantsInternal([productVariant], [channelId]);
-        if (!productVariant.product.channels.find(c => idsAreEqual(c.id, channelId))) {
-            await this.deleteProductInternal(productVariant.product, [channelId]);
-        }
+        const productIds = await this.getProductIdsByVariantIds([productVariantId]);
+        await this.updateProductsInternal(productIds);
         return true;
     }
 
@@ -182,26 +152,18 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
      * Updates the search index only for the affected entities.
      */
     async updateVariants({ ctx: rawContext, variantIds }: UpdateVariantMessageData): Promise<boolean> {
-        const ctx = RequestContext.deserialize(rawContext);
         return this.asyncQueue.push(async () => {
-            await this.updateVariantsInternal(ctx, variantIds, ctx.channelId);
+            const productIds = await this.getProductIdsByVariantIds(variantIds);
+            await this.updateProductsInternal(productIds);
             return true;
         });
     }
 
     async deleteVariants({ ctx: rawContext, variantIds }: UpdateVariantMessageData): Promise<boolean> {
-        const ctx = RequestContext.deserialize(rawContext);
-        const variants = await this.connection
-            .getRepository(ProductVariant)
-            .findByIds(variantIds, { relations: ['product', 'channels'] });
-        const productIds = unique(variants.map(v => v.product.id));
+        const productIds = await this.getProductIdsByVariantIds(variantIds);
         for (const productId of productIds) {
-            await this.updateProductInternal(ctx, productId);
+            await this.deleteProductInternal(productId);
         }
-        const channelIds = unique(
-            variants.reduce((flat: ID[], v) => [...flat, ...v.channels.map(c => c.id)], []),
-        );
-        await this.deleteVariantsInternal(variants, channelIds);
         return true;
     }
 
@@ -209,76 +171,26 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
         ctx: rawContext,
         ids,
     }: UpdateVariantsByIdMessageData): Observable<ReindexMessageResponse> {
-        const ctx = RequestContext.deserialize(rawContext);
-        const { batchSize } = this.options;
-
         return asyncObservable(async observer => {
             return this.asyncQueue.push(async () => {
                 const timeStart = Date.now();
-                if (ids.length) {
-                    const batches = Math.ceil(ids.length / batchSize);
-                    Logger.verbose(`Updating ${ids.length} variants...`, loggerCtx);
-
-                    let variantsInProduct: ProductVariant[] = [];
-
-                    for (let i = 0; i < batches; i++) {
-                        const begin = i * batchSize;
-                        const end = begin + batchSize;
-                        const batchIds = ids.slice(begin, end);
-                        const variants = await this.getVariantsByIds(ctx, batchIds);
-                        variantsInProduct = await this.processVariantBatch(
-                            variants,
-                            variantsInProduct,
-                            (operations, variant) => {
-                                const languageVariants = variant.translations.map(t => t.languageCode);
-                                for (const languageCode of languageVariants) {
-                                    operations.push(
-                                        {
-                                            update: {
-                                                _id: this.getId(variant.id, ctx.channelId, languageCode),
-                                            },
-                                        },
-                                        {
-                                            doc: this.createVariantIndexItem(
-                                                variant,
-                                                ctx.channelId,
-                                                languageCode,
-                                            ),
-                                        },
-                                    );
-                                }
-                            },
-                            (operations, product, _variants) => {
-                                const languageVariants = product.translations.map(t => t.languageCode);
-                                for (const languageCode of languageVariants) {
-                                    operations.push(
-                                        {
-                                            update: {
-                                                _id: this.getId(product.id, ctx.channelId, languageCode),
-                                            },
-                                        },
-                                        {
-                                            doc: this.createProductIndexItem(
-                                                _variants,
-                                                ctx.channelId,
-                                                languageCode,
-                                            ),
-                                        },
-                                    );
-                                }
-                            },
-                        );
+                const productIds = await this.getProductIdsByVariantIds(ids);
+                if (productIds.length) {
+                    let finishedProductsCount = 0;
+                    for (const productId of productIds) {
+                        await this.updateProductsInternal([productId]);
+                        finishedProductsCount++;
                         observer.next({
-                            total: ids.length,
-                            completed: Math.min((i + 1) * batchSize, ids.length),
+                            total: productIds.length,
+                            completed: Math.min(finishedProductsCount, productIds.length),
                             duration: +new Date() - timeStart,
                         });
                     }
                 }
                 Logger.verbose(`Completed updating variants`, loggerCtx);
                 return {
-                    total: ids.length,
-                    completed: ids.length,
+                    total: productIds.length,
+                    completed: productIds.length,
                     duration: +new Date() - timeStart,
                 };
             });
@@ -286,9 +198,6 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
     }
 
     reindex({ ctx: rawContext, dropIndices }: ReindexMessageData): Observable<ReindexMessageResponse> {
-        const ctx = RequestContext.deserialize(rawContext);
-        const { batchSize } = this.options;
-
         return asyncObservable(async observer => {
             return this.asyncQueue.push(async () => {
                 const timeStart = Date.now();
@@ -300,56 +209,40 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
                         this.options.indexPrefix,
                         this.configService.entityIdStrategy.primaryKeyType,
                     );
-                } else {
-                    await deleteByChannel(this.client, this.options.indexPrefix, ctx.channelId);
                 }
 
-                const qb = this.getSearchIndexQueryBuilder(ctx.channelId);
-                const count = await qb.getCount();
-                Logger.verbose(`Reindexing ${count} ProductVariants`, loggerCtx);
-
-                const batches = Math.ceil(count / batchSize);
-                let variantsInProduct: ProductVariant[] = [];
+                const deletedProductIds = await this.connection
+                    .getRepository(Product)
+                    .createQueryBuilder('product')
+                    .select('product.id')
+                    .where('product.deletedAt IS NOT NULL')
+                    .getMany();
 
-                for (let i = 0; i < batches; i++) {
-                    const variants = await this.getBatch(ctx, qb, i);
+                for (const { id: deletedProductId } of deletedProductIds) {
+                    await this.deleteProductInternal(deletedProductId);
+                }
 
-                    Logger.verbose(
-                        `Processing batch ${i + 1} of ${batches}. ProductVariants count: ${variants.length}`,
-                        loggerCtx,
-                    );
-                    variantsInProduct = await this.processVariantBatch(
-                        variants,
-                        variantsInProduct,
-                        (operations, variant) => {
-                            const languageVariants = variant.translations.map(t => t.languageCode);
-                            for (const languageCode of languageVariants) {
-                                operations.push(
-                                    { index: { _id: this.getId(variant.id, ctx.channelId, languageCode) } },
-                                    this.createVariantIndexItem(variant, ctx.channelId, languageCode),
-                                );
-                            }
-                        },
-                        (operations, product, _variants) => {
-                            const languageVariants = product.translations.map(t => t.languageCode);
-                            for (const languageCode of languageVariants) {
-                                operations.push(
-                                    { index: { _id: this.getId(product.id, ctx.channelId, languageCode) } },
-                                    this.createProductIndexItem(_variants, ctx.channelId, languageCode),
-                                );
-                            }
-                        },
-                    );
+                const productIds = await this.connection
+                    .getRepository(Product)
+                    .createQueryBuilder('product')
+                    .select('product.id')
+                    .where('product.deletedAt IS NULL')
+                    .getMany();
+
+                let finishedProductsCount = 0;
+                for (const { id: productId } of productIds) {
+                    await this.updateProductsInternal([productId]);
+                    finishedProductsCount++;
                     observer.next({
-                        total: count,
-                        completed: Math.min((i + 1) * batchSize, count),
+                        total: productIds.length,
+                        completed: Math.min(finishedProductsCount, productIds.length),
                         duration: +new Date() - timeStart,
                     });
                 }
                 Logger.verbose(`Completed reindexing!`, loggerCtx);
                 return {
-                    total: count,
-                    completed: count,
+                    total: productIds.length,
+                    completed: productIds.length,
                     duration: +new Date() - timeStart,
                 };
             });
@@ -443,60 +336,8 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
         return result1.body.failures.length === 0 && result2.body.failures === 0;
     }
 
-    private async processVariantBatch(
-        variants: ProductVariant[],
-        variantsInProduct: ProductVariant[],
-        processVariants: (
-            operations: Array<BulkOperation | BulkOperationDoc<VariantIndexItem> | VariantIndexItem>,
-            variant: ProductVariant,
-        ) => void,
-        processProducts: (
-            operations: Array<BulkOperation | BulkOperationDoc<ProductIndexItem> | ProductIndexItem>,
-            product: Product,
-            variants: ProductVariant[],
-        ) => void,
-    ) {
-        const variantsToIndex: Array<BulkOperation | VariantIndexItem> = [];
-        const productsToIndex: Array<BulkOperation | ProductIndexItem> = [];
-        const productIdsIndexed = new Set<ID>();
-        // tslint:disable-next-line:prefer-for-of
-        for (let j = 0; j < variants.length; j++) {
-            const variant = variants[j];
-            variantsInProduct.push(variant);
-            processVariants(variantsToIndex, variant);
-            const nextVariant = variants[j + 1];
-            const nextVariantIsNewProduct = nextVariant && nextVariant.productId !== variant.productId;
-            const thisVariantIsLastAndProductNotAdded =
-                !nextVariant && !productIdsIndexed.has(variant.productId);
-            if (nextVariantIsNewProduct || thisVariantIsLastAndProductNotAdded) {
-                processProducts(productsToIndex, variant.product, variantsInProduct);
-                variantsInProduct = [];
-                productIdsIndexed.add(variant.productId);
-            }
-        }
-        await this.executeBulkOperations(VARIANT_INDEX_NAME, variantsToIndex);
-        await this.executeBulkOperations(PRODUCT_INDEX_NAME, productsToIndex);
-        return variantsInProduct;
-    }
-
-    private async updateVariantsInternal(ctx: RequestContext, variantIds: ID[], channelId: ID) {
-        const productVariants = await this.connection.getRepository(ProductVariant).findByIds(variantIds, {
-            relations: variantRelations,
-            where: {
-                deletedAt: null,
-            },
-            order: {
-                id: 'ASC',
-            },
-        });
-
+    private async updateVariantsInternal(productVariants: ProductVariant[]) {
         if (productVariants.length) {
-            // When ProductVariants change, we need to update the corresponding Product index
-            // since e.g. price changes must be reflected on the Product level too.
-            const productIdsOfVariants = unique(productVariants.map(v => v.productId));
-            for (const variantProductId of productIdsOfVariants) {
-                await this.updateProductInternal(ctx, variantProductId);
-            }
             const operations: Array<BulkOperation | BulkOperationDoc<VariantIndexItem>> = [];
             for (const variant of productVariants) {
                 const languageVariants = variant.translations.map(t => t.languageCode);
@@ -508,12 +349,12 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
                         isAuthorized: true,
                         session: {} as any,
                     });
-                    await this.productVariantService.applyChannelPriceAndTax(variant, ctx);
+                    await this.productVariantService.applyChannelPriceAndTax(variant, channelCtx);
                     for (const languageCode of languageVariants) {
                         operations.push(
-                            { update: { _id: this.getId(variant.id, channel.id, languageCode) } },
+                            { update: { _id: this.getId(variant.id, channelCtx.channelId, languageCode) } },
                             {
-                                doc: this.createVariantIndexItem(variant, channel.id, languageCode),
+                                doc: this.createVariantIndexItem(variant, channelCtx.channelId, languageCode),
                                 doc_as_upsert: true,
                             },
                         );
@@ -525,27 +366,38 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
         }
     }
 
-    private async updateProductInternal(ctx: RequestContext, productId: ID) {
-        const product = await this.connection.getRepository(Product).findOne(productId, {
-            relations: ['variants', 'channels', 'channels.defaultTaxZone'],
-        });
-        if (product) {
-            const updatedProductVariants = await this.connection.getRepository(ProductVariant).findByIds(
-                product.variants.map(v => v.id),
-                {
-                    relations: variantRelations,
-                    where: {
-                        deletedAt: null,
-                    },
+    private async updateProductsInternal(productIds: ID[]) {
+        Logger.verbose(`Updating ${productIds.length} Products`, loggerCtx);
+        const operations: Array<BulkOperation | BulkOperationDoc<ProductIndexItem>> = [];
+
+        for (const productId of productIds) {
+            await this.deleteProductInternal(productId);
+            const product = await this.connection.getRepository(Product).findOne(productId, {
+                relations: productRelations,
+                where: {
+                    deletedAt: null,
                 },
-            );
-            if (product.enabled === false) {
-                updatedProductVariants.forEach(v => (v.enabled = false));
-            }
-            const operations: Array<BulkOperation | BulkOperationDoc<ProductIndexItem>> = [];
+            });
+            if (product) {
+                const updatedProductVariants = await this.connection.getRepository(ProductVariant).findByIds(
+                    product.variants.map(v => v.id),
+                    {
+                        relations: variantRelations,
+                        where: {
+                            deletedAt: null,
+                        },
+                        order: {
+                            id: 'ASC',
+                        },
+                    },
+                );
+                updatedProductVariants.forEach(variant => (variant.product = product));
+                if (product.enabled === false) {
+                    updatedProductVariants.forEach(v => (v.enabled = false));
+                }
+                Logger.verbose(`Updating Product (${productId})`, loggerCtx);
+                if (updatedProductVariants.length) await this.updateVariantsInternal(updatedProductVariants);
 
-            if (updatedProductVariants.length) {
-                Logger.verbose(`Updating 1 Product (${productId})`, loggerCtx);
                 const languageVariants = product.translations.map(t => t.languageCode);
 
                 for (const channel of product.channels) {
@@ -558,66 +410,69 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
                     });
 
                     const variantsInChannel = updatedProductVariants.filter(v =>
-                        v.channels.map(c => c.id).includes(channel.id),
+                        v.channels.map(c => c.id).includes(channelCtx.channelId),
                     );
                     for (const variant of variantsInChannel) {
                         await this.productVariantService.applyChannelPriceAndTax(variant, channelCtx);
                     }
-
                     for (const languageCode of languageVariants) {
-                        const updatedProductIndexItem = this.createProductIndexItem(
-                            variantsInChannel,
-                            channel.id,
-                            languageCode,
-                        );
                         operations.push(
                             {
                                 update: {
-                                    _id: this.getId(
-                                        updatedProductIndexItem.productId,
-                                        channel.id,
-                                        languageCode,
-                                    ),
+                                    _id: this.getId(product.id, channelCtx.channelId, languageCode),
                                 },
                             },
-                            { doc: updatedProductIndexItem, doc_as_upsert: true },
+                            {
+                                doc: variantsInChannel.length
+                                    ? this.createProductIndexItem(
+                                          variantsInChannel,
+                                          channelCtx.channelId,
+                                          languageCode,
+                                      )
+                                    : this.createSyntheticProductIndexItem(channelCtx, product, languageCode),
+                                doc_as_upsert: true,
+                            },
                         );
                     }
                 }
-            } else {
-                const syntheticIndexItem = this.createSyntheticProductIndexItem(ctx, product);
-                operations.push(
-                    {
-                        update: {
-                            _id: this.getId(syntheticIndexItem.productId, ctx.channelId, ctx.languageCode),
-                        },
-                    },
-                    { doc: syntheticIndexItem, doc_as_upsert: true },
-                );
             }
-            await this.executeBulkOperations(PRODUCT_INDEX_NAME, operations);
         }
+        await this.executeBulkOperations(PRODUCT_INDEX_NAME, operations);
     }
 
-    private async deleteProductInternal(product: Product, channelIds: ID[]) {
-        Logger.verbose(`Deleting 1 Product (${product.id})`, loggerCtx);
-        const operations: BulkOperation[] = [];
-        const languageVariants = product.translations.map(t => t.languageCode);
-        for (const languageCode of languageVariants) {
-            for (const channelId of channelIds) {
-                operations.push({ delete: { _id: this.getId(product.id, channelId, languageCode) } });
+    private async deleteProductInternal(productId: ID) {
+        Logger.verbose(`Deleting 1 Product (${productId})`, loggerCtx);
+        const channels = await this.connection
+            .getRepository(Channel)
+            .createQueryBuilder('channel')
+            .select('channel.id')
+            .getMany();
+        const product = await this.connection.getRepository(Product).findOne(productId, {
+            relations: ['variants'],
+        });
+        if (product) {
+            const operations: BulkOperation[] = [];
+            for (const { id: channelId } of channels) {
+                const languageVariants = product.translations.map(t => t.languageCode);
+                for (const languageCode of languageVariants) {
+                    operations.push({ delete: { _id: this.getId(product.id, channelId, languageCode) } });
+                }
             }
+            await this.deleteVariantsInternal(
+                product.variants,
+                channels.map(c => c.id),
+            );
+            await this.executeBulkOperations(PRODUCT_INDEX_NAME, operations);
         }
-        await this.executeBulkOperations(PRODUCT_INDEX_NAME, operations);
     }
 
     private async deleteVariantsInternal(variants: ProductVariant[], channelIds: ID[]) {
         Logger.verbose(`Deleting ${variants.length} ProductVariants`, loggerCtx);
         const operations: BulkOperation[] = [];
         for (const variant of variants) {
-            const languageVariants = variant.translations.map(t => t.languageCode);
-            for (const languageCode of languageVariants) {
-                for (const channelId of channelIds) {
+            for (const channelId of channelIds) {
+                const languageVariants = variant.translations.map(t => t.languageCode);
+                for (const languageCode of languageVariants) {
                     operations.push({
                         delete: { _id: this.getId(variant.id, channelId, languageCode) },
                     });
@@ -627,6 +482,14 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
         await this.executeBulkOperations(VARIANT_INDEX_NAME, operations);
     }
 
+    private async getProductIdsByVariantIds(variantIds: ID[]): Promise<ID[]> {
+        const variants = await this.connection.getRepository(ProductVariant).findByIds(variantIds, {
+            relations: ['product'],
+            loadEagerRelations: false,
+        });
+        return unique(variants.map(v => v.product.id));
+    }
+
     private async executeBulkOperations(
         indexName: string,
         operations: Array<BulkOperation | BulkOperationDoc<VariantIndexItem | ProductIndexItem>>,
@@ -668,67 +531,6 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
         }
     }
 
-    private getSearchIndexQueryBuilder(channelId: ID) {
-        const qb = this.connection.getRepository(ProductVariant).createQueryBuilder('variants');
-        FindOptionsUtils.applyFindManyOptionsOrConditionsToQueryBuilder(qb, {
-            relations: variantRelations,
-            order: {
-                productId: 'ASC',
-            },
-        });
-        FindOptionsUtils.joinEagerRelations(
-            qb,
-            qb.alias,
-            this.connection.rawConnection.getMetadata(ProductVariant),
-        );
-        qb.leftJoin('variants.product', '__product')
-            .leftJoin('__product.channels', '__channel')
-            .where('__channel.id = :channelId', { channelId })
-            .andWhere('variants__product.deletedAt IS NULL')
-            .andWhere('variants.deletedAt IS NULL');
-        return qb;
-    }
-
-    private async getBatch(
-        ctx: RequestContext,
-        qb: SelectQueryBuilder<ProductVariant>,
-        batchNumber: string | number,
-    ): Promise<ProductVariant[]> {
-        const { batchSize } = this.options;
-        const i = Number.parseInt(batchNumber.toString(), 10);
-        const variants = await qb
-            .take(batchSize)
-            .skip(i * batchSize)
-            .addOrderBy('variants.id', 'ASC')
-            .getMany();
-
-        return this.hydrateVariants(ctx, variants);
-    }
-
-    private async getVariantsByIds(ctx: RequestContext, ids: ID[]) {
-        const variants = await this.connection.getRepository(ProductVariant).findByIds(ids, {
-            relations: variantRelations,
-            where: {
-                deletedAt: null,
-            },
-            order: {
-                id: 'ASC',
-            },
-        });
-        return this.hydrateVariants(ctx, variants);
-    }
-    /**
-     * Given an array of ProductVariants, this method applies the correct taxes and translations.
-     */
-    private async hydrateVariants(
-        ctx: RequestContext,
-        variants: ProductVariant[],
-    ): Promise<ProductVariant[]> {
-        return (
-            await Promise.all(variants.map(v => this.productVariantService.applyChannelPriceAndTax(v, ctx)))
-        ).map(v => translateDeep(v, ctx.languageCode, ['product', 'collections']));
-    }
-
     private createVariantIndexItem(
         v: ProductVariant,
         channelId: ID,
@@ -835,11 +637,15 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
      * If a Product has no variants, we create a synthetic variant for the purposes
      * of making that product visible via the search query.
      */
-    private createSyntheticProductIndexItem(ctx: RequestContext, product: Product): ProductIndexItem {
+    private createSyntheticProductIndexItem(
+        ctx: RequestContext,
+        product: Product,
+        languageCode: LanguageCode,
+    ): ProductIndexItem {
         const productTranslation = this.getTranslation(product, ctx.languageCode);
         return {
             channelId: ctx.channelId,
-            languageCode: ctx.languageCode,
+            languageCode: languageCode,
             sku: '',
             slug: productTranslation.slug,
             productId: product.id,

+ 16 - 0
packages/elasticsearch-plugin/src/options.ts

@@ -26,6 +26,20 @@ export interface ElasticsearchOptions {
      * @default 9200
      */
     port?: number;
+    /**
+     * @description
+     * Maximum amount of attempts made to connect to the ElasticSearch server on startup.
+     *
+     * @default 10
+     */
+    connectionAttempts?: number;
+    /**
+     * @description
+     * Interval in milliseconds between attempts to connect to the ElasticSearch server on startup.
+     *
+     * @default 5000
+     */
+    connectionAttemptInterval?: number;
     /**
      * @description
      * Options to pass directly to the
@@ -295,6 +309,8 @@ export type ElasticsearchRuntimeOptions = DeepRequired<Omit<ElasticsearchOptions
 export const defaultOptions: ElasticsearchRuntimeOptions = {
     host: 'http://localhost',
     port: 9200,
+    connectionAttempts: 10,
+    connectionAttemptInterval: 5000,
     indexPrefix: 'vendure-',
     batchSize: 2000,
     searchConfig: {

+ 9 - 6
packages/elasticsearch-plugin/src/plugin.ts

@@ -22,7 +22,11 @@ import { buffer, debounceTime, delay, filter, map } from 'rxjs/operators';
 import { ELASTIC_SEARCH_OPTIONS, loggerCtx } from './constants';
 import { CustomMappingsResolver } from './custom-mappings.resolver';
 import { ElasticsearchIndexService } from './elasticsearch-index.service';
-import { AdminElasticSearchResolver, ShopElasticSearchResolver } from './elasticsearch-resolver';
+import {
+    AdminElasticSearchResolver,
+    EntityElasticSearchResolver,
+    ShopElasticSearchResolver,
+} from './elasticsearch-resolver';
 import { ElasticsearchHealthIndicator } from './elasticsearch.health';
 import { ElasticsearchService } from './elasticsearch.service';
 import { generateSchemaExtensions } from './graphql-schema-extensions';
@@ -199,7 +203,7 @@ import { ElasticsearchOptions, ElasticsearchRuntimeOptions, mergeWithDefaults }
         ElasticsearchIndexerController,
         { provide: ELASTIC_SEARCH_OPTIONS, useFactory: () => ElasticsearchPlugin.options },
     ],
-    adminApiExtensions: { resolvers: [AdminElasticSearchResolver] },
+    adminApiExtensions: { resolvers: [AdminElasticSearchResolver, EntityElasticSearchResolver] },
     shopApiExtensions: {
         resolvers: () => {
             const { options } = ElasticsearchPlugin;
@@ -207,8 +211,8 @@ import { ElasticsearchOptions, ElasticsearchRuntimeOptions, mergeWithDefaults }
                 0 < Object.keys(options.customProductMappings || {}).length &&
                 0 < Object.keys(options.customProductVariantMappings || {}).length;
             return requiresUnionResolver
-                ? [ShopElasticSearchResolver, CustomMappingsResolver]
-                : [ShopElasticSearchResolver];
+                ? [ShopElasticSearchResolver, EntityElasticSearchResolver, CustomMappingsResolver]
+                : [ShopElasticSearchResolver, EntityElasticSearchResolver];
         },
         // `any` cast is there due to a strange error "Property '[Symbol.iterator]' is missing in type... URLSearchParams"
         // which looks like possibly a TS/definitions bug.
@@ -237,10 +241,9 @@ export class ElasticsearchPlugin implements OnApplicationBootstrap {
 
     /** @internal */
     async onApplicationBootstrap(): Promise<void> {
-        const { host, port } = ElasticsearchPlugin.options;
         const nodeName = this.nodeName();
         try {
-            const pingResult = await this.elasticsearchService.checkConnection();
+            await this.elasticsearchService.checkConnection();
         } catch (e) {
             Logger.error(`Could not connect to Elasticsearch instance at "${nodeName}"`, loggerCtx);
             Logger.error(JSON.stringify(e), loggerCtx);

+ 17 - 2
yarn.lock

@@ -10258,7 +10258,7 @@ iferr@^0.1.5:
   resolved "https://registry.npmjs.org/iferr/-/iferr-0.1.5.tgz#c60eed69e6d8fdb6b3104a1fcbca1c192dc5b501"
   integrity sha1-xg7taebY/bazEEofy8ocGS3FtQE=
 
-ignore-walk@^3.0.1:
+ignore-walk@^3.0.1, ignore-walk@^3.0.3:
   version "3.0.3"
   resolved "https://registry.npmjs.org/ignore-walk/-/ignore-walk-3.0.3.tgz#017e2447184bfeade7c238e4aefdd1e8f95b1e37"
   integrity sha512-m7o6xuOaT1aqheYHKf8W6J5pYH85ZI9w077erOzLje3JsB1gkafkAhHHY19dqjulgIZHFm32Cp5uNZgcQqdJKw==
@@ -12368,6 +12368,11 @@ lodash@4.x, lodash@^4.15.0, lodash@^4.17.11, lodash@^4.17.14, lodash@^4.17.15, l
   resolved "https://registry.npmjs.org/lodash/-/lodash-4.17.21.tgz#679591c564c3bffaae8454cf0b3df370c3d6911c"
   integrity sha512-v2kDEe57lecTulaDIuNTPy3Ry4gLGJ6Z1O3vE1krgXZNrsQ+LFTGHVxVjcXPs17LhbZVGedAJv8XZ1tvj5FvSg==
 
+lodash@^4.17.21:
+  version "4.17.21"
+  resolved "https://registry.yarnpkg.com/lodash/-/lodash-4.17.21.tgz#679591c564c3bffaae8454cf0b3df370c3d6911c"
+  integrity sha512-v2kDEe57lecTulaDIuNTPy3Ry4gLGJ6Z1O3vE1krgXZNrsQ+LFTGHVxVjcXPs17LhbZVGedAJv8XZ1tvj5FvSg==
+
 log-symbols@^1.0.2:
   version "1.0.2"
   resolved "https://registry.npmjs.org/log-symbols/-/log-symbols-1.0.2.tgz#376ff7b58ea3086a0f09facc74617eca501e1a18"
@@ -13833,7 +13838,7 @@ npm-package-arg@^8.0.0, npm-package-arg@^8.0.1, npm-package-arg@^8.1.0:
     semver "^7.0.0"
     validate-npm-package-name "^3.0.0"
 
-npm-packlist@1.1.12, npm-packlist@^1.1.6, npm-packlist@^2.1.4:
+npm-packlist@^1.1.6:
   version "1.1.12"
   resolved "https://registry.yarnpkg.com/npm-packlist/-/npm-packlist-1.1.12.tgz#22bde2ebc12e72ca482abd67afc51eb49377243a"
   integrity sha512-WJKFOVMeAlsU/pjXuqVdzU0WfgtIBCupkEVwn+1Y0ERAbUfWw8R4GjgVbaKnUjRoD2FoQbHOCbOyT5Mbs9Lw4g==
@@ -13841,6 +13846,16 @@ npm-packlist@1.1.12, npm-packlist@^1.1.6, npm-packlist@^2.1.4:
     ignore-walk "^3.0.1"
     npm-bundled "^1.0.1"
 
+npm-packlist@^2.1.4:
+  version "2.1.5"
+  resolved "https://registry.yarnpkg.com/npm-packlist/-/npm-packlist-2.1.5.tgz#43ef5bbb9f59b7c0ef91e0905f1dd707b4cfb33c"
+  integrity sha512-KCfK3Vi2F+PH1klYauoQzg81GQ8/GGjQRKYY6tRnpQUPKTs/1gBZSRWtTEd7jGdSn1LZL7gpAmJT+BcS55k2XQ==
+  dependencies:
+    glob "^7.1.6"
+    ignore-walk "^3.0.3"
+    npm-bundled "^1.1.1"
+    npm-normalize-package-bin "^1.0.1"
+
 npm-pick-manifest@6.1.0, npm-pick-manifest@^6.0.0:
   version "6.1.0"
   resolved "https://registry.npmjs.org/npm-pick-manifest/-/npm-pick-manifest-6.1.0.tgz#2befed87b0fce956790f62d32afb56d7539c022a"