فهرست منبع

refactor(core) Remove use of worker in collection service

Fred Cox 5 سال پیش
والد
کامیت
e1c603b34a

+ 0 - 117
packages/core/src/service/controllers/collection.controller.ts

@@ -1,117 +0,0 @@
-import { Controller } from '@nestjs/common';
-import { MessagePattern } from '@nestjs/microservices';
-import { ConfigurableOperation } from '@vendure/common/lib/generated-types';
-import { pick } from '@vendure/common/lib/pick';
-import { ID } from '@vendure/common/lib/shared-types';
-import { Observable } from 'rxjs';
-
-import { ConfigService } from '../../config/config.service';
-import { Logger } from '../../config/logger/vendure-logger';
-import { Collection } from '../../entity/collection/collection.entity';
-import { ProductVariant } from '../../entity/product-variant/product-variant.entity';
-import { asyncObservable } from '../../worker/async-observable';
-import { CollectionService } from '../services/collection.service';
-import { TransactionalConnection } from '../transaction/transactional-connection';
-import { ApplyCollectionFiltersMessage } from '../types/collection-messages';
-
-/**
- * Updates collections on the worker process because running the CollectionFilters
- * is computationally expensive.
- */
-@Controller()
-export class CollectionController {
-    constructor(
-        private connection: TransactionalConnection,
-        private collectionService: CollectionService,
-        private configService: ConfigService,
-    ) {}
-
-    @MessagePattern(ApplyCollectionFiltersMessage.pattern)
-    applyCollectionFilters({
-        collectionIds,
-    }: ApplyCollectionFiltersMessage['data']): Observable<ApplyCollectionFiltersMessage['response']> {
-        return asyncObservable(async observer => {
-            Logger.verbose(`Processing ${collectionIds.length} Collections`);
-            const timeStart = Date.now();
-            const collections = await this.connection.getRepository(Collection).findByIds(collectionIds, {
-                relations: ['productVariants'],
-            });
-            let completed = 0;
-            for (const collection of collections) {
-                const affectedVariantIds = await this.applyCollectionFiltersInternal(collection);
-
-                observer.next({
-                    total: collectionIds.length,
-                    completed: ++completed,
-                    duration: +new Date() - timeStart,
-                    collectionId: collection.id,
-                    affectedVariantIds,
-                });
-            }
-        });
-    }
-
-    /**
-     * Applies the CollectionFilters and returns an array of all affected ProductVariant ids.
-     */
-    private async applyCollectionFiltersInternal(collection: Collection): Promise<ID[]> {
-        const ancestorFilters = await this.collectionService
-            .getAncestors(collection.id)
-            .then(ancestors =>
-                ancestors.reduce(
-                    (filters, c) => [...filters, ...(c.filters || [])],
-                    [] as ConfigurableOperation[],
-                ),
-            );
-        const preIds = await this.collectionService.getCollectionProductVariantIds(collection);
-        collection.productVariants = await this.getFilteredProductVariants([
-            ...ancestorFilters,
-            ...(collection.filters || []),
-        ]);
-        const postIds = collection.productVariants.map(v => v.id);
-        try {
-            await this.connection
-                .getRepository(Collection)
-                // Only update the exact changed properties, to avoid VERY hard-to-debug
-                // non-deterministic race conditions e.g. when the "position" is changed
-                // by moving a Collection and then this save operation clobbers it back
-                // to the old value.
-                .save(pick(collection, ['id', 'productVariants']), {
-                    chunk: Math.ceil(collection.productVariants.length / 500),
-                    reload: false,
-                });
-        } catch (e) {
-            Logger.error(e);
-        }
-
-        const preIdsSet = new Set(preIds);
-        const postIdsSet = new Set(postIds);
-        const difference = [
-            ...preIds.filter(id => !postIdsSet.has(id)),
-            ...postIds.filter(id => !preIdsSet.has(id)),
-        ];
-        return difference;
-    }
-
-    /**
-     * Applies the CollectionFilters and returns an array of ProductVariant entities which match.
-     */
-    private async getFilteredProductVariants(filters: ConfigurableOperation[]): Promise<ProductVariant[]> {
-        if (filters.length === 0) {
-            return [];
-        }
-        const { collectionFilters } = this.configService.catalogOptions;
-        let qb = this.connection.getRepository(ProductVariant).createQueryBuilder('productVariant');
-
-        for (const filterType of collectionFilters) {
-            const filtersOfType = filters.filter(f => f.code === filterType.code);
-            if (filtersOfType.length) {
-                for (const filter of filtersOfType) {
-                    qb = filterType.apply(qb, filter.args);
-                }
-            }
-        }
-
-        return qb.getMany();
-    }
-}

+ 1 - 2
packages/core/src/service/service.module.ts

@@ -9,7 +9,6 @@ import { EventBusModule } from '../event-bus/event-bus.module';
 import { JobQueueModule } from '../job-queue/job-queue.module';
 import { WorkerServiceModule } from '../worker/worker-service.module';
 
-import { CollectionController } from './controllers/collection.controller';
 import { TaxRateController } from './controllers/tax-rate.controller';
 import { ConfigArgService } from './helpers/config-arg/config-arg.service';
 import { CustomFieldRelationService } from './helpers/custom-field-relation/custom-field-relation.service';
@@ -117,7 +116,7 @@ const helpers = [
     CustomFieldRelationService,
 ];
 
-const workerControllers = [CollectionController, TaxRateController];
+const workerControllers = [TaxRateController];
 
 let defaultTypeOrmModule: DynamicModule;
 let workerTypeOrmModule: DynamicModule;

+ 61 - 36
packages/core/src/service/services/collection.service.ts

@@ -28,10 +28,8 @@ import { EventBus } from '../../event-bus/event-bus';
 import { CollectionModificationEvent } from '../../event-bus/events/collection-modification-event';
 import { ProductEvent } from '../../event-bus/events/product-event';
 import { ProductVariantEvent } from '../../event-bus/events/product-variant-event';
-import { Job } from '../../job-queue/job';
 import { JobQueue } from '../../job-queue/job-queue';
 import { JobQueueService } from '../../job-queue/job-queue.service';
-import { WorkerService } from '../../worker/worker.service';
 import { ConfigArgService } from '../helpers/config-arg/config-arg.service';
 import { CustomFieldRelationService } from '../helpers/custom-field-relation/custom-field-relation.service';
 import { ListQueryBuilder } from '../helpers/list-query-builder/list-query-builder';
@@ -40,12 +38,13 @@ import { TranslatableSaver } from '../helpers/translatable-saver/translatable-sa
 import { moveToIndex } from '../helpers/utils/move-to-index';
 import { translateDeep } from '../helpers/utils/translate-entity';
 import { TransactionalConnection } from '../transaction/transactional-connection';
-import { ApplyCollectionFiltersJobData, ApplyCollectionFiltersMessage } from '../types/collection-messages';
 
 import { AssetService } from './asset.service';
 import { ChannelService } from './channel.service';
 import { FacetValueService } from './facet-value.service';
 
+type ApplyCollectionFiltersJobData = { ctx: SerializedRequestContext; collectionIds: ID[] };
+
 @Injectable()
 export class CollectionService implements OnModuleInit {
     private rootCollection: Collection | undefined;
@@ -59,7 +58,6 @@ export class CollectionService implements OnModuleInit {
         private listQueryBuilder: ListQueryBuilder,
         private translatableSaver: TranslatableSaver,
         private eventBus: EventBus,
-        private workerService: WorkerService,
         private jobQueueService: JobQueueService,
         private configService: ConfigService,
         private slugValidator: SlugValidator,
@@ -84,10 +82,19 @@ export class CollectionService implements OnModuleInit {
         this.applyFiltersQueue = this.jobQueueService.createQueue({
             name: 'apply-collection-filters',
             process: async job => {
+                const ctx = RequestContext.deserialize(job.data.ctx);
+
+                Logger.verbose(`Processing ${job.data.collectionIds.length} Collections`);
                 const collections = await this.connection
                     .getRepository(Collection)
-                    .findByIds(job.data.collectionIds);
-                return this.applyCollectionFilters(job.data.ctx, collections, job);
+                    .findByIds(job.data.collectionIds, {
+                        relations: ['productVariants'],
+                    });
+                let completed = 0;
+                for (const collection of collections) {
+                    await this.applyCollectionFiltersInternal(collection);
+                    job.setProgress(Math.ceil((++completed / job.data.collectionIds.length) * 100));
+                }
             },
         });
     }
@@ -393,37 +400,55 @@ export class CollectionService implements OnModuleInit {
     }
 
     /**
-     * Applies the CollectionFilters and returns an array of all affected ProductVariant ids.
+     * Applies the CollectionFilters
      */
-    private async applyCollectionFilters(
-        ctx: SerializedRequestContext,
-        collections: Collection[],
-        job: Job<ApplyCollectionFiltersJobData>,
-    ): Promise<void> {
-        const collectionIds = collections.map(c => c.id);
-        const requestContext = RequestContext.deserialize(ctx);
-
-        return new Promise<void>((resolve, reject) => {
-            this.workerService.send(new ApplyCollectionFiltersMessage({ collectionIds })).subscribe({
-                next: ({ total, completed, duration, collectionId, affectedVariantIds }) => {
-                    const progress = Math.ceil((completed / total) * 100);
-                    const collection = collections.find(c => idsAreEqual(c.id, collectionId));
-                    if (collection) {
-                        this.eventBus.publish(
-                            new CollectionModificationEvent(requestContext, collection, affectedVariantIds),
-                        );
-                    }
-                    job.setProgress(progress);
-                },
-                complete: () => {
-                    resolve();
-                },
-                error: err => {
-                    Logger.error(err);
-                    reject(err);
-                },
-            });
-        });
+    private async applyCollectionFiltersInternal(collection: Collection): Promise<void> {
+        const ancestorFilters = await this.getAncestors(collection.id).then(ancestors =>
+            ancestors.reduce(
+                (filters, c) => [...filters, ...(c.filters || [])],
+                [] as ConfigurableOperation[],
+            ),
+        );
+        collection.productVariants = await this.getFilteredProductVariants([
+            ...ancestorFilters,
+            ...(collection.filters || []),
+        ]);
+        try {
+            await this.connection
+                .getRepository(Collection)
+                // Only update the exact changed properties, to avoid VERY hard-to-debug
+                // non-deterministic race conditions e.g. when the "position" is changed
+                // by moving a Collection and then this save operation clobbers it back
+                // to the old value.
+                .save(pick(collection, ['id', 'productVariants']), {
+                    chunk: Math.ceil(collection.productVariants.length / 500),
+                    reload: false,
+                });
+        } catch (e) {
+            Logger.error(e);
+        }
+    }
+
+    /**
+     * Applies the CollectionFilters and returns an array of ProductVariant entities which match.
+     */
+    private async getFilteredProductVariants(filters: ConfigurableOperation[]): Promise<ProductVariant[]> {
+        if (filters.length === 0) {
+            return [];
+        }
+        const { collectionFilters } = this.configService.catalogOptions;
+        let qb = this.connection.getRepository(ProductVariant).createQueryBuilder('productVariant');
+
+        for (const filterType of collectionFilters) {
+            const filtersOfType = filters.filter(f => f.code === filterType.code);
+            if (filtersOfType.length) {
+                for (const filter of filtersOfType) {
+                    qb = filterType.apply(qb, filter.args);
+                }
+            }
+        }
+
+        return qb.getMany();
     }
 
     /**

+ 0 - 21
packages/core/src/service/types/collection-messages.ts

@@ -1,21 +0,0 @@
-import { ID } from '@vendure/common/lib/shared-types';
-
-import { SerializedRequestContext } from '../../api/common/request-context';
-import { WorkerMessage } from '../../worker/types';
-
-export interface ProcessCollectionsResponse {
-    total: number;
-    completed: number;
-    duration: number;
-    collectionId: ID;
-    affectedVariantIds: ID[];
-}
-
-export class ApplyCollectionFiltersMessage extends WorkerMessage<
-    { collectionIds: ID[] },
-    ProcessCollectionsResponse
-> {
-    static readonly pattern = 'ApplyCollectionFilters';
-}
-
-export type ApplyCollectionFiltersJobData = { ctx: SerializedRequestContext; collectionIds: ID[] };