Browse Source

refactor(elasticsearch-plugin): ElasticsearchPlugin variant channels

Michael Bromley 5 years ago
parent
commit
83842998c1

+ 1 - 0
packages/core/src/event-bus/index.ts

@@ -18,5 +18,6 @@ export * from './events/payment-state-transition-event';
 export * from './events/product-event';
 export * from './events/product-channel-event';
 export * from './events/product-variant-event';
+export * from './events/product-variant-channel-event';
 export * from './events/refund-state-transition-event';
 export * from './events/tax-rate-modification-event';

+ 1 - 1
packages/core/src/job-queue/job.ts

@@ -132,7 +132,7 @@ export class Job<T extends JobData<T> = any> {
      * Sets the progress (0 - 100) of the job.
      */
     setProgress(percent: number) {
-        this._progress = Math.min(percent, 100);
+        this._progress = Math.min(percent || 0, 100);
         this.fireEvent('progress');
     }
 

+ 1 - 1
packages/core/src/plugin/default-search-plugin/indexer/search-index.service.ts

@@ -186,7 +186,7 @@ export class SearchIndexService {
                 }
                 duration = response.duration;
                 completed = response.completed;
-                const progress = Math.ceil((completed / total) * 100);
+                const progress = total === 0 ? 100 : Math.ceil((completed / total) * 100);
                 job.setProgress(progress);
             },
             complete: () => {

+ 4 - 3
packages/dev-server/dev-config.ts

@@ -11,6 +11,7 @@ import {
     PermissionDefinition,
     VendureConfig,
 } from '@vendure/core';
+import { ElasticsearchPlugin } from '@vendure/elasticsearch-plugin';
 import { defaultEmailHandlers, EmailPlugin } from '@vendure/email-plugin';
 import path from 'path';
 import { ConnectionOptions } from 'typeorm';
@@ -63,12 +64,12 @@ export const devConfig: VendureConfig = {
             assetUploadDir: path.join(__dirname, 'assets'),
             port: 5002,
         }),
-        DefaultSearchPlugin,
+        // DefaultSearchPlugin,
         DefaultJobQueuePlugin,
-        /*ElasticsearchPlugin.init({
+        ElasticsearchPlugin.init({
             host: 'http://localhost',
             port: 9200,
-        }),*/
+        }),
         EmailPlugin.init({
             devMode: true,
             handlers: defaultEmailHandlers,

+ 192 - 0
packages/elasticsearch-plugin/e2e/e2e-helpers.ts

@@ -0,0 +1,192 @@
+import { Client } from '@elastic/elasticsearch';
+import { SortOrder } from '@vendure/common/lib/generated-types';
+import { SimpleGraphQLClient } from '@vendure/testing';
+
+import { SearchGetPrices, SearchInput } from '../../core/e2e/graphql/generated-e2e-admin-types';
+import { LogicalOperator, SearchProductsShop } from '../../core/e2e/graphql/generated-e2e-shop-types';
+import { SEARCH_PRODUCTS_SHOP } from '../../core/e2e/graphql/shop-definitions';
+import { deleteIndices } from '../src/indexing-utils';
+
+import { SEARCH_GET_PRICES, SEARCH_PRODUCTS } from './elasticsearch-plugin.e2e-spec';
+import { SearchProductsAdmin } from './graphql/generated-e2e-elasticsearch-plugin-types';
+
+// tslint:disable-next-line:no-var-requires
+const { elasticsearchHost, elasticsearchPort } = require('./constants');
+
+export function doAdminSearchQuery(client: SimpleGraphQLClient, input: SearchInput) {
+    return client.query<SearchProductsAdmin.Query, SearchProductsAdmin.Variables>(SEARCH_PRODUCTS, {
+        input,
+    });
+}
+
+export async function testGroupByProduct(client: SimpleGraphQLClient) {
+    const result = await client.query<SearchProductsShop.Query, SearchProductsShop.Variables>(
+        SEARCH_PRODUCTS_SHOP,
+        {
+            input: {
+                groupByProduct: true,
+            },
+        },
+    );
+    expect(result.search.totalItems).toBe(20);
+}
+
+export async function testNoGrouping(client: SimpleGraphQLClient) {
+    const result = await client.query<SearchProductsShop.Query, SearchProductsShop.Variables>(
+        SEARCH_PRODUCTS_SHOP,
+        {
+            input: {
+                groupByProduct: false,
+            },
+        },
+    );
+    expect(result.search.totalItems).toBe(34);
+}
+
+export async function testMatchSearchTerm(client: SimpleGraphQLClient) {
+    const result = await client.query<SearchProductsShop.Query, SearchProductsShop.Variables>(
+        SEARCH_PRODUCTS_SHOP,
+        {
+            input: {
+                term: 'camera',
+                groupByProduct: true,
+            },
+        },
+    );
+    expect(result.search.items.map(i => i.productName)).toEqual([
+        'Instant Camera',
+        'Camera Lens',
+        'SLR Camera',
+    ]);
+}
+
+export async function testMatchFacetIdsAnd(client: SimpleGraphQLClient) {
+    const result = await client.query<SearchProductsShop.Query, SearchProductsShop.Variables>(
+        SEARCH_PRODUCTS_SHOP,
+        {
+            input: {
+                facetValueIds: ['T_1', 'T_2'],
+                facetValueOperator: LogicalOperator.AND,
+                groupByProduct: true,
+                sort: {
+                    name: SortOrder.ASC,
+                },
+            },
+        },
+    );
+    expect(result.search.items.map(i => i.productName)).toEqual([
+        'Clacky Keyboard',
+        'Curvy Monitor',
+        'Gaming PC',
+        'Hard Drive',
+        'Laptop',
+        'USB Cable',
+    ]);
+}
+
+export async function testMatchFacetIdsOr(client: SimpleGraphQLClient) {
+    const result = await client.query<SearchProductsShop.Query, SearchProductsShop.Variables>(
+        SEARCH_PRODUCTS_SHOP,
+        {
+            input: {
+                facetValueIds: ['T_1', 'T_5'],
+                facetValueOperator: LogicalOperator.OR,
+                groupByProduct: true,
+                sort: {
+                    name: SortOrder.ASC,
+                },
+                take: 20,
+            },
+        },
+    );
+    expect(result.search.items.map(i => i.productName)).toEqual([
+        'Bonsai Tree',
+        'Camera Lens',
+        'Clacky Keyboard',
+        'Curvy Monitor',
+        'Gaming PC',
+        'Hard Drive',
+        'Instant Camera',
+        'Laptop',
+        'Orchid',
+        'SLR Camera',
+        'Spiky Cactus',
+        'Tripod',
+        'USB Cable',
+    ]);
+}
+
+export async function testMatchCollectionId(client: SimpleGraphQLClient) {
+    const result = await client.query<SearchProductsShop.Query, SearchProductsShop.Variables>(
+        SEARCH_PRODUCTS_SHOP,
+        {
+            input: {
+                collectionId: 'T_2',
+                groupByProduct: true,
+            },
+        },
+    );
+    expect(result.search.items.map(i => i.productName)).toEqual(['Spiky Cactus', 'Orchid', 'Bonsai Tree']);
+}
+
+export async function testMatchCollectionSlug(client: SimpleGraphQLClient) {
+    const result = await client.query<SearchProductsShop.Query, SearchProductsShop.Variables>(
+        SEARCH_PRODUCTS_SHOP,
+        {
+            input: {
+                collectionSlug: 'plants',
+                groupByProduct: true,
+            },
+        },
+    );
+    expect(result.search.items.map(i => i.productName)).toEqual(['Spiky Cactus', 'Orchid', 'Bonsai Tree']);
+}
+
+export async function testSinglePrices(client: SimpleGraphQLClient) {
+    const result = await client.query<SearchGetPrices.Query, SearchGetPrices.Variables>(SEARCH_GET_PRICES, {
+        input: {
+            groupByProduct: false,
+            take: 3,
+            sort: {
+                price: SortOrder.ASC,
+            },
+        },
+    });
+    expect(result.search.items).toEqual([
+        {
+            price: { value: 799 },
+            priceWithTax: { value: 959 },
+        },
+        {
+            price: { value: 1498 },
+            priceWithTax: { value: 1798 },
+        },
+        {
+            price: { value: 1550 },
+            priceWithTax: { value: 1860 },
+        },
+    ]);
+}
+
+export async function testPriceRanges(client: SimpleGraphQLClient) {
+    const result = await client.query<SearchGetPrices.Query, SearchGetPrices.Variables>(SEARCH_GET_PRICES, {
+        input: {
+            groupByProduct: true,
+            take: 3,
+            term: 'laptop',
+        },
+    });
+    expect(result.search.items).toEqual([
+        {
+            price: { min: 129900, max: 229900 },
+            priceWithTax: { min: 155880, max: 275880 },
+        },
+    ]);
+}
+
+export async function dropElasticIndices(indexPrefix: string) {
+    const esClient = new Client({
+        node: `${elasticsearchHost}:${elasticsearchPort}`,
+    });
+    return deleteIndices(esClient, indexPrefix);
+}

+ 149 - 208
packages/elasticsearch-plugin/e2e/elasticsearch-plugin.e2e-spec.ts

@@ -18,6 +18,7 @@ import { initialData } from '../../../e2e-common/e2e-initial-data';
 import { testConfig, TEST_SETUP_TIMEOUT_MS } from '../../../e2e-common/test-config';
 import {
     AssignProductsToChannel,
+    AssignProductVariantsToChannel,
     ChannelFragment,
     CreateChannel,
     CreateCollection,
@@ -28,6 +29,7 @@ import {
     DeleteProductVariant,
     LanguageCode,
     RemoveProductsFromChannel,
+    RemoveProductVariantsFromChannel,
     SearchFacetValues,
     SearchGetPrices,
     SearchInput,
@@ -39,6 +41,7 @@ import {
 } from '../../core/e2e/graphql/generated-e2e-admin-types';
 import { LogicalOperator, SearchProductsShop } from '../../core/e2e/graphql/generated-e2e-shop-types';
 import {
+    ASSIGN_PRODUCTVARIANT_TO_CHANNEL,
     ASSIGN_PRODUCT_TO_CHANNEL,
     CREATE_CHANNEL,
     CREATE_COLLECTION,
@@ -46,6 +49,7 @@ import {
     DELETE_ASSET,
     DELETE_PRODUCT,
     DELETE_PRODUCT_VARIANT,
+    REMOVE_PRODUCTVARIANT_FROM_CHANNEL,
     REMOVE_PRODUCT_FROM_CHANNEL,
     UPDATE_ASSET,
     UPDATE_COLLECTION,
@@ -58,8 +62,19 @@ import { awaitRunningJobs } from '../../core/e2e/utils/await-running-jobs';
 import { loggerCtx } from '../src/constants';
 import { ElasticsearchPlugin } from '../src/plugin';
 
-// tslint:disable-next-line:no-var-requires
-const { elasticsearchHost, elasticsearchPort } = require('./constants');
+import {
+    doAdminSearchQuery,
+    dropElasticIndices,
+    testGroupByProduct,
+    testMatchCollectionId,
+    testMatchCollectionSlug,
+    testMatchFacetIdsAnd,
+    testMatchFacetIdsOr,
+    testMatchSearchTerm,
+    testNoGrouping,
+    testPriceRanges,
+    testSinglePrices,
+} from './e2e-helpers';
 import {
     GetJobInfo,
     JobState,
@@ -67,6 +82,9 @@ import {
     SearchProductsAdmin,
 } from './graphql/generated-e2e-elasticsearch-plugin-types';
 
+// tslint:disable-next-line:no-var-requires
+const { elasticsearchHost, elasticsearchPort } = require('./constants');
+
 /**
  * The Elasticsearch tests sometimes take a long time in CI due to limited resources.
  * We increase the timeout to 30 seconds to prevent failure due to timeouts.
@@ -75,6 +93,8 @@ if (process.env.CI) {
     jest.setTimeout(10 * 3000);
 }
 
+const INDEX_PREFIX = 'e2e-tests';
+
 describe('Elasticsearch plugin', () => {
     const { server, adminClient, shopClient } = createTestEnvironment(
         mergeConfig(testConfig, {
@@ -89,7 +109,7 @@ describe('Elasticsearch plugin', () => {
             logger: new DefaultLogger({ level: LogLevel.Info }),
             plugins: [
                 ElasticsearchPlugin.init({
-                    indexPrefix: 'e2e-tests',
+                    indexPrefix: INDEX_PREFIX,
                     port: elasticsearchPort,
                     host: elasticsearchHost,
                 }),
@@ -99,6 +119,7 @@ describe('Elasticsearch plugin', () => {
     );
 
     beforeAll(async () => {
+        await dropElasticIndices(INDEX_PREFIX);
         await server.init({
             initialData,
             productsCsvPath: path.join(__dirname, 'fixtures/e2e-products-full.csv'),
@@ -113,191 +134,6 @@ describe('Elasticsearch plugin', () => {
         await server.destroy();
     });
 
-    function doAdminSearchQuery(input: SearchInput) {
-        return adminClient.query<SearchProductsAdmin.Query, SearchProductsAdmin.Variables>(SEARCH_PRODUCTS, {
-            input,
-        });
-    }
-
-    async function testGroupByProduct(client: SimpleGraphQLClient) {
-        const result = await client.query<SearchProductsShop.Query, SearchProductsShop.Variables>(
-            SEARCH_PRODUCTS_SHOP,
-            {
-                input: {
-                    groupByProduct: true,
-                },
-            },
-        );
-        expect(result.search.totalItems).toBe(20);
-    }
-
-    async function testNoGrouping(client: SimpleGraphQLClient) {
-        const result = await client.query<SearchProductsShop.Query, SearchProductsShop.Variables>(
-            SEARCH_PRODUCTS_SHOP,
-            {
-                input: {
-                    groupByProduct: false,
-                },
-            },
-        );
-        expect(result.search.totalItems).toBe(34);
-    }
-
-    async function testMatchSearchTerm(client: SimpleGraphQLClient) {
-        const result = await client.query<SearchProductsShop.Query, SearchProductsShop.Variables>(
-            SEARCH_PRODUCTS_SHOP,
-            {
-                input: {
-                    term: 'camera',
-                    groupByProduct: true,
-                },
-            },
-        );
-        expect(result.search.items.map(i => i.productName)).toEqual([
-            'Instant Camera',
-            'Camera Lens',
-            'SLR Camera',
-        ]);
-    }
-
-    async function testMatchFacetIdsAnd(client: SimpleGraphQLClient) {
-        const result = await client.query<SearchProductsShop.Query, SearchProductsShop.Variables>(
-            SEARCH_PRODUCTS_SHOP,
-            {
-                input: {
-                    facetValueIds: ['T_1', 'T_2'],
-                    facetValueOperator: LogicalOperator.AND,
-                    groupByProduct: true,
-                    sort: {
-                        name: SortOrder.ASC,
-                    },
-                },
-            },
-        );
-        expect(result.search.items.map(i => i.productName)).toEqual([
-            'Clacky Keyboard',
-            'Curvy Monitor',
-            'Gaming PC',
-            'Hard Drive',
-            'Laptop',
-            'USB Cable',
-        ]);
-    }
-
-    async function testMatchFacetIdsOr(client: SimpleGraphQLClient) {
-        const result = await client.query<SearchProductsShop.Query, SearchProductsShop.Variables>(
-            SEARCH_PRODUCTS_SHOP,
-            {
-                input: {
-                    facetValueIds: ['T_1', 'T_5'],
-                    facetValueOperator: LogicalOperator.OR,
-                    groupByProduct: true,
-                    sort: {
-                        name: SortOrder.ASC,
-                    },
-                    take: 20,
-                },
-            },
-        );
-        expect(result.search.items.map(i => i.productName)).toEqual([
-            'Bonsai Tree',
-            'Camera Lens',
-            'Clacky Keyboard',
-            'Curvy Monitor',
-            'Gaming PC',
-            'Hard Drive',
-            'Instant Camera',
-            'Laptop',
-            'Orchid',
-            'SLR Camera',
-            'Spiky Cactus',
-            'Tripod',
-            'USB Cable',
-        ]);
-    }
-
-    async function testMatchCollectionId(client: SimpleGraphQLClient) {
-        const result = await client.query<SearchProductsShop.Query, SearchProductsShop.Variables>(
-            SEARCH_PRODUCTS_SHOP,
-            {
-                input: {
-                    collectionId: 'T_2',
-                    groupByProduct: true,
-                },
-            },
-        );
-        expect(result.search.items.map(i => i.productName)).toEqual([
-            'Spiky Cactus',
-            'Orchid',
-            'Bonsai Tree',
-        ]);
-    }
-
-    async function testMatchCollectionSlug(client: SimpleGraphQLClient) {
-        const result = await client.query<SearchProductsShop.Query, SearchProductsShop.Variables>(
-            SEARCH_PRODUCTS_SHOP,
-            {
-                input: {
-                    collectionSlug: 'plants',
-                    groupByProduct: true,
-                },
-            },
-        );
-        expect(result.search.items.map(i => i.productName)).toEqual([
-            'Spiky Cactus',
-            'Orchid',
-            'Bonsai Tree',
-        ]);
-    }
-
-    async function testSinglePrices(client: SimpleGraphQLClient) {
-        const result = await client.query<SearchGetPrices.Query, SearchGetPrices.Variables>(
-            SEARCH_GET_PRICES,
-            {
-                input: {
-                    groupByProduct: false,
-                    take: 3,
-                    sort: {
-                        price: SortOrder.ASC,
-                    },
-                },
-            },
-        );
-        expect(result.search.items).toEqual([
-            {
-                price: { value: 799 },
-                priceWithTax: { value: 959 },
-            },
-            {
-                price: { value: 1498 },
-                priceWithTax: { value: 1798 },
-            },
-            {
-                price: { value: 1550 },
-                priceWithTax: { value: 1860 },
-            },
-        ]);
-    }
-
-    async function testPriceRanges(client: SimpleGraphQLClient) {
-        const result = await client.query<SearchGetPrices.Query, SearchGetPrices.Variables>(
-            SEARCH_GET_PRICES,
-            {
-                input: {
-                    groupByProduct: true,
-                    take: 3,
-                    term: 'laptop',
-                },
-            },
-        );
-        expect(result.search.items).toEqual([
-            {
-                price: { min: 129900, max: 229900 },
-                priceWithTax: { min: 155880, max: 275880 },
-            },
-        ]);
-    }
-
     describe('shop api', () => {
         it('group by product', () => testGroupByProduct(shopClient));
 
@@ -474,7 +310,10 @@ describe('Elasticsearch plugin', () => {
         describe('updating the index', () => {
             it('updates index when ProductVariants are changed', async () => {
                 await awaitRunningJobs(adminClient);
-                const { search } = await doAdminSearchQuery({ term: 'drive', groupByProduct: false });
+                const { search } = await doAdminSearchQuery(adminClient, {
+                    term: 'drive',
+                    groupByProduct: false,
+                });
                 expect(search.items.map(i => i.sku)).toEqual([
                     'IHD455T1',
                     'IHD455T2',
@@ -494,7 +333,7 @@ describe('Elasticsearch plugin', () => {
                 );
 
                 await awaitRunningJobs(adminClient);
-                const { search: search2 } = await doAdminSearchQuery({
+                const { search: search2 } = await doAdminSearchQuery(adminClient, {
                     term: 'drive',
                     groupByProduct: false,
                 });
@@ -510,7 +349,10 @@ describe('Elasticsearch plugin', () => {
 
             it('updates index when ProductVariants are deleted', async () => {
                 await awaitRunningJobs(adminClient);
-                const { search } = await doAdminSearchQuery({ term: 'drive', groupByProduct: false });
+                const { search } = await doAdminSearchQuery(adminClient, {
+                    term: 'drive',
+                    groupByProduct: false,
+                });
 
                 await adminClient.query<DeleteProductVariant.Mutation, DeleteProductVariant.Variables>(
                     DELETE_PRODUCT_VARIANT,
@@ -520,7 +362,7 @@ describe('Elasticsearch plugin', () => {
                 );
 
                 await awaitRunningJobs(adminClient);
-                const { search: search2 } = await doAdminSearchQuery({
+                const { search: search2 } = await doAdminSearchQuery(adminClient, {
                     term: 'drive',
                     groupByProduct: false,
                 });
@@ -541,7 +383,10 @@ describe('Elasticsearch plugin', () => {
                     },
                 });
                 await awaitRunningJobs(adminClient);
-                const result = await doAdminSearchQuery({ facetValueIds: ['T_2'], groupByProduct: true });
+                const result = await doAdminSearchQuery(adminClient, {
+                    facetValueIds: ['T_2'],
+                    groupByProduct: true,
+                });
                 expect(result.search.items.map(i => i.productName).sort()).toEqual([
                     'Clacky Keyboard',
                     'Curvy Monitor',
@@ -552,7 +397,10 @@ describe('Elasticsearch plugin', () => {
             });
 
             it('updates index when a Product is deleted', async () => {
-                const { search } = await doAdminSearchQuery({ facetValueIds: ['T_2'], groupByProduct: true });
+                const { search } = await doAdminSearchQuery(adminClient, {
+                    facetValueIds: ['T_2'],
+                    groupByProduct: true,
+                });
                 expect(search.items.map(i => i.productId).sort()).toEqual([
                     'T_2',
                     'T_3',
@@ -564,7 +412,7 @@ describe('Elasticsearch plugin', () => {
                     id: 'T_5',
                 });
                 await awaitRunningJobs(adminClient);
-                const { search: search2 } = await doAdminSearchQuery({
+                const { search: search2 } = await doAdminSearchQuery(adminClient, {
                     facetValueIds: ['T_2'],
                     groupByProduct: true,
                 });
@@ -598,7 +446,10 @@ describe('Elasticsearch plugin', () => {
                 await awaitRunningJobs(adminClient);
                 // add an additional check for the collection filters to update
                 await awaitRunningJobs(adminClient);
-                const result1 = await doAdminSearchQuery({ collectionId: 'T_2', groupByProduct: true });
+                const result1 = await doAdminSearchQuery(adminClient, {
+                    collectionId: 'T_2',
+                    groupByProduct: true,
+                });
 
                 expect(result1.search.items.map(i => i.productName)).toEqual([
                     'Road Bike',
@@ -610,7 +461,10 @@ describe('Elasticsearch plugin', () => {
                     'Running Shoe',
                 ]);
 
-                const result2 = await doAdminSearchQuery({ collectionSlug: 'plants', groupByProduct: true });
+                const result2 = await doAdminSearchQuery(adminClient, {
+                    collectionSlug: 'plants',
+                    groupByProduct: true,
+                });
 
                 expect(result2.search.items.map(i => i.productName)).toEqual([
                     'Road Bike',
@@ -657,7 +511,7 @@ describe('Elasticsearch plugin', () => {
                 await awaitRunningJobs(adminClient);
                 // add an additional check for the collection filters to update
                 await awaitRunningJobs(adminClient);
-                const result = await doAdminSearchQuery({
+                const result = await doAdminSearchQuery(adminClient, {
                     collectionId: createCollection.id,
                     groupByProduct: true,
                 });
@@ -698,7 +552,7 @@ describe('Elasticsearch plugin', () => {
 
             describe('asset changes', () => {
                 function searchForLaptop() {
-                    return doAdminSearchQuery({
+                    return doAdminSearchQuery(adminClient, {
                         term: 'laptop',
                         groupByProduct: true,
                         take: 1,
@@ -752,7 +606,7 @@ describe('Elasticsearch plugin', () => {
             });
 
             it('does not include deleted ProductVariants in index', async () => {
-                const { search: s1 } = await doAdminSearchQuery({
+                const { search: s1 } = await doAdminSearchQuery(adminClient, {
                     term: 'hard drive',
                     groupByProduct: false,
                 });
@@ -777,7 +631,10 @@ describe('Elasticsearch plugin', () => {
             });
 
             it('returns disabled field when not grouped', async () => {
-                const result = await doAdminSearchQuery({ groupByProduct: false, term: 'laptop' });
+                const result = await doAdminSearchQuery(adminClient, {
+                    groupByProduct: false,
+                    term: 'laptop',
+                });
                 expect(result.search.items.map(pick(['productVariantId', 'enabled']))).toEqual([
                     { productVariantId: 'T_1', enabled: true },
                     { productVariantId: 'T_2', enabled: true },
@@ -797,7 +654,10 @@ describe('Elasticsearch plugin', () => {
                     },
                 );
                 await awaitRunningJobs(adminClient);
-                const result = await doAdminSearchQuery({ groupByProduct: true, term: 'laptop' });
+                const result = await doAdminSearchQuery(adminClient, {
+                    groupByProduct: true,
+                    term: 'laptop',
+                });
                 expect(result.search.items.map(pick(['productId', 'enabled']))).toEqual([
                     { productId: 'T_1', enabled: true },
                 ]);
@@ -811,7 +671,11 @@ describe('Elasticsearch plugin', () => {
                     },
                 );
                 await awaitRunningJobs(adminClient);
-                const result = await doAdminSearchQuery({ groupByProduct: true, take: 3, term: 'laptop' });
+                const result = await doAdminSearchQuery(adminClient, {
+                    groupByProduct: true,
+                    take: 3,
+                    term: 'laptop',
+                });
                 expect(result.search.items.map(pick(['productId', 'enabled']))).toEqual([
                     { productId: 'T_1', enabled: false },
                 ]);
@@ -825,7 +689,10 @@ describe('Elasticsearch plugin', () => {
                     },
                 });
                 await awaitRunningJobs(adminClient);
-                const result = await doAdminSearchQuery({ groupByProduct: true, term: 'gaming' });
+                const result = await doAdminSearchQuery(adminClient, {
+                    groupByProduct: true,
+                    term: 'gaming',
+                });
                 expect(result.search.items.map(pick(['productId', 'enabled']))).toEqual([
                     { productId: 'T_3', enabled: false },
                 ]);
@@ -836,7 +703,7 @@ describe('Elasticsearch plugin', () => {
                 await adminClient.query<Reindex.Mutation>(REINDEX);
 
                 await awaitRunningJobs(adminClient);
-                const result = await doAdminSearchQuery({ groupByProduct: true, take: 3 });
+                const result = await doAdminSearchQuery(adminClient, { groupByProduct: true, take: 3 });
                 expect(result.search.items.map(pick(['productId', 'enabled']))).toEqual([
                     { productId: 'T_1', enabled: false },
                     { productId: 'T_2', enabled: true },
@@ -865,6 +732,21 @@ describe('Elasticsearch plugin', () => {
                     },
                 });
                 secondChannel = createChannel as ChannelFragment;
+
+                adminClient.setChannelToken(SECOND_CHANNEL_TOKEN);
+                await adminClient.query<Reindex.Mutation>(REINDEX);
+                await awaitRunningJobs(adminClient);
+            });
+
+            it('new channel is initially empty', async () => {
+                const { search: searchGrouped } = await doAdminSearchQuery(adminClient, {
+                    groupByProduct: true,
+                });
+                const { search: searchUngrouped } = await doAdminSearchQuery(adminClient, {
+                    groupByProduct: false,
+                });
+                expect(searchGrouped.totalItems).toEqual(0);
+                expect(searchUngrouped.totalItems).toEqual(0);
             });
 
             it('adding product to channel', async () => {
@@ -878,7 +760,7 @@ describe('Elasticsearch plugin', () => {
                 await awaitRunningJobs(adminClient);
 
                 adminClient.setChannelToken(SECOND_CHANNEL_TOKEN);
-                const { search } = await doAdminSearchQuery({ groupByProduct: true });
+                const { search } = await doAdminSearchQuery(adminClient, { groupByProduct: true });
                 expect(search.items.map(i => i.productId).sort()).toEqual(['T_1', 'T_2']);
             });
 
@@ -896,7 +778,7 @@ describe('Elasticsearch plugin', () => {
                 await awaitRunningJobs(adminClient);
 
                 adminClient.setChannelToken(SECOND_CHANNEL_TOKEN);
-                const { search } = await doAdminSearchQuery({ groupByProduct: true });
+                const { search } = await doAdminSearchQuery(adminClient, { groupByProduct: true });
                 expect(search.items.map(i => i.productId)).toEqual(['T_1']);
             });
 
@@ -912,9 +794,67 @@ describe('Elasticsearch plugin', () => {
                 );
                 expect(job!.state).toBe(JobState.COMPLETED);
 
-                const { search } = await doAdminSearchQuery({ groupByProduct: true });
+                const { search } = await doAdminSearchQuery(adminClient, { groupByProduct: true });
                 expect(search.items.map(i => i.productId).sort()).toEqual(['T_1']);
             });
+
+            it('adding product variant to channel', async () => {
+                adminClient.setChannelToken(E2E_DEFAULT_CHANNEL_TOKEN);
+                await adminClient.query<
+                    AssignProductVariantsToChannel.Mutation,
+                    AssignProductVariantsToChannel.Variables
+                >(ASSIGN_PRODUCTVARIANT_TO_CHANNEL, {
+                    input: { channelId: secondChannel.id, productVariantIds: ['T_10', 'T_15'] },
+                });
+                await awaitRunningJobs(adminClient);
+
+                adminClient.setChannelToken(SECOND_CHANNEL_TOKEN);
+
+                const { search: searchGrouped } = await doAdminSearchQuery(adminClient, {
+                    groupByProduct: true,
+                });
+                expect(searchGrouped.items.map(i => i.productId).sort()).toEqual(['T_1', 'T_3', 'T_4']);
+
+                const { search: searchUngrouped } = await doAdminSearchQuery(adminClient, {
+                    groupByProduct: false,
+                });
+                expect(searchUngrouped.items.map(i => i.productVariantId).sort()).toEqual([
+                    'T_1',
+                    'T_10',
+                    'T_15',
+                    'T_2',
+                    'T_3',
+                    'T_4',
+                ]);
+            });
+
+            it('removing product variant from channel', async () => {
+                adminClient.setChannelToken(E2E_DEFAULT_CHANNEL_TOKEN);
+                await adminClient.query<
+                    RemoveProductVariantsFromChannel.Mutation,
+                    RemoveProductVariantsFromChannel.Variables
+                >(REMOVE_PRODUCTVARIANT_FROM_CHANNEL, {
+                    input: { channelId: secondChannel.id, productVariantIds: ['T_1', 'T_15'] },
+                });
+                await awaitRunningJobs(adminClient);
+
+                adminClient.setChannelToken(SECOND_CHANNEL_TOKEN);
+
+                const { search: searchGrouped } = await doAdminSearchQuery(adminClient, {
+                    groupByProduct: true,
+                });
+                expect(searchGrouped.items.map(i => i.productId).sort()).toEqual(['T_1', 'T_3']);
+
+                const { search: searchUngrouped } = await doAdminSearchQuery(adminClient, {
+                    groupByProduct: false,
+                });
+                expect(searchUngrouped.items.map(i => i.productVariantId).sort()).toEqual([
+                    'T_10',
+                    'T_2',
+                    'T_3',
+                    'T_4',
+                ]);
+            });
         });
 
         describe('multiple language handling', () => {
@@ -935,6 +875,7 @@ describe('Elasticsearch plugin', () => {
             }
 
             beforeAll(async () => {
+                adminClient.setChannelToken(E2E_DEFAULT_CHANNEL_TOKEN);
                 const { updateProduct } = await adminClient.query<
                     UpdateProduct.Mutation,
                     UpdateProduct.Variables

+ 31 - 5
packages/elasticsearch-plugin/src/elasticsearch-index.service.ts

@@ -17,11 +17,13 @@ import {
 import { ReindexMessageResponse } from './indexer.controller';
 import {
     AssignProductToChannelMessage,
+    AssignVariantToChannelMessage,
     DeleteAssetMessage,
     DeleteProductMessage,
     DeleteVariantMessage,
     ReindexMessage,
     RemoveProductFromChannelMessage,
+    RemoveVariantFromChannelMessage,
     UpdateAssetMessage,
     UpdateIndexQueueJobData,
     UpdateProductMessage,
@@ -39,7 +41,7 @@ export class ElasticsearchIndexService {
         updateIndexQueue = this.jobService.createQueue({
             name: 'update-search-index',
             concurrency: 1,
-            process: (job) => {
+            process: job => {
                 const data = job.data;
                 switch (data.type) {
                     case 'reindex':
@@ -73,6 +75,12 @@ export class ElasticsearchIndexService {
                     case 'remove-product-from-channel':
                         this.sendMessage(job, new RemoveProductFromChannelMessage(data));
                         break;
+                    case 'assign-variant-to-channel':
+                        this.sendMessage(job, new AssignVariantToChannelMessage(data));
+                        break;
+                    case 'remove-variant-from-channel':
+                        this.sendMessage(job, new RemoveVariantFromChannelMessage(data));
+                        break;
                     default:
                         assertNever(data);
                 }
@@ -89,7 +97,7 @@ export class ElasticsearchIndexService {
     }
 
     updateVariants(ctx: RequestContext, variants: ProductVariant[]) {
-        const variantIds = variants.map((v) => v.id);
+        const variantIds = variants.map(v => v.id);
         this.addJobToQueue({ type: 'update-variants', ctx: ctx.serialize(), variantIds });
     }
 
@@ -98,7 +106,7 @@ export class ElasticsearchIndexService {
     }
 
     deleteVariant(ctx: RequestContext, variants: ProductVariant[]) {
-        const variantIds = variants.map((v) => v.id);
+        const variantIds = variants.map(v => v.id);
         this.addJobToQueue({ type: 'delete-variant', ctx: ctx.serialize(), variantIds });
     }
 
@@ -120,6 +128,24 @@ export class ElasticsearchIndexService {
         });
     }
 
+    assignVariantToChannel(ctx: RequestContext, productVariantId: ID, channelId: ID) {
+        this.addJobToQueue({
+            type: 'assign-variant-to-channel',
+            ctx: ctx.serialize(),
+            productVariantId,
+            channelId,
+        });
+    }
+
+    removeVariantFromChannel(ctx: RequestContext, productVariantId: ID, channelId: ID) {
+        this.addJobToQueue({
+            type: 'remove-variant-from-channel',
+            ctx: ctx.serialize(),
+            productVariantId,
+            channelId,
+        });
+    }
+
     updateVariantsById(ctx: RequestContext, ids: ID[]) {
         this.addJobToQueue({ type: 'update-variants-by-id', ctx: ctx.serialize(), ids });
     }
@@ -141,7 +167,7 @@ export class ElasticsearchIndexService {
     private sendMessage(job: Job<any>, message: WorkerMessage<any, any>) {
         this.workerService.send(message).subscribe({
             complete: () => job.complete(true),
-            error: (err) => {
+            error: err => {
                 Logger.error(err);
                 job.fail(err);
             },
@@ -159,7 +185,7 @@ export class ElasticsearchIndexService {
                 }
                 duration = response.duration;
                 completed = response.completed;
-                const progress = Math.ceil((completed / total) * 100);
+                const progress = total === 0 ? 100 : Math.ceil((completed / total) * 100);
                 job.setProgress(progress);
             },
             complete: () => {

+ 41 - 1
packages/elasticsearch-plugin/src/indexer.controller.ts

@@ -9,6 +9,7 @@ import {
     ConfigService,
     FacetValue,
     ID,
+    idsAreEqual,
     LanguageCode,
     Logger,
     Product,
@@ -29,6 +30,7 @@ import { createIndices, deleteByChannel, deleteIndices } from './indexing-utils'
 import { ElasticsearchOptions } from './options';
 import {
     AssignProductToChannelMessage,
+    AssignVariantToChannelMessage,
     BulkOperation,
     BulkOperationDoc,
     BulkResponseBody,
@@ -38,6 +40,7 @@ import {
     ProductIndexItem,
     ReindexMessage,
     RemoveProductFromChannelMessage,
+    RemoveVariantFromChannelMessage,
     UpdateAssetMessage,
     UpdateProductMessage,
     UpdateVariantMessage,
@@ -56,6 +59,7 @@ export const variantRelations = [
     'facetValues.facet',
     'collections',
     'taxCategory',
+    'channels',
 ];
 
 export interface ReindexMessageResponse {
@@ -167,6 +171,42 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
         });
     }
 
+    @MessagePattern(AssignVariantToChannelMessage.pattern)
+    assignVariantToChannel({
+        ctx: rawContext,
+        productVariantId,
+        channelId,
+    }: AssignVariantToChannelMessage['data']): Observable<AssignVariantToChannelMessage['response']> {
+        const ctx = RequestContext.deserialize(rawContext);
+        return asyncObservable(async () => {
+            await this.updateVariantsInternal(ctx, [productVariantId], channelId);
+            return true;
+        });
+    }
+
+    @MessagePattern(RemoveVariantFromChannelMessage.pattern)
+    removeVariantFromChannel({
+        ctx: rawContext,
+        productVariantId,
+        channelId,
+    }: AssignVariantToChannelMessage['data']): Observable<AssignVariantToChannelMessage['response']> {
+        const ctx = RequestContext.deserialize(rawContext);
+        return asyncObservable(async () => {
+            const productVariant = await this.connection.getEntityOrThrow(
+                ctx,
+                ProductVariant,
+                productVariantId,
+                { relations: ['product', 'product.channels'] },
+            );
+            await this.deleteVariantsInternal([productVariant], channelId);
+
+            if (!productVariant.product.channels.find(c => idsAreEqual(c.id, channelId))) {
+                await this.deleteProductInternal(productVariant.product, channelId);
+            }
+            return true;
+        });
+    }
+
     /**
      * Updates the search index only for the affected entities.
      */
@@ -724,7 +764,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
             currencyCode: v.currencyCode,
             description: productTranslation.description,
             facetIds: this.getFacetIds([v]),
-            channelIds: v.product.channels.map(c => c.id),
+            channelIds: v.channels.map(c => c.id),
             facetValueIds: this.getFacetValueIds([v]),
             collectionIds: v.collections.map(c => c.id.toString()),
             collectionSlugs: v.collections.map(c => c.slug),

+ 17 - 0
packages/elasticsearch-plugin/src/plugin.ts

@@ -11,6 +11,7 @@ import {
     PluginCommonModule,
     ProductChannelEvent,
     ProductEvent,
+    ProductVariantChannelEvent,
     ProductVariantEvent,
     TaxRateModificationEvent,
     Type,
@@ -295,6 +296,22 @@ export class ElasticsearchPlugin implements OnVendureBootstrap {
             }
         });
 
+        this.eventBus.ofType(ProductVariantChannelEvent).subscribe(event => {
+            if (event.type === 'assigned') {
+                return this.elasticsearchIndexService.assignVariantToChannel(
+                    event.ctx,
+                    event.productVariant.id,
+                    event.channelId,
+                );
+            } else {
+                return this.elasticsearchIndexService.removeVariantFromChannel(
+                    event.ctx,
+                    event.productVariant.id,
+                    event.channelId,
+                );
+            }
+        });
+
         const collectionModification$ = this.eventBus.ofType(CollectionModificationEvent);
         const closingNotifier$ = collectionModification$.pipe(debounceTime(50));
         collectionModification$

+ 18 - 1
packages/elasticsearch-plugin/src/types.ts

@@ -178,6 +178,13 @@ export interface ProductChannelMessageData {
     productId: ID;
     channelId: ID;
 }
+
+export type VariantChannelMessageData = {
+    ctx: SerializedRequestContext;
+    productVariantId: ID;
+    channelId: ID;
+};
+
 export interface UpdateAssetMessageData {
     ctx: SerializedRequestContext;
     asset: JsonCompatible<Required<Asset>>;
@@ -210,6 +217,12 @@ export class AssignProductToChannelMessage extends WorkerMessage<ProductChannelM
 export class RemoveProductFromChannelMessage extends WorkerMessage<ProductChannelMessageData, boolean> {
     static readonly pattern = 'RemoveProductFromChannel';
 }
+export class AssignVariantToChannelMessage extends WorkerMessage<VariantChannelMessageData, boolean> {
+    static readonly pattern = 'AssignVariantToChannel';
+}
+export class RemoveVariantFromChannelMessage extends WorkerMessage<VariantChannelMessageData, boolean> {
+    static readonly pattern = 'RemoveVariantFromChannel';
+}
 export class UpdateAssetMessage extends WorkerMessage<UpdateAssetMessageData, boolean> {
     static readonly pattern = 'UpdateAsset';
 }
@@ -235,6 +248,8 @@ type UpdateAssetJobData = NamedJobData<'update-asset', UpdateAssetMessageData>;
 type DeleteAssetJobData = NamedJobData<'delete-asset', UpdateAssetMessageData>;
 type AssignProductToChannelJobData = NamedJobData<'assign-product-to-channel', ProductChannelMessageData>;
 type RemoveProductFromChannelJobData = NamedJobData<'remove-product-from-channel', ProductChannelMessageData>;
+type AssignVariantToChannelJobData = NamedJobData<'assign-variant-to-channel', VariantChannelMessageData>;
+type RemoveVariantFromChannelJobData = NamedJobData<'remove-variant-from-channel', VariantChannelMessageData>;
 export type UpdateIndexQueueJobData =
     | ReindexJobData
     | UpdateProductJobData
@@ -245,7 +260,9 @@ export type UpdateIndexQueueJobData =
     | UpdateAssetJobData
     | DeleteAssetJobData
     | AssignProductToChannelJobData
-    | RemoveProductFromChannelJobData;
+    | RemoveProductFromChannelJobData
+    | AssignVariantToChannelJobData
+    | RemoveVariantFromChannelJobData;
 
 type CustomStringMapping<Args extends any[]> = CustomMappingDefinition<Args, 'String!', string>;
 type CustomStringMappingNullable<Args extends any[]> = CustomMappingDefinition<Args, 'String', Maybe<string>>;