Browse Source

refactor: Update plugins to use new EventBus.ofType() API

Michael Bromley 6 years ago
parent
commit
a243ab0713

+ 5 - 0
packages/core/e2e/collection.e2e-spec.ts

@@ -818,6 +818,8 @@ describe('Collection resolver', () => {
                     },
                     },
                 });
                 });
 
 
+                await awaitRunningJobs(client);
+
                 const result = await client.query<
                 const result = await client.query<
                     GetCollectionProducts.Query,
                     GetCollectionProducts.Query,
                     GetCollectionProducts.Variables
                     GetCollectionProducts.Variables
@@ -846,6 +848,9 @@ describe('Collection resolver', () => {
                         ],
                         ],
                     },
                     },
                 );
                 );
+
+                await awaitRunningJobs(client);
+
                 const result = await client.query<
                 const result = await client.query<
                     GetCollectionProducts.Query,
                     GetCollectionProducts.Query,
                     GetCollectionProducts.Variables
                     GetCollectionProducts.Variables

+ 4 - 4
packages/core/e2e/shop-auth.e2e-spec.ts

@@ -739,16 +739,16 @@ describe('Updating email address without email verification', () => {
 class TestEmailPlugin implements OnModuleInit {
 class TestEmailPlugin implements OnModuleInit {
     constructor(private eventBus: EventBus) {}
     constructor(private eventBus: EventBus) {}
     onModuleInit() {
     onModuleInit() {
-        this.eventBus.subscribe(AccountRegistrationEvent, event => {
+        this.eventBus.ofType(AccountRegistrationEvent).subscribe(event => {
             sendEmailFn(event);
             sendEmailFn(event);
         });
         });
-        this.eventBus.subscribe(PasswordResetEvent, event => {
+        this.eventBus.ofType(PasswordResetEvent).subscribe(event => {
             sendEmailFn(event);
             sendEmailFn(event);
         });
         });
-        this.eventBus.subscribe(IdentifierChangeRequestEvent, event => {
+        this.eventBus.ofType(IdentifierChangeRequestEvent).subscribe(event => {
             sendEmailFn(event);
             sendEmailFn(event);
         });
         });
-        this.eventBus.subscribe(IdentifierChangeEvent, event => {
+        this.eventBus.ofType(IdentifierChangeEvent).subscribe(event => {
             sendEmailFn(event);
             sendEmailFn(event);
         });
         });
     }
     }

+ 2 - 0
packages/core/e2e/test-server.ts

@@ -64,6 +64,8 @@ export class TestServer {
      * Destroy the Vendure instance. Should be called in the `afterAll` function.
      * Destroy the Vendure instance. Should be called in the `afterAll` function.
      */
      */
     async destroy() {
     async destroy() {
+        // allow a grace period of any outstanding async tasks to complete
+        await new Promise(resolve => global.setTimeout(resolve, 500));
         await this.app.close();
         await this.app.close();
         if (this.worker) {
         if (this.worker) {
             await this.worker.close();
             await this.worker.close();

+ 3 - 0
packages/core/e2e/utils/await-running-jobs.ts

@@ -10,6 +10,9 @@ export async function awaitRunningJobs(adminClient: TestAdminClient, timeout: nu
     let runningJobs = 0;
     let runningJobs = 0;
     const startTime = +new Date();
     const startTime = +new Date();
     let timedOut = false;
     let timedOut = false;
+    // Allow a brief period for the jobs to start in the case that
+    // e.g. event debouncing is used before triggering the job.
+    await new Promise(resolve => setTimeout(resolve, 100));
     do {
     do {
         const { jobs } = await adminClient.query<GetRunningJobs.Query>(GET_RUNNING_JOBS);
         const { jobs } = await adminClient.query<GetRunningJobs.Query>(GET_RUNNING_JOBS);
         runningJobs = jobs.filter(job => job.state !== JobState.COMPLETED).length;
         runningJobs = jobs.filter(job => job.state !== JobState.COMPLETED).length;

+ 21 - 5
packages/core/src/plugin/default-search-plugin/default-search-plugin.ts

@@ -1,4 +1,6 @@
 import { SearchReindexResponse } from '@vendure/common/lib/generated-types';
 import { SearchReindexResponse } from '@vendure/common/lib/generated-types';
+import { ID } from '@vendure/common/lib/shared-types';
+import { buffer, debounceTime, filter, map } from 'rxjs/operators';
 
 
 import { idsAreEqual } from '../../common/utils';
 import { idsAreEqual } from '../../common/utils';
 import { ProductVariant } from '../../entity/product-variant/product-variant.entity';
 import { ProductVariant } from '../../entity/product-variant/product-variant.entity';
@@ -64,15 +66,29 @@ export class DefaultSearchPlugin implements OnVendureBootstrap {
 
 
     /** @internal */
     /** @internal */
     async onVendureBootstrap() {
     async onVendureBootstrap() {
-        this.eventBus.subscribe(CatalogModificationEvent, event => {
+        this.eventBus.ofType(CatalogModificationEvent).subscribe(event => {
             if (event.entity instanceof Product || event.entity instanceof ProductVariant) {
             if (event.entity instanceof Product || event.entity instanceof ProductVariant) {
                 return this.searchIndexService.updateProductOrVariant(event.ctx, event.entity).start();
                 return this.searchIndexService.updateProductOrVariant(event.ctx, event.entity).start();
             }
             }
         });
         });
-        this.eventBus.subscribe(CollectionModificationEvent, event => {
-            return this.searchIndexService.updateVariantsById(event.ctx, event.productVariantIds).start();
-        });
-        this.eventBus.subscribe(TaxRateModificationEvent, event => {
+
+        const collectionModification$ = this.eventBus.ofType(CollectionModificationEvent);
+        const closingNotifier$ = collectionModification$.pipe(debounceTime(50));
+        collectionModification$
+            .pipe(
+                buffer(closingNotifier$),
+                filter(events => 0 < events.length),
+                map(events => ({
+                    ctx: events[0].ctx,
+                    ids: events.reduce((ids, e) => [...ids, ...e.productVariantIds], [] as ID[]),
+                })),
+                filter(e => 0 < e.ids.length),
+            )
+            .subscribe(events => {
+                return this.searchIndexService.updateVariantsById(events.ctx, events.ids).start();
+            });
+
+        this.eventBus.ofType(TaxRateModificationEvent).subscribe(event => {
             const defaultTaxZone = event.ctx.channel.defaultTaxZone;
             const defaultTaxZone = event.ctx.channel.defaultTaxZone;
             if (defaultTaxZone && idsAreEqual(defaultTaxZone.id, event.taxRate.zone.id)) {
             if (defaultTaxZone && idsAreEqual(defaultTaxZone.id, event.taxRate.zone.id)) {
                 return this.searchIndexService.reindex(event.ctx).start();
                 return this.searchIndexService.reindex(event.ctx).start();

+ 2 - 2
packages/core/src/plugin/default-search-plugin/indexer/search-index.service.ts

@@ -27,7 +27,7 @@ export class SearchIndexService {
             name: 'reindex',
             name: 'reindex',
             singleInstance: true,
             singleInstance: true,
             work: async reporter => {
             work: async reporter => {
-                Logger.verbose(`sending reindex message`);
+                Logger.verbose(`sending ReindexMessage`);
                 this.workerService.send(new ReindexMessage({ ctx })).subscribe(this.createObserver(reporter));
                 this.workerService.send(new ReindexMessage({ ctx })).subscribe(this.createObserver(reporter));
             },
             },
         });
         });
@@ -66,7 +66,7 @@ export class SearchIndexService {
                 variantIds: ids,
                 variantIds: ids,
             },
             },
             work: reporter => {
             work: reporter => {
-                Logger.verbose(`sending reindex message`);
+                Logger.verbose(`sending UpdateVariantsByIdMessage`);
                 this.workerService
                 this.workerService
                     .send(new UpdateVariantsByIdMessage({ ctx, ids }))
                     .send(new UpdateVariantsByIdMessage({ ctx, ids }))
                     .subscribe(this.createObserver(reporter));
                     .subscribe(this.createObserver(reporter));

+ 15 - 2
packages/core/src/service/services/collection.service.ts

@@ -12,6 +12,8 @@ import {
 import { pick } from '@vendure/common/lib/pick';
 import { pick } from '@vendure/common/lib/pick';
 import { ROOT_COLLECTION_NAME } from '@vendure/common/lib/shared-constants';
 import { ROOT_COLLECTION_NAME } from '@vendure/common/lib/shared-constants';
 import { ID, PaginatedList } from '@vendure/common/lib/shared-types';
 import { ID, PaginatedList } from '@vendure/common/lib/shared-types';
+import { Subject } from 'rxjs';
+import { debounceTime } from 'rxjs/operators';
 import { Connection } from 'typeorm';
 import { Connection } from 'typeorm';
 
 
 import { RequestContext } from '../../api/common/request-context';
 import { RequestContext } from '../../api/common/request-context';
@@ -65,12 +67,22 @@ export class CollectionService implements OnModuleInit {
     ) {}
     ) {}
 
 
     onModuleInit() {
     onModuleInit() {
-        this.eventBus.subscribe(CatalogModificationEvent, async event => {
+        /*this.eventBus.subscribe(CatalogModificationEvent, async event => {
             const collections = await this.connection.getRepository(Collection).find({
             const collections = await this.connection.getRepository(Collection).find({
                 relations: ['productVariants'],
                 relations: ['productVariants'],
             });
             });
             this.applyCollectionFilters(event.ctx, collections);
             this.applyCollectionFilters(event.ctx, collections);
-        });
+        });*/
+
+        this.eventBus
+            .ofType(CatalogModificationEvent)
+            .pipe(debounceTime(50))
+            .subscribe(async event => {
+                const collections = await this.connection.getRepository(Collection).find({
+                    relations: ['productVariants'],
+                });
+                this.applyCollectionFilters(event.ctx, collections);
+            });
     }
     }
 
 
     async findAll(
     async findAll(
@@ -358,6 +370,7 @@ export class CollectionService implements OnModuleInit {
      */
      */
     private async applyCollectionFilters(ctx: RequestContext, collections: Collection[]): Promise<void> {
     private async applyCollectionFilters(ctx: RequestContext, collections: Collection[]): Promise<void> {
         const collectionIds = collections.map(c => c.id);
         const collectionIds = collections.map(c => c.id);
+        // Logger.info('applyCollectionFilters');
 
 
         const job = this.jobService.createJob({
         const job = this.jobService.createJob({
             name: 'apply-collection-filters',
             name: 'apply-collection-filters',

+ 21 - 7
packages/elasticsearch-plugin/src/plugin.ts

@@ -4,6 +4,7 @@ import {
     CollectionModificationEvent,
     CollectionModificationEvent,
     DeepRequired,
     DeepRequired,
     EventBus,
     EventBus,
+    ID,
     idsAreEqual,
     idsAreEqual,
     Logger,
     Logger,
     OnVendureBootstrap,
     OnVendureBootstrap,
@@ -16,6 +17,7 @@ import {
     VendurePlugin,
     VendurePlugin,
 } from '@vendure/core';
 } from '@vendure/core';
 import { gql } from 'apollo-server-core';
 import { gql } from 'apollo-server-core';
+import { buffer, debounceTime, filter, map } from 'rxjs/operators';
 
 
 import { ELASTIC_SEARCH_CLIENT, ELASTIC_SEARCH_OPTIONS, loggerCtx } from './constants';
 import { ELASTIC_SEARCH_CLIENT, ELASTIC_SEARCH_OPTIONS, loggerCtx } from './constants';
 import { ElasticsearchIndexService } from './elasticsearch-index.service';
 import { ElasticsearchIndexService } from './elasticsearch-index.service';
@@ -260,17 +262,29 @@ export class ElasticsearchPlugin implements OnVendureBootstrap, OnVendureClose {
 
 
         await this.elasticsearchService.createIndicesIfNotExists();
         await this.elasticsearchService.createIndicesIfNotExists();
 
 
-        this.eventBus.subscribe(CatalogModificationEvent, event => {
+        this.eventBus.ofType(CatalogModificationEvent).subscribe(event => {
             if (event.entity instanceof Product || event.entity instanceof ProductVariant) {
             if (event.entity instanceof Product || event.entity instanceof ProductVariant) {
                 return this.elasticsearchIndexService.updateProductOrVariant(event.ctx, event.entity).start();
                 return this.elasticsearchIndexService.updateProductOrVariant(event.ctx, event.entity).start();
             }
             }
         });
         });
-        this.eventBus.subscribe(CollectionModificationEvent, event => {
-            return this.elasticsearchIndexService
-                .updateVariantsById(event.ctx, event.productVariantIds)
-                .start();
-        });
-        this.eventBus.subscribe(TaxRateModificationEvent, event => {
+
+        const collectionModification$ = this.eventBus.ofType(CollectionModificationEvent);
+        const closingNotifier$ = collectionModification$.pipe(debounceTime(50));
+        collectionModification$
+            .pipe(
+                buffer(closingNotifier$),
+                filter(events => 0 < events.length),
+                map(events => ({
+                    ctx: events[0].ctx,
+                    ids: events.reduce((ids, e) => [...ids, ...e.productVariantIds], [] as ID[]),
+                })),
+                filter(e => 0 < e.ids.length),
+            )
+            .subscribe(events => {
+                return this.elasticsearchIndexService.updateVariantsById(events.ctx, events.ids).start();
+            });
+
+        this.eventBus.ofType(TaxRateModificationEvent).subscribe(event => {
             const defaultTaxZone = event.ctx.channel.defaultTaxZone;
             const defaultTaxZone = event.ctx.channel.defaultTaxZone;
             if (defaultTaxZone && idsAreEqual(defaultTaxZone.id, event.taxRate.zone.id)) {
             if (defaultTaxZone && idsAreEqual(defaultTaxZone.id, event.taxRate.zone.id)) {
                 return this.elasticsearchService.reindex(event.ctx);
                 return this.elasticsearchService.reindex(event.ctx);

+ 1 - 1
packages/email-plugin/src/plugin.ts

@@ -210,7 +210,7 @@ export class EmailPlugin implements OnVendureBootstrap, OnVendureClose {
 
 
     private async setupEventSubscribers() {
     private async setupEventSubscribers() {
         for (const handler of EmailPlugin.options.handlers) {
         for (const handler of EmailPlugin.options.handlers) {
-            this.eventBus.subscribe(handler.event, event => {
+            this.eventBus.ofType(handler.event).subscribe(event => {
                 return this.handleEvent(handler, event);
                 return this.handleEvent(handler, event);
             });
             });
         }
         }