Explorar o código

feat(core): Update search index on Product assigned/removed from Channel

Relates to #12
Michael Bromley %!s(int64=6) %!d(string=hai) anos
pai
achega
3a6c2777f5

+ 2 - 23
packages/core/e2e/channel.e2e-spec.ts

@@ -6,12 +6,10 @@ import path from 'path';
 
 import { dataDir, TEST_SETUP_TIMEOUT_MS, testConfig } from './config/test-config';
 import { initialData } from './fixtures/e2e-initial-data';
-import { PRODUCT_WITH_VARIANTS_FRAGMENT } from './graphql/fragments';
 import {
     AssignProductsToChannel,
     CreateAdministrator,
     CreateChannel,
-    CreateProduct,
     CreateRole,
     CurrencyCode,
     DeleteChannel,
@@ -23,17 +21,16 @@ import {
     Me,
     Permission,
     RemoveProductsFromChannel,
-    UpdateProduct,
 } from './graphql/generated-e2e-admin-types';
 import {
+    ASSIGN_PRODUCT_TO_CHANNEL,
     CREATE_ADMINISTRATOR,
     CREATE_CHANNEL,
-    CREATE_PRODUCT,
     CREATE_ROLE,
     GET_CUSTOMER_LIST,
     GET_PRODUCT_WITH_VARIANTS,
     ME,
-    UPDATE_PRODUCT,
+    REMOVE_PRODUCT_FROM_CHANNEL,
 } from './graphql/shared-definitions';
 import { assertThrowsWithMessage } from './utils/assert-throws-with-message';
 
@@ -401,24 +398,6 @@ const GET_CHANNELS = gql`
     }
 `;
 
-const ASSIGN_PRODUCT_TO_CHANNEL = gql`
-    mutation AssignProductsToChannel($input: AssignProductsToChannelInput!) {
-        assignProductsToChannel(input: $input) {
-            ...ProductWithVariants
-        }
-    }
-    ${PRODUCT_WITH_VARIANTS_FRAGMENT}
-`;
-
-const REMOVE_PRODUCT_FROM_CHANNEL = gql`
-    mutation RemoveProductsFromChannel($input: RemoveProductsFromChannelInput!) {
-        removeProductsFromChannel(input: $input) {
-            ...ProductWithVariants
-        }
-    }
-    ${PRODUCT_WITH_VARIANTS_FRAGMENT}
-`;
-
 const DELETE_CHANNEL = gql`
     mutation DeleteChannel($id: ID!) {
         deleteChannel(id: $id) {

+ 62 - 1
packages/core/e2e/default-search-plugin.e2e-spec.ts

@@ -1,18 +1,22 @@
 import { pick } from '@vendure/common/lib/pick';
 import { DefaultSearchPlugin, mergeConfig } from '@vendure/core';
 import { facetValueCollectionFilter } from '@vendure/core/dist/config/collection/default-collection-filters';
-import { createTestEnvironment, SimpleGraphQLClient } from '@vendure/testing';
+import { createTestEnvironment, E2E_DEFAULT_CHANNEL_TOKEN, SimpleGraphQLClient } from '@vendure/testing';
 import gql from 'graphql-tag';
 import path from 'path';
 
 import { dataDir, TEST_SETUP_TIMEOUT_MS, testConfig } from './config/test-config';
 import { initialData } from './fixtures/e2e-initial-data';
 import {
+    AssignProductsToChannel,
+    CreateChannel,
     CreateCollection,
     CreateFacet,
+    CurrencyCode,
     DeleteProduct,
     DeleteProductVariant,
     LanguageCode,
+    RemoveProductsFromChannel,
     SearchFacetValues,
     SearchGetPrices,
     SearchInput,
@@ -23,10 +27,13 @@ import {
 } from './graphql/generated-e2e-admin-types';
 import { SearchProductsShop } from './graphql/generated-e2e-shop-types';
 import {
+    ASSIGN_PRODUCT_TO_CHANNEL,
+    CREATE_CHANNEL,
     CREATE_COLLECTION,
     CREATE_FACET,
     DELETE_PRODUCT,
     DELETE_PRODUCT_VARIANT,
+    REMOVE_PRODUCT_FROM_CHANNEL,
     UPDATE_COLLECTION,
     UPDATE_PRODUCT,
     UPDATE_PRODUCT_VARIANTS,
@@ -616,6 +623,60 @@ describe('Default search plugin', () => {
                 ]);
             });
         });
+
+        describe('channel handling', () => {
+            const SECOND_CHANNEL_TOKEN = 'second-channel-token';
+            let secondChannel: CreateChannel.CreateChannel;
+
+            beforeAll(async () => {
+                const { createChannel } = await adminClient.query<
+                    CreateChannel.Mutation,
+                    CreateChannel.Variables
+                >(CREATE_CHANNEL, {
+                    input: {
+                        code: 'second-channel',
+                        token: SECOND_CHANNEL_TOKEN,
+                        defaultLanguageCode: LanguageCode.en,
+                        currencyCode: CurrencyCode.GBP,
+                        pricesIncludeTax: true,
+                    },
+                });
+                secondChannel = createChannel;
+            });
+
+            it('adding product to channel', async () => {
+                adminClient.setChannelToken(E2E_DEFAULT_CHANNEL_TOKEN);
+                await adminClient.query<AssignProductsToChannel.Mutation, AssignProductsToChannel.Variables>(
+                    ASSIGN_PRODUCT_TO_CHANNEL,
+                    {
+                        input: { channelId: secondChannel.id, productIds: ['T_1', 'T_2'] },
+                    },
+                );
+                await awaitRunningJobs(adminClient);
+
+                adminClient.setChannelToken(SECOND_CHANNEL_TOKEN);
+                const { search } = await doAdminSearchQuery({ groupByProduct: true });
+                expect(search.items.map(i => i.productId)).toEqual(['T_1', 'T_2']);
+            }, 10000);
+
+            it('removing product from channel', async () => {
+                adminClient.setChannelToken(E2E_DEFAULT_CHANNEL_TOKEN);
+                const { removeProductsFromChannel } = await adminClient.query<
+                    RemoveProductsFromChannel.Mutation,
+                    RemoveProductsFromChannel.Variables
+                >(REMOVE_PRODUCT_FROM_CHANNEL, {
+                    input: {
+                        productIds: ['T_2'],
+                        channelId: secondChannel.id,
+                    },
+                });
+                await awaitRunningJobs(adminClient);
+
+                adminClient.setChannelToken(SECOND_CHANNEL_TOKEN);
+                const { search } = await doAdminSearchQuery({ groupByProduct: true });
+                expect(search.items.map(i => i.productId)).toEqual(['T_1']);
+            }, 10000);
+        });
     });
 });
 

+ 18 - 0
packages/core/e2e/graphql/shared-definitions.ts

@@ -319,3 +319,21 @@ export const DELETE_PRODUCT_VARIANT = gql`
         }
     }
 `;
+
+export const ASSIGN_PRODUCT_TO_CHANNEL = gql`
+    mutation AssignProductsToChannel($input: AssignProductsToChannelInput!) {
+        assignProductsToChannel(input: $input) {
+            ...ProductWithVariants
+        }
+    }
+    ${PRODUCT_WITH_VARIANTS_FRAGMENT}
+`;
+
+export const REMOVE_PRODUCT_FROM_CHANNEL = gql`
+    mutation RemoveProductsFromChannel($input: RemoveProductsFromChannelInput!) {
+        removeProductsFromChannel(input: $input) {
+            ...ProductWithVariants
+        }
+    }
+    ${PRODUCT_WITH_VARIANTS_FRAGMENT}
+`;

+ 12 - 0
packages/core/src/plugin/default-search-plugin/default-search-plugin.ts

@@ -5,6 +5,7 @@ import { buffer, debounceTime, filter, map } from 'rxjs/operators';
 import { idsAreEqual } from '../../common/utils';
 import { EventBus } from '../../event-bus/event-bus';
 import { CollectionModificationEvent } from '../../event-bus/events/collection-modification-event';
+import { ProductChannelEvent } from '../../event-bus/events/product-channel-event';
 import { ProductEvent } from '../../event-bus/events/product-event';
 import { ProductVariantEvent } from '../../event-bus/events/product-variant-event';
 import { TaxRateModificationEvent } from '../../event-bus/events/tax-rate-modification-event';
@@ -79,6 +80,17 @@ export class DefaultSearchPlugin implements OnVendureBootstrap {
                 return this.searchIndexService.updateVariants(event.ctx, event.variants).start();
             }
         });
+        this.eventBus.ofType(ProductChannelEvent).subscribe(event => {
+            if (event.type === 'assigned') {
+                return this.searchIndexService
+                    .assignProductToChannel(event.ctx, event.product.id, event.channelId)
+                    .start();
+            } else {
+                return this.searchIndexService
+                    .removeProductFromChannel(event.ctx, event.product.id, event.channelId)
+                    .start();
+            }
+        });
 
         const collectionModification$ = this.eventBus.ofType(CollectionModificationEvent);
         const closingNotifier$ = collectionModification$.pipe(debounceTime(50));

+ 93 - 47
packages/core/src/plugin/default-search-plugin/indexer/indexer.controller.ts

@@ -19,9 +19,11 @@ import { TaxRateService } from '../../../service/services/tax-rate.service';
 import { AsyncQueue } from '../async-queue';
 import { SearchIndexItem } from '../search-index-item.entity';
 import {
+    AssignProductToChannelMessage,
     DeleteProductMessage,
     DeleteVariantMessage,
     ReindexMessage,
+    RemoveProductFromChannelMessage,
     UpdateProductMessage,
     UpdateVariantMessage,
     UpdateVariantsByIdMessage,
@@ -84,7 +86,7 @@ export class IndexerController {
                         .skip(i * BATCH_SIZE)
                         .getMany();
                     const hydratedVariants = this.hydrateVariants(ctx, variants);
-                    await this.saveVariants(ctx, hydratedVariants);
+                    await this.saveVariants(ctx.languageCode, ctx.channelId, hydratedVariants);
                     observer.next({
                         total: count,
                         completed: Math.min((i + 1) * BATCH_SIZE, count),
@@ -127,7 +129,7 @@ export class IndexerController {
                                 relations: variantRelations,
                             });
                         const variants = this.hydrateVariants(ctx, batch);
-                        await this.saveVariants(ctx, variants);
+                        await this.saveVariants(ctx.languageCode, ctx.channelId, variants);
                         observer.next({
                             total: ids.length,
                             completed: Math.min((i + 1) * BATCH_SIZE, ids.length),
@@ -150,25 +152,7 @@ export class IndexerController {
     updateProduct(data: UpdateProductMessage['data']): Observable<UpdateProductMessage['response']> {
         const ctx = RequestContext.fromObject(data.ctx);
         return defer(async () => {
-            const product = await this.connection.getRepository(Product).findOne(data.productId, {
-                relations: ['variants'],
-            });
-            if (product) {
-                let updatedVariants = await this.connection
-                    .getRepository(ProductVariant)
-                    .findByIds(product.variants.map(v => v.id), {
-                        relations: variantRelations,
-                    });
-                if (product.enabled === false) {
-                    updatedVariants.forEach(v => (v.enabled = false));
-                }
-                Logger.verbose(`Updating ${updatedVariants.length} variants`, workerLoggerCtx);
-                updatedVariants = this.hydrateVariants(ctx, updatedVariants);
-                if (updatedVariants.length) {
-                    await this.saveVariants(ctx, updatedVariants);
-                }
-            }
-            return true;
+            return this.updateProductInChannel(ctx, data.productId, ctx.channelId);
         });
     }
 
@@ -176,15 +160,7 @@ export class IndexerController {
     updateVariants(data: UpdateVariantMessage['data']): Observable<UpdateVariantMessage['response']> {
         const ctx = RequestContext.fromObject(data.ctx);
         return defer(async () => {
-            const variants = await this.connection.getRepository(ProductVariant).findByIds(data.variantIds, {
-                relations: variantRelations,
-            });
-            if (variants) {
-                const updatedVariants = this.hydrateVariants(ctx, variants);
-                Logger.verbose(`Updating ${updatedVariants.length} variants`, workerLoggerCtx);
-                await this.saveVariants(ctx, updatedVariants);
-            }
-            return true;
+            return this.updateVariantsInChannel(ctx, data.variantIds, ctx.channelId);
         });
     }
 
@@ -192,16 +168,7 @@ export class IndexerController {
     deleteProduct(data: DeleteProductMessage['data']): Observable<DeleteProductMessage['response']> {
         const ctx = RequestContext.fromObject(data.ctx);
         return defer(async () => {
-            const product = await this.connection.getRepository(Product).findOne(data.productId, {
-                relations: ['variants'],
-            });
-            if (product && product.deletedAt) {
-                const removedVariantIds = product.variants.map(v => v.id);
-                if (removedVariantIds.length) {
-                    await this.removeSearchIndexItems(ctx, removedVariantIds);
-                }
-            }
-            return true;
+            return this.deleteProductInChannel(ctx, data.productId, ctx.channelId);
         });
     }
 
@@ -211,12 +178,91 @@ export class IndexerController {
         return defer(async () => {
             const variants = await this.connection.getRepository(ProductVariant).findByIds(data.variantIds);
             if (variants.length) {
-                await this.removeSearchIndexItems(ctx, variants.map(v => v.id));
+                await this.removeSearchIndexItems(ctx.languageCode, ctx.channelId, variants.map(v => v.id));
             }
             return true;
         });
     }
 
+    @MessagePattern(AssignProductToChannelMessage.pattern)
+    assignProductToChannel(
+        data: AssignProductToChannelMessage['data'],
+    ): Observable<AssignProductToChannelMessage['response']> {
+        const ctx = RequestContext.fromObject(data.ctx);
+        return defer(async () => {
+            return this.updateProductInChannel(ctx, data.productId, data.channelId);
+        });
+    }
+
+    @MessagePattern(RemoveProductFromChannelMessage.pattern)
+    removeProductFromChannel(
+        data: RemoveProductFromChannelMessage['data'],
+    ): Observable<RemoveProductFromChannelMessage['response']> {
+        const ctx = RequestContext.fromObject(data.ctx);
+        return defer(async () => {
+            return this.deleteProductInChannel(ctx, data.productId, data.channelId);
+        });
+    }
+
+    private async updateProductInChannel(
+        ctx: RequestContext,
+        productId: ID,
+        channelId: ID,
+    ): Promise<boolean> {
+        const product = await this.connection.getRepository(Product).findOne(productId, {
+            relations: ['variants'],
+        });
+        if (product) {
+            let updatedVariants = await this.connection
+                .getRepository(ProductVariant)
+                .findByIds(product.variants.map(v => v.id), {
+                    relations: variantRelations,
+                });
+            if (product.enabled === false) {
+                updatedVariants.forEach(v => (v.enabled = false));
+            }
+            Logger.verbose(`Updating ${updatedVariants.length} variants`, workerLoggerCtx);
+            updatedVariants = this.hydrateVariants(ctx, updatedVariants);
+            if (updatedVariants.length) {
+                await this.saveVariants(ctx.languageCode, channelId, updatedVariants);
+            }
+        }
+        return true;
+    }
+
+    private async updateVariantsInChannel(
+        ctx: RequestContext,
+        variantIds: ID[],
+        channelId: ID,
+    ): Promise<boolean> {
+        const variants = await this.connection.getRepository(ProductVariant).findByIds(variantIds, {
+            relations: variantRelations,
+        });
+        if (variants) {
+            const updatedVariants = this.hydrateVariants(ctx, variants);
+            Logger.verbose(`Updating ${updatedVariants.length} variants`, workerLoggerCtx);
+            await this.saveVariants(ctx.languageCode, channelId, updatedVariants);
+        }
+        return true;
+    }
+
+    private async deleteProductInChannel(
+        ctx: RequestContext,
+        productId: ID,
+        channelId: ID,
+    ): Promise<boolean> {
+        const product = await this.connection.getRepository(Product).findOne(productId, {
+            relations: ['variants'],
+        });
+        if (product) {
+            const removedVariantIds = product.variants.map(v => v.id);
+            if (removedVariantIds.length) {
+                await this.removeSearchIndexItems(ctx.languageCode, channelId, removedVariantIds);
+            }
+        }
+        return true;
+    }
+
     private getSearchIndexQueryBuilder(channelId: ID) {
         const qb = this.connection.getRepository(ProductVariant).createQueryBuilder('variants');
         FindOptionsUtils.applyFindManyOptionsOrConditionsToQueryBuilder(qb, {
@@ -239,13 +285,13 @@ export class IndexerController {
             .map(v => translateDeep(v, ctx.languageCode, ['product']));
     }
 
-    private async saveVariants(ctx: RequestContext, variants: ProductVariant[]) {
+    private async saveVariants(languageCode: LanguageCode, channelId: ID, variants: ProductVariant[]) {
         const items = variants.map(
             (v: ProductVariant) =>
                 new SearchIndexItem({
                     productVariantId: v.id,
-                    channelId: ctx.channelId,
-                    languageCode: ctx.languageCode,
+                    channelId,
+                    languageCode,
                     sku: v.sku,
                     enabled: v.enabled,
                     slug: v.product.slug,
@@ -283,11 +329,11 @@ export class IndexerController {
     /**
      * Remove items from the search index
      */
-    private async removeSearchIndexItems(ctx: RequestContext, variantIds: ID[]) {
+    private async removeSearchIndexItems(languageCode: LanguageCode, channelId: ID, variantIds: ID[]) {
         const compositeKeys = variantIds.map(id => ({
             productVariantId: id,
-            channelId: ctx.channelId,
-            languageCode: ctx.languageCode,
+            channelId,
+            languageCode,
         })) as any[];
         await this.queue.push(() => this.connection.getRepository(SearchIndexItem).delete(compositeKeys));
     }

+ 18 - 0
packages/core/src/plugin/default-search-plugin/indexer/search-index.service.ts

@@ -10,10 +10,12 @@ import { JobReporter, JobService } from '../../../service/services/job.service';
 import { WorkerMessage } from '../../../worker/types';
 import { WorkerService } from '../../../worker/worker.service';
 import {
+    AssignProductToChannelMessage,
     DeleteProductMessage,
     DeleteVariantMessage,
     ReindexMessage,
     ReindexMessageResponse,
+    RemoveProductFromChannelMessage,
     UpdateProductMessage,
     UpdateVariantMessage,
     UpdateVariantsByIdMessage,
@@ -86,6 +88,22 @@ export class SearchIndexService {
         });
     }
 
+    assignProductToChannel(ctx: RequestContext, productId: ID, channelId: ID) {
+        const data = { ctx, productId, channelId };
+        return this.createShortWorkerJob(new AssignProductToChannelMessage(data), {
+            entity: 'Product',
+            id: productId,
+        });
+    }
+
+    removeProductFromChannel(ctx: RequestContext, productId: ID, channelId: ID) {
+        const data = { ctx, productId, channelId };
+        return this.createShortWorkerJob(new RemoveProductFromChannelMessage(data), {
+            entity: 'Product',
+            id: productId,
+        });
+    }
+
     /**
      * Creates a short-running job that does not expect progress updates.
      */

+ 12 - 0
packages/core/src/plugin/default-search-plugin/types.ts

@@ -24,6 +24,12 @@ export interface UpdateVariantsByIdMessageData {
     ids: ID[];
 }
 
+export interface ProductChannelMessageData {
+    ctx: RequestContext;
+    productId: ID;
+    channelId: ID;
+}
+
 export class ReindexMessage extends WorkerMessage<{ ctx: RequestContext }, ReindexMessageResponse> {
     static readonly pattern = 'Reindex';
 }
@@ -45,3 +51,9 @@ export class UpdateVariantsByIdMessage extends WorkerMessage<
 > {
     static readonly pattern = 'UpdateVariantsById';
 }
+export class AssignProductToChannelMessage extends WorkerMessage<ProductChannelMessageData, boolean> {
+    static readonly pattern = 'AssignProductToChannel';
+}
+export class RemoveProductFromChannelMessage extends WorkerMessage<ProductChannelMessageData, boolean> {
+    static readonly pattern = 'RemoveProductFromChannel';
+}