Forráskód Böngészése

perf(core): Move application of CollectionFilters to worker

Relates to #148
Michael Bromley 6 éve
szülő
commit
0a90982e9b

+ 28 - 4
packages/core/e2e/collection.e2e-spec.ts

@@ -47,6 +47,7 @@ import {
 import { TestAdminClient } from './test-client';
 import { TestServer } from './test-server';
 import { assertThrowsWithMessage } from './utils/assert-throws-with-message';
+import { awaitRunningJobs } from './utils/await-running-jobs';
 
 describe('Collection resolver', () => {
     const client = new TestAdminClient();
@@ -611,9 +612,16 @@ describe('Collection resolver', () => {
                         ],
                     } as CreateCollectionInput,
                 });
-                expect(result.createCollection.productVariants.items.map(i => i.name)).toEqual([
-                    'Instant Camera',
-                ]);
+
+                await awaitRunningJobs(client);
+                const { collection } = await client.query<GetCollection.Query, GetCollection.Variables>(
+                    GET_COLLECTION,
+                    {
+                        id: result.createCollection.id,
+                    },
+                );
+
+                expect(collection!.productVariants.items.map(i => i.name)).toEqual(['Instant Camera']);
             });
 
             it('photo OR pear', async () => {
@@ -646,7 +654,16 @@ describe('Collection resolver', () => {
                         ],
                     } as CreateCollectionInput,
                 });
-                expect(result.createCollection.productVariants.items.map(i => i.name)).toEqual([
+
+                await awaitRunningJobs(client);
+                const { collection } = await client.query<GetCollection.Query, GetCollection.Variables>(
+                    GET_COLLECTION,
+                    {
+                        id: result.createCollection.id,
+                    },
+                );
+
+                expect(collection!.productVariants.items.map(i => i.name)).toEqual([
                     'Laptop 13 inch 8GB',
                     'Laptop 15 inch 8GB',
                     'Laptop 13 inch 16GB',
@@ -926,6 +943,12 @@ export const GET_COLLECTION = gql`
     query GetCollection($id: ID!) {
         collection(id: $id) {
             ...Collection
+            productVariants {
+                items {
+                    id
+                    name
+                }
+            }
         }
     }
     ${COLLECTION_FRAGMENT}
@@ -989,6 +1012,7 @@ const GET_COLLECTION_PRODUCT_VARIANTS = gql`
 const CREATE_COLLECTION_SELECT_VARIANTS = gql`
     mutation CreateCollectionSelectVariants($input: CreateCollectionInput!) {
         createCollection(input: $input) {
+            id
             productVariants {
                 items {
                     name

+ 9 - 32
packages/core/e2e/default-search-plugin.e2e-spec.ts

@@ -10,8 +10,6 @@ import { TEST_SETUP_TIMEOUT_MS } from './config/test-config';
 import {
     CreateCollection,
     CreateFacet,
-    GetRunningJobs,
-    JobState,
     LanguageCode,
     SearchFacetValues,
     SearchGetPrices,
@@ -33,6 +31,7 @@ import {
 import { SEARCH_PRODUCTS_SHOP } from './graphql/shop-definitions';
 import { TestAdminClient, TestShopClient } from './test-client';
 import { TestServer } from './test-server';
+import { awaitRunningJobs } from './utils/await-running-jobs';
 
 describe('Default search plugin', () => {
     const adminClient = new TestAdminClient();
@@ -306,7 +305,7 @@ describe('Default search plugin', () => {
                     input: [{ id: 'T_3', enabled: false }],
                 },
             );
-            await awaitRunningJobs();
+            await awaitRunningJobs(adminClient);
             const result = await shopClient.query<SearchProductsShop.Query, SearchProductsShop.Variables>(
                 SEARCH_PRODUCTS_SHOP,
                 {
@@ -357,7 +356,7 @@ describe('Default search plugin', () => {
                     facetValueIds: [],
                 },
             });
-            await awaitRunningJobs();
+            await awaitRunningJobs(adminClient);
             const result = await adminClient.query<SearchProductsShop.Query, SearchProductsShop.Variables>(
                 SEARCH_PRODUCTS,
                 {
@@ -402,7 +401,7 @@ describe('Default search plugin', () => {
                     },
                 },
             );
-            await awaitRunningJobs();
+            await awaitRunningJobs(adminClient);
             const result = await adminClient.query<SearchProductsShop.Query, SearchProductsShop.Variables>(
                 SEARCH_PRODUCTS,
                 {
@@ -455,7 +454,7 @@ describe('Default search plugin', () => {
                     ],
                 },
             });
-            await awaitRunningJobs();
+            await awaitRunningJobs(adminClient);
             const result = await adminClient.query<SearchProductsShop.Query, SearchProductsShop.Variables>(
                 SEARCH_PRODUCTS,
                 {
@@ -482,7 +481,7 @@ describe('Default search plugin', () => {
                     value: 50,
                 },
             });
-            await awaitRunningJobs();
+            await awaitRunningJobs(adminClient);
             const result = await adminClient.query<SearchGetPrices.Query, SearchGetPrices.Variables>(
                 SEARCH_GET_PRICES,
                 {
@@ -524,7 +523,7 @@ describe('Default search plugin', () => {
                     input: [{ id: 'T_1', enabled: false }, { id: 'T_2', enabled: false }],
                 },
             );
-            await awaitRunningJobs();
+            await awaitRunningJobs(adminClient);
             const result = await adminClient.query<SearchProductsShop.Query, SearchProductsShop.Variables>(
                 SEARCH_PRODUCTS,
                 {
@@ -548,7 +547,7 @@ describe('Default search plugin', () => {
                     input: [{ id: 'T_4', enabled: false }],
                 },
             );
-            await awaitRunningJobs();
+            await awaitRunningJobs(adminClient);
             const result = await adminClient.query<SearchProductsShop.Query, SearchProductsShop.Variables>(
                 SEARCH_PRODUCTS,
                 {
@@ -572,7 +571,7 @@ describe('Default search plugin', () => {
                     enabled: false,
                 },
             });
-            await awaitRunningJobs();
+            await awaitRunningJobs(adminClient);
             const result = await adminClient.query<SearchProductsShop.Query, SearchProductsShop.Variables>(
                 SEARCH_PRODUCTS,
                 {
@@ -589,30 +588,8 @@ describe('Default search plugin', () => {
             ]);
         });
     });
-
-    /**
-     * Since the updates to the search index are performed in the background, we need
-     * to ensure that any running background jobs are completed before continuing certain
-     * tests.
-     */
-    async function awaitRunningJobs() {
-        let runningJobs = 0;
-        do {
-            const { jobs } = await adminClient.query<GetRunningJobs.Query>(GET_RUNNING_JOBS);
-            runningJobs = jobs.filter(job => job.state !== JobState.COMPLETED).length;
-        } while (runningJobs > 0);
-    }
 });
 
-export const GET_RUNNING_JOBS = gql`
-    query GetRunningJobs {
-        jobs {
-            name
-            state
-        }
-    }
-`;
-
 export const SEARCH_PRODUCTS = gql`
     query SearchProductsAdmin($input: SearchInput!) {
         search(input: $input) {

+ 29 - 18
packages/core/e2e/graphql/generated-e2e-admin-types.ts

@@ -3367,7 +3367,13 @@ export type GetCollectionQueryVariables = {
 };
 
 export type GetCollectionQuery = { __typename?: 'Query' } & {
-    collection: Maybe<{ __typename?: 'Collection' } & CollectionFragment>;
+    collection: Maybe<
+        { __typename?: 'Collection' } & {
+            productVariants: { __typename?: 'ProductVariantList' } & {
+                items: Array<{ __typename?: 'ProductVariant' } & Pick<ProductVariant, 'id' | 'name'>>;
+            };
+        } & CollectionFragment
+    >;
 };
 
 export type MoveCollectionMutationVariables = {
@@ -3423,11 +3429,12 @@ export type CreateCollectionSelectVariantsMutationVariables = {
 };
 
 export type CreateCollectionSelectVariantsMutation = { __typename?: 'Mutation' } & {
-    createCollection: { __typename?: 'Collection' } & {
-        productVariants: { __typename?: 'ProductVariantList' } & Pick<ProductVariantList, 'totalItems'> & {
-                items: Array<{ __typename?: 'ProductVariant' } & Pick<ProductVariant, 'name'>>;
-            };
-    };
+    createCollection: { __typename?: 'Collection' } & Pick<Collection, 'id'> & {
+            productVariants: { __typename?: 'ProductVariantList' } & Pick<
+                ProductVariantList,
+                'totalItems'
+            > & { items: Array<{ __typename?: 'ProductVariant' } & Pick<ProductVariant, 'name'>> };
+        };
 };
 
 export type GetCollectionBreadcrumbsQueryVariables = {
@@ -3574,12 +3581,6 @@ export type DeleteCustomerMutation = { __typename?: 'Mutation' } & {
     deleteCustomer: { __typename?: 'DeletionResponse' } & Pick<DeletionResponse, 'result'>;
 };
 
-export type GetRunningJobsQueryVariables = {};
-
-export type GetRunningJobsQuery = { __typename?: 'Query' } & {
-    jobs: Array<{ __typename?: 'JobInfo' } & Pick<JobInfo, 'name' | 'state'>>;
-};
-
 export type SearchProductsAdminQueryVariables = {
     input: SearchInput;
 };
@@ -4270,6 +4271,12 @@ export type GetStockMovementQuery = { __typename?: 'Query' } & {
     >;
 };
 
+export type GetRunningJobsQueryVariables = {};
+
+export type GetRunningJobsQuery = { __typename?: 'Query' } & {
+    jobs: Array<{ __typename?: 'JobInfo' } & Pick<JobInfo, 'name' | 'state'>>;
+};
+
 export type GetProductsQueryVariables = {
     options?: Maybe<ProductListOptions>;
 };
@@ -5023,6 +5030,10 @@ export namespace GetCollection {
     export type Variables = GetCollectionQueryVariables;
     export type Query = GetCollectionQuery;
     export type Collection = CollectionFragment;
+    export type ProductVariants = (NonNullable<GetCollectionQuery['collection']>)['productVariants'];
+    export type Items = NonNullable<
+        (NonNullable<GetCollectionQuery['collection']>)['productVariants']['items'][0]
+    >;
 }
 
 export namespace MoveCollection {
@@ -5163,12 +5174,6 @@ export namespace DeleteCustomer {
     export type DeleteCustomer = DeleteCustomerMutation['deleteCustomer'];
 }
 
-export namespace GetRunningJobs {
-    export type Variables = GetRunningJobsQueryVariables;
-    export type Query = GetRunningJobsQuery;
-    export type Jobs = NonNullable<GetRunningJobsQuery['jobs'][0]>;
-}
-
 export namespace SearchProductsAdmin {
     export type Variables = SearchProductsAdminQueryVariables;
     export type Query = SearchProductsAdminQuery;
@@ -5614,6 +5619,12 @@ export namespace GetStockMovement {
     export type Variants = VariantWithStockFragment;
 }
 
+export namespace GetRunningJobs {
+    export type Variables = GetRunningJobsQueryVariables;
+    export type Query = GetRunningJobsQuery;
+    export type Jobs = NonNullable<GetRunningJobsQuery['jobs'][0]>;
+}
+
 export namespace GetProducts {
     export type Variables = GetProductsQueryVariables;
     export type Query = GetProductsQuery;

+ 8 - 0
packages/core/e2e/graphql/shared-definitions.ts

@@ -266,3 +266,11 @@ export const GET_STOCK_MOVEMENT = gql`
     }
     ${VARIANT_WITH_STOCK_FRAGMENT}
 `;
+export const GET_RUNNING_JOBS = gql`
+    query GetRunningJobs {
+        jobs {
+            name
+            state
+        }
+    }
+`;

+ 18 - 0
packages/core/e2e/utils/await-running-jobs.ts

@@ -0,0 +1,18 @@
+import { GetRunningJobs, JobState } from '../graphql/generated-e2e-admin-types';
+import { GET_RUNNING_JOBS } from '../graphql/shared-definitions';
+import { TestAdminClient } from '../test-client';
+
+/**
+ * For mutation which trigger background jobs, this can be used to "pause" the execution of
+ * the test until those jobs have completed;
+ */
+export async function awaitRunningJobs(adminClient: TestAdminClient, timeout: number = 5000) {
+    let runningJobs = 0;
+    const startTime = +new Date();
+    let timedOut = false;
+    do {
+        const { jobs } = await adminClient.query<GetRunningJobs.Query>(GET_RUNNING_JOBS);
+        runningJobs = jobs.filter(job => job.state !== JobState.COMPLETED).length;
+        timedOut = timeout < +new Date() - startTime;
+    } while (runningJobs > 0 && !timedOut);
+}

+ 129 - 0
packages/core/src/service/controllers/collection.controller.ts

@@ -0,0 +1,129 @@
+import { Controller } from '@nestjs/common';
+import { MessagePattern } from '@nestjs/microservices';
+import { InjectConnection } from '@nestjs/typeorm';
+import { ConfigurableOperation } from '@vendure/common/lib/generated-types';
+import { ID } from '@vendure/common/lib/shared-types';
+import { notNullOrUndefined } from '@vendure/common/lib/shared-utils';
+import { Observable } from 'rxjs';
+import { Connection } from 'typeorm';
+
+import {
+    facetValueCollectionFilter,
+    variantNameCollectionFilter,
+} from '../../config/collection/default-collection-filters';
+import { Logger } from '../../config/logger/vendure-logger';
+import { Collection } from '../../entity/collection/collection.entity';
+import { ProductVariant } from '../../entity/product-variant/product-variant.entity';
+import { CollectionService } from '../services/collection.service';
+import { ApplyCollectionFiltersMessage } from '../types/collection-messages';
+
+/**
+ * Updates collections on the worker process because running the CollectionFilters
+ * is computationally expensive.
+ */
+@Controller()
+export class CollectionController {
+    constructor(
+        @InjectConnection() private connection: Connection,
+        private collectionService: CollectionService,
+    ) {}
+
+    @MessagePattern(ApplyCollectionFiltersMessage.pattern)
+    applyCollectionFilters({
+        collectionIds,
+    }: ApplyCollectionFiltersMessage['data']): Observable<ApplyCollectionFiltersMessage['response']> {
+        return new Observable(observer => {
+            (async () => {
+                Logger.verbose(`Processing ${collectionIds.length} Collections`);
+                const timeStart = Date.now();
+                const collections = await this.connection.getRepository(Collection).findByIds(collectionIds, {
+                    relations: ['productVariants'],
+                });
+                let completed = 0;
+                for (const collection of collections) {
+                    const affectedVariantIds = await this.applyCollectionFiltersInternal(collection);
+
+                    observer.next({
+                        total: collectionIds.length,
+                        completed: ++completed,
+                        duration: +new Date() - timeStart,
+                        collectionId: collection.id,
+                        affectedVariantIds,
+                    });
+                }
+                observer.complete();
+            })();
+        });
+    }
+
+    /**
+     * Applies the CollectionFilters and returns an array of all affected ProductVariant ids.
+     */
+    private async applyCollectionFiltersInternal(collection: Collection): Promise<ID[]> {
+        const ancestorFilters = await this.collectionService
+            .getAncestors(collection.id)
+            .then(ancestors =>
+                ancestors.reduce(
+                    (filters, c) => [...filters, ...(c.filters || [])],
+                    [] as ConfigurableOperation[],
+                ),
+            );
+        const preIds = await this.collectionService.getCollectionProductVariantIds(collection);
+        collection.productVariants = await this.getFilteredProductVariants([
+            ...ancestorFilters,
+            ...(collection.filters || []),
+        ]);
+        const postIds = collection.productVariants.map(v => v.id);
+        await this.connection
+            .getRepository(Collection)
+            .save(collection, { chunk: Math.ceil(collection.productVariants.length / 500) });
+
+        const preIdsSet = new Set(preIds);
+        const postIdsSet = new Set(postIds);
+        const difference = [
+            ...preIds.filter(id => !postIdsSet.has(id)),
+            ...postIds.filter(id => !preIdsSet.has(id)),
+        ];
+        return difference;
+    }
+
+    /**
+     * Applies the CollectionFilters and returns an array of ProductVariant entities which match.
+     */
+    private async getFilteredProductVariants(filters: ConfigurableOperation[]): Promise<ProductVariant[]> {
+        if (filters.length === 0) {
+            return [];
+        }
+        const facetFilters = filters.filter(f => f.code === facetValueCollectionFilter.code);
+        const variantNameFilters = filters.filter(f => f.code === variantNameCollectionFilter.code);
+        let qb = this.connection.getRepository(ProductVariant).createQueryBuilder('productVariant');
+
+        // Apply any facetValue-based filters
+        if (facetFilters.length) {
+            const [idsArg, containsAnyArg] = facetFilters[0].args;
+            const mergedArgs = facetFilters
+                .map(f => f.args[0].value)
+                .filter(notNullOrUndefined)
+                .map(value => JSON.parse(value))
+                .reduce((all, ids) => [...all, ...ids]);
+
+            qb = facetValueCollectionFilter.apply(qb, [
+                {
+                    name: idsArg.name,
+                    type: idsArg.type,
+                    value: JSON.stringify(Array.from(new Set(mergedArgs))),
+                },
+                containsAnyArg,
+            ]);
+        }
+
+        // Apply any variant name-based filters
+        if (variantNameFilters.length) {
+            for (const filter of variantNameFilters) {
+                qb = variantNameCollectionFilter.apply(qb, filter.args);
+            }
+        }
+
+        return qb.getMany();
+    }
+}

+ 4 - 0
packages/core/src/service/service.module.ts

@@ -6,6 +6,7 @@ import { ConfigService } from '../config/config.service';
 import { EventBusModule } from '../event-bus/event-bus.module';
 import { WorkerServiceModule } from '../worker/worker-service.module';
 
+import { CollectionController } from './controllers/collection.controller';
 import { AssetUpdater } from './helpers/asset-updater/asset-updater';
 import { ListQueryBuilder } from './helpers/list-query-builder/list-query-builder';
 import { OrderCalculator } from './helpers/order-calculator/order-calculator';
@@ -82,6 +83,8 @@ const exportedProviders = [
     TranslatableSaver,
 ];
 
+const workerControllers = [CollectionController];
+
 let defaultTypeOrmModule: DynamicModule;
 let workerTypeOrmModule: DynamicModule;
 
@@ -192,6 +195,7 @@ export class ServiceModule {
         return {
             module: ServiceModule,
             imports: [workerTypeOrmModule],
+            controllers: workerControllers,
         };
     }
 

+ 42 - 80
packages/core/src/service/services/collection.service.ts

@@ -12,7 +12,6 @@ import {
 import { pick } from '@vendure/common/lib/pick';
 import { ROOT_COLLECTION_NAME } from '@vendure/common/lib/shared-constants';
 import { ID, PaginatedList } from '@vendure/common/lib/shared-types';
-import { notNullOrUndefined } from '@vendure/common/lib/shared-utils';
 import { Connection } from 'typeorm';
 
 import { RequestContext } from '../../api/common/request-context';
@@ -27,20 +26,24 @@ import {
     facetValueCollectionFilter,
     variantNameCollectionFilter,
 } from '../../config/collection/default-collection-filters';
+import { Logger } from '../../config/logger/vendure-logger';
 import { CollectionTranslation } from '../../entity/collection/collection-translation.entity';
 import { Collection } from '../../entity/collection/collection.entity';
 import { ProductVariant } from '../../entity/product-variant/product-variant.entity';
 import { EventBus } from '../../event-bus/event-bus';
 import { CatalogModificationEvent } from '../../event-bus/events/catalog-modification-event';
 import { CollectionModificationEvent } from '../../event-bus/events/collection-modification-event';
+import { WorkerService } from '../../worker/worker.service';
 import { AssetUpdater } from '../helpers/asset-updater/asset-updater';
 import { ListQueryBuilder } from '../helpers/list-query-builder/list-query-builder';
 import { TranslatableSaver } from '../helpers/translatable-saver/translatable-saver';
 import { getEntityOrThrow } from '../helpers/utils/get-entity-or-throw';
 import { translateDeep } from '../helpers/utils/translate-entity';
+import { ApplyCollectionFiltersMessage } from '../types/collection-messages';
 
 import { ChannelService } from './channel.service';
 import { FacetValueService } from './facet-value.service';
+import { JobService } from './job.service';
 
 export class CollectionService implements OnModuleInit {
     private rootCategories: { [channelCode: string]: Collection } = {};
@@ -57,6 +60,8 @@ export class CollectionService implements OnModuleInit {
         private listQueryBuilder: ListQueryBuilder,
         private translatableSaver: TranslatableSaver,
         private eventBus: EventBus,
+        private workerService: WorkerService,
+        private jobService: JobService,
     ) {}
 
     onModuleInit() {
@@ -64,14 +69,7 @@ export class CollectionService implements OnModuleInit {
             const collections = await this.connection.getRepository(Collection).find({
                 relations: ['productVariants'],
             });
-            for (const collection of collections) {
-                const affectedVariantIds = await this.applyCollectionFilters(collection);
-                if (affectedVariantIds.length) {
-                    this.eventBus.publish(
-                        new CollectionModificationEvent(event.ctx, collection, affectedVariantIds),
-                    );
-                }
-            }
+            this.applyCollectionFilters(event.ctx, collections);
         });
     }
 
@@ -250,8 +248,7 @@ export class CollectionService implements OnModuleInit {
                 await this.assetUpdater.updateEntityAssets(coll, input);
             },
         });
-        const affectedVariantIds = await this.applyCollectionFilters(collection);
-        this.eventBus.publish(new CollectionModificationEvent(ctx, collection, affectedVariantIds));
+        this.applyCollectionFilters(ctx, [collection]);
         return assertFound(this.findOne(ctx, collection.id));
     }
 
@@ -267,11 +264,9 @@ export class CollectionService implements OnModuleInit {
                 await this.assetUpdater.updateEntityAssets(coll, input);
             },
         });
-        let affectedVariantIds: ID[] = [];
         if (input.filters) {
-            affectedVariantIds = await this.applyCollectionFilters(collection);
+            this.applyCollectionFilters(ctx, [collection]);
         }
-        this.eventBus.publish(new CollectionModificationEvent(ctx, collection, affectedVariantIds));
         return assertFound(this.findOne(ctx, collection.id));
     }
 
@@ -330,7 +325,7 @@ export class CollectionService implements OnModuleInit {
         }
 
         await this.connection.getRepository(Collection).save(siblings);
-        await this.applyCollectionFilters(target);
+        await this.applyCollectionFilters(ctx, [target]);
         return assertFound(this.findOne(ctx, input.collectionId));
     }
 
@@ -361,36 +356,43 @@ export class CollectionService implements OnModuleInit {
     /**
      * Applies the CollectionFilters and returns an array of all affected ProductVariant ids.
      */
-    private async applyCollectionFilters(collection: Collection): Promise<ID[]> {
-        const ancestorFilters = await this.getAncestors(collection.id).then(ancestors =>
-            ancestors.reduce(
-                (filters, c) => [...filters, ...(c.filters || [])],
-                [] as ConfigurableOperation[],
-            ),
-        );
-        const preIds = await this.getCollectionProductVariantIds(collection);
-        collection.productVariants = await this.getFilteredProductVariants([
-            ...ancestorFilters,
-            ...(collection.filters || []),
-        ]);
-        const postIds = collection.productVariants.map(v => v.id);
-        await this.connection
-            .getRepository(Collection)
-            .save(collection, { chunk: Math.ceil(collection.productVariants.length / 500) });
-
-        const preIdsSet = new Set(preIds);
-        const postIdsSet = new Set(postIds);
-        const difference = [
-            ...preIds.filter(id => !postIdsSet.has(id)),
-            ...postIds.filter(id => !preIdsSet.has(id)),
-        ];
-        return difference;
+    private async applyCollectionFilters(ctx: RequestContext, collections: Collection[]): Promise<void> {
+        const collectionIds = collections.map(c => c.id);
+
+        const job = this.jobService.createJob({
+            name: 'apply-collection-filters',
+            metadata: { collectionIds },
+            singleInstance: true,
+            work: async reporter => {
+                Logger.verbose(`sending ApplyCollectionFiltersMessage message`);
+                this.workerService.send(new ApplyCollectionFiltersMessage({ collectionIds })).subscribe({
+                    next: ({ total, completed, duration, collectionId, affectedVariantIds }) => {
+                        const progress = Math.ceil((completed / total) * 100);
+                        const collection = collections.find(c => idsAreEqual(c.id, collectionId));
+                        if (collection) {
+                            this.eventBus.publish(
+                                new CollectionModificationEvent(ctx, collection, affectedVariantIds),
+                            );
+                        }
+                        reporter.setProgress(progress);
+                    },
+                    complete: () => {
+                        reporter.complete();
+                    },
+                    error: err => {
+                        Logger.error(err);
+                        reporter.complete();
+                    },
+                });
+            },
+        });
+        await job.start();
     }
 
     /**
      * Returns the IDs of the Collection's ProductVariants.
      */
-    private async getCollectionProductVariantIds(collection: Collection): Promise<ID[]> {
+    async getCollectionProductVariantIds(collection: Collection): Promise<ID[]> {
         if (collection.productVariants) {
             return collection.productVariants.map(v => v.id);
         } else {
@@ -403,46 +405,6 @@ export class CollectionService implements OnModuleInit {
         }
     }
 
-    /**
-     * Applies the CollectionFilters and returns an array of ProductVariant entities which match.
-     */
-    private async getFilteredProductVariants(filters: ConfigurableOperation[]): Promise<ProductVariant[]> {
-        if (filters.length === 0) {
-            return [];
-        }
-        const facetFilters = filters.filter(f => f.code === facetValueCollectionFilter.code);
-        const variantNameFilters = filters.filter(f => f.code === variantNameCollectionFilter.code);
-        let qb = this.connection.getRepository(ProductVariant).createQueryBuilder('productVariant');
-
-        // Apply any facetValue-based filters
-        if (facetFilters.length) {
-            const [idsArg, containsAnyArg] = facetFilters[0].args;
-            const mergedArgs = facetFilters
-                .map(f => f.args[0].value)
-                .filter(notNullOrUndefined)
-                .map(value => JSON.parse(value))
-                .reduce((all, ids) => [...all, ...ids]);
-
-            qb = facetValueCollectionFilter.apply(qb, [
-                {
-                    name: idsArg.name,
-                    type: idsArg.type,
-                    value: JSON.stringify(Array.from(new Set(mergedArgs))),
-                },
-                containsAnyArg,
-            ]);
-        }
-
-        // Apply any variant name-based filters
-        if (variantNameFilters.length) {
-            for (const filter of variantNameFilters) {
-                qb = variantNameCollectionFilter.apply(qb, filter.args);
-            }
-        }
-
-        return qb.getMany();
-    }
-
     /**
      * Returns the next position value in the given parent collection.
      */

+ 18 - 0
packages/core/src/service/types/collection-messages.ts

@@ -0,0 +1,18 @@
+import { ID } from '@vendure/common/lib/shared-types';
+
+import { WorkerMessage } from '../../worker/types';
+
+export interface ProcessCollectionsResponse {
+    total: number;
+    completed: number;
+    duration: number;
+    collectionId: ID;
+    affectedVariantIds: ID[];
+}
+
+export class ApplyCollectionFiltersMessage extends WorkerMessage<
+    { collectionIds: ID[] },
+    ProcessCollectionsResponse
+> {
+    static readonly pattern = 'ApplyCollectionFilters';
+}