浏览代码

fix(elasticsearch-plugin): Update search index for all channels on updates

Relates to #629
Michael Bromley 5 年之前
父节点
当前提交
2be29c28be

+ 31 - 0
packages/elasticsearch-plugin/e2e/elasticsearch-plugin.e2e-spec.ts

@@ -855,6 +855,37 @@ describe('Elasticsearch plugin', () => {
                     'T_4',
                 ]);
             });
+
+            it('updating product affects current channel', async () => {
+                adminClient.setChannelToken(SECOND_CHANNEL_TOKEN);
+                const { updateProduct } = await adminClient.query<
+                    UpdateProduct.Mutation,
+                    UpdateProduct.Variables
+                >(UPDATE_PRODUCT, {
+                    input: {
+                        id: 'T_3',
+                        enabled: true,
+                        translations: [{ languageCode: LanguageCode.en, name: 'xyz' }],
+                    },
+                });
+
+                await awaitRunningJobs(adminClient);
+
+                const { search: searchGrouped } = await doAdminSearchQuery(adminClient, {
+                    groupByProduct: true,
+                    term: 'xyz',
+                });
+                expect(searchGrouped.items.map(i => i.productName)).toEqual(['xyz']);
+            });
+
+            it('updating product affects other channels', async () => {
+                adminClient.setChannelToken(E2E_DEFAULT_CHANNEL_TOKEN);
+                const { search: searchGrouped } = await doAdminSearchQuery(adminClient, {
+                    groupByProduct: true,
+                    term: 'xyz',
+                });
+                expect(searchGrouped.items.map(i => i.productName)).toEqual(['xyz']);
+            });
         });
 
         describe('multiple language handling', () => {

+ 74 - 39
packages/elasticsearch-plugin/src/indexer.controller.ts

@@ -6,6 +6,7 @@ import {
     Asset,
     asyncObservable,
     AsyncQueue,
+    Collection,
     ConfigService,
     FacetValue,
     ID,
@@ -60,6 +61,7 @@ export const variantRelations = [
     'collections',
     'taxCategory',
     'channels',
+    'channels.defaultTaxZone',
 ];
 
 export interface ReindexMessageResponse {
@@ -101,7 +103,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
     }: UpdateProductMessage['data']): Observable<UpdateProductMessage['response']> {
         const ctx = RequestContext.deserialize(rawContext);
         return asyncObservable(async () => {
-            await this.updateProductInternal(ctx, productId, ctx.channelId);
+            await this.updateProductInternal(ctx, productId);
             return true;
         });
     }
@@ -138,7 +140,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
     }: AssignProductToChannelMessage['data']): Observable<AssignProductToChannelMessage['response']> {
         const ctx = RequestContext.deserialize(rawContext);
         return asyncObservable(async () => {
-            await this.updateProductInternal(ctx, productId, channelId);
+            await this.updateProductInternal(ctx, productId);
             const variants = await this.productVariantService.getVariantsByProductId(ctx, productId);
             await this.updateVariantsInternal(
                 ctx,
@@ -236,7 +238,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
                 .findByIds(variantIds, { relations: ['product'] });
             const productIds = unique(variants.map(v => v.product.id));
             for (const productId of productIds) {
-                await this.updateProductInternal(ctx, productId, ctx.channelId);
+                await this.updateProductInternal(ctx, productId);
             }
             await this.deleteVariantsInternal(variants, ctx.channelId);
             return true;
@@ -529,8 +531,6 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
     }
 
     private async updateVariantsInternal(ctx: RequestContext, variantIds: ID[], channelId: ID) {
-        let updatedVariants: ProductVariant[] = [];
-
         const productVariants = await this.connection.getRepository(ProductVariant).findByIds(variantIds, {
             relations: variantRelations,
             where: {
@@ -540,37 +540,46 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
                 id: 'ASC',
             },
         });
-        updatedVariants = this.hydrateVariants(ctx, productVariants);
 
-        if (updatedVariants.length) {
+        if (productVariants.length) {
             // When ProductVariants change, we need to update the corresponding Product index
             // since e.g. price changes must be reflected on the Product level too.
-            const productIdsOfVariants = unique(updatedVariants.map(v => v.productId));
+            const productIdsOfVariants = unique(productVariants.map(v => v.productId));
             for (const variantProductId of productIdsOfVariants) {
-                await this.updateProductInternal(ctx, variantProductId, channelId);
+                await this.updateProductInternal(ctx, variantProductId);
             }
             const operations: Array<BulkOperation | BulkOperationDoc<VariantIndexItem>> = [];
-            for (const variant of updatedVariants) {
+            for (const variant of productVariants) {
                 const languageVariants = variant.translations.map(t => t.languageCode);
-                for (const languageCode of languageVariants) {
-                    operations.push(
-                        { update: { _id: this.getId(variant.id, channelId, languageCode) } },
-                        {
-                            doc: this.createVariantIndexItem(variant, channelId, languageCode),
-                            doc_as_upsert: true,
-                        },
-                    );
+                for (const channel of variant.channels) {
+                    const channelCtx = new RequestContext({
+                        channel,
+                        apiType: 'admin',
+                        authorizedAsOwnerOnly: false,
+                        isAuthorized: true,
+                        session: {} as any,
+                    });
+                    this.productVariantService.applyChannelPriceAndTax(variant, ctx);
+                    for (const languageCode of languageVariants) {
+                        operations.push(
+                            { update: { _id: this.getId(variant.id, channel.id, languageCode) } },
+                            {
+                                doc: this.createVariantIndexItem(variant, channel.id, languageCode),
+                                doc_as_upsert: true,
+                            },
+                        );
+                    }
                 }
             }
-            Logger.verbose(`Updating ${updatedVariants.length} ProductVariants`, loggerCtx);
+            Logger.verbose(`Updating ${productVariants.length} ProductVariants`, loggerCtx);
             await this.executeBulkOperations(VARIANT_INDEX_NAME, operations);
         }
     }
 
-    private async updateProductInternal(ctx: RequestContext, productId: ID, channelId: ID) {
+    private async updateProductInternal(ctx: RequestContext, productId: ID) {
         let updatedProductVariants: ProductVariant[] = [];
         const product = await this.connection.getRepository(Product).findOne(productId, {
-            relations: ['variants'],
+            relations: ['variants', 'channels', 'channels.defaultTaxZone'],
         });
         if (product) {
             updatedProductVariants = await this.connection.getRepository(ProductVariant).findByIds(
@@ -588,23 +597,44 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
 
             if (updatedProductVariants.length) {
                 Logger.verbose(`Updating 1 Product (${productId})`, loggerCtx);
-                updatedProductVariants = this.hydrateVariants(ctx, updatedProductVariants);
                 const operations: Array<BulkOperation | BulkOperationDoc<ProductIndexItem>> = [];
                 const languageVariants = product.translations.map(t => t.languageCode);
-                for (const languageCode of languageVariants) {
-                    const updatedProductIndexItem = this.createProductIndexItem(
-                        updatedProductVariants,
-                        channelId,
-                        languageCode,
+
+                for (const channel of product.channels) {
+                    const channelCtx = new RequestContext({
+                        channel,
+                        apiType: 'admin',
+                        authorizedAsOwnerOnly: false,
+                        isAuthorized: true,
+                        session: {} as any,
+                    });
+
+                    const variantsInChannel = updatedProductVariants.filter(v =>
+                        v.channels.map(c => c.id).includes(channel.id),
                     );
-                    operations.push(
-                        {
-                            update: {
-                                _id: this.getId(updatedProductIndexItem.productId, channelId, languageCode),
+                    for (const variant of variantsInChannel) {
+                        this.productVariantService.applyChannelPriceAndTax(variant, channelCtx);
+                    }
+
+                    for (const languageCode of languageVariants) {
+                        const updatedProductIndexItem = this.createProductIndexItem(
+                            variantsInChannel,
+                            channel.id,
+                            languageCode,
+                        );
+                        operations.push(
+                            {
+                                update: {
+                                    _id: this.getId(
+                                        updatedProductIndexItem.productId,
+                                        channel.id,
+                                        languageCode,
+                                    ),
+                                },
                             },
-                        },
-                        { doc: updatedProductIndexItem, doc_as_upsert: true },
-                    );
+                            { doc: updatedProductIndexItem, doc_as_upsert: true },
+                        );
+                    }
                 }
                 await this.executeBulkOperations(PRODUCT_INDEX_NAME, operations);
             }
@@ -743,6 +773,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
         const variantAsset = v.featuredAsset;
         const productTranslation = this.getTranslation(v.product, languageCode);
         const variantTranslation = this.getTranslation(v, languageCode);
+        const collectionTranslations = v.collections.map(c => this.getTranslation(c, languageCode));
 
         const item: VariantIndexItem = {
             channelId,
@@ -767,7 +798,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
             channelIds: v.channels.map(c => c.id),
             facetValueIds: this.getFacetValueIds([v]),
             collectionIds: v.collections.map(c => c.id.toString()),
-            collectionSlugs: v.collections.map(c => c.slug),
+            collectionSlugs: collectionTranslations.map(c => c.slug),
             enabled: v.enabled && v.product.enabled,
         };
         const customMappings = Object.entries(this.options.customProductVariantMappings);
@@ -791,6 +822,13 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
             : null;
         const productTranslation = this.getTranslation(first.product, languageCode);
         const variantTranslation = this.getTranslation(first, languageCode);
+        const collectionTranslations = variants.reduce(
+            (translations, variant) => [
+                ...translations,
+                ...variant.collections.map(c => this.getTranslation(c, languageCode)),
+            ],
+            [] as Array<Translation<Collection>>,
+        );
 
         const item: ProductIndexItem = {
             channelId,
@@ -816,10 +854,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
             facetIds: this.getFacetIds(variants),
             facetValueIds: this.getFacetValueIds(variants),
             collectionIds: variants.reduce((ids, v) => [...ids, ...v.collections.map(c => c.id)], [] as ID[]),
-            collectionSlugs: variants.reduce(
-                (ids, v) => [...ids, ...v.collections.map(c => c.slug)],
-                [] as string[],
-            ),
+            collectionSlugs: collectionTranslations.map(c => c.slug),
             channelIds: first.product.channels.map(c => c.id),
             enabled: variants.some(v => v.enabled) && first.product.enabled,
         };