Browse Source

perf(core): Run job queues in the worker process

* feat(core) Run job queues in the worker process

Moves the responsibility to get jobs out of the queue and run them to the worker process
Job queue and event listeners should be created in server and worker.

* feat(core) Add config to control which queues will be started

This will allow to have separate worker process for each queue

* refactor(core) Refactor job queue to remove logic into the job strategy

* fix(core) Return type of add should be same as arg

* fix(core) The job id isnt related to the id strategy used in the database

* feat(core) create InjectableJobQueueStrategy to avoid crashes when job queues are started in init

* feat(pub-sub-plugin) Create a job queue strategy based on google pub sub

use InjectableJobQueueStrategy in PubSub

* chore(scripts) publish can be run from another dir

* refactor(core) remove health check for microservice

* refactor(core) remove hanging promise in tests

* refactor(core) add missing awaits

* fix(core) off for job listener and use ticks

* introduce QueueNameProcessStorage

* use QueueNameProcessStorage in pub sub

* refactor(core) removes circular dependancy

* chore(pubsub-plugin) Add ci to package
Fred Cox 4 years ago
parent
commit
f05210a341
48 changed files with 1434 additions and 535 deletions
  1. 8 9
      packages/core/e2e/fixtures/test-plugins/with-job-queue.ts
  2. 3 0
      packages/core/e2e/job-queue.e2e-spec.ts
  3. 2 2
      packages/core/src/api/config/generate-permissions.ts
  4. 1 1
      packages/core/src/api/resolvers/admin/global-settings.resolver.ts
  5. 46 13
      packages/core/src/api/resolvers/admin/job.resolver.ts
  6. 6 1
      packages/core/src/common/constants.ts
  7. 0 7
      packages/core/src/common/permission-definition.ts
  8. 1 1
      packages/core/src/config/default-config.ts
  9. 2 0
      packages/core/src/config/index.ts
  10. 55 0
      packages/core/src/config/job-queue/inspectable-job-queue-strategy.ts
  11. 14 38
      packages/core/src/config/job-queue/job-queue-strategy.ts
  12. 4 5
      packages/core/src/config/vendure-config.ts
  13. 1 9
      packages/core/src/health-check/health-check.module.ts
  14. 9 6
      packages/core/src/job-queue/in-memory-job-queue-strategy.ts
  15. 3 0
      packages/core/src/job-queue/index.ts
  16. 43 0
      packages/core/src/job-queue/injectable-job-queue-strategy.ts
  17. 141 81
      packages/core/src/job-queue/job-queue.service.spec.ts
  18. 24 74
      packages/core/src/job-queue/job-queue.service.ts
  19. 11 90
      packages/core/src/job-queue/job-queue.ts
  20. 9 11
      packages/core/src/job-queue/job.ts
  21. 188 0
      packages/core/src/job-queue/polling-job-queue-strategy.ts
  22. 38 0
      packages/core/src/job-queue/queue-name-process-storage.ts
  23. 2 2
      packages/core/src/job-queue/testing-job-queue-strategy.ts
  24. 3 9
      packages/core/src/job-queue/types.ts
  25. 21 20
      packages/core/src/plugin/default-job-queue-plugin/sql-job-queue-strategy.ts
  26. 4 3
      packages/core/src/plugin/default-search-plugin/default-search-plugin.ts
  27. 49 57
      packages/core/src/plugin/default-search-plugin/indexer/search-index.service.ts
  28. 25 24
      packages/core/src/service/services/collection.service.ts
  29. 1 1
      packages/core/src/service/services/role.service.ts
  30. 47 55
      packages/elasticsearch-plugin/src/elasticsearch-index.service.ts
  31. 3 3
      packages/elasticsearch-plugin/src/plugin.ts
  32. 1 1
      packages/email-plugin/src/plugin.spec.ts
  33. 13 7
      packages/email-plugin/src/plugin.ts
  34. 4 0
      packages/pub-sub-plugin/.gitignore
  35. 7 0
      packages/pub-sub-plugin/README.md
  36. 2 0
      packages/pub-sub-plugin/index.ts
  37. 25 0
      packages/pub-sub-plugin/jest.config.js
  38. 29 0
      packages/pub-sub-plugin/package.json
  39. 2 0
      packages/pub-sub-plugin/src/constants.ts
  40. 13 0
      packages/pub-sub-plugin/src/options.ts
  41. 26 0
      packages/pub-sub-plugin/src/plugin.ts
  42. 70 0
      packages/pub-sub-plugin/src/pub-sub-job-queue-strategy.spec.ts
  43. 146 0
      packages/pub-sub-plugin/src/pub-sub-job-queue-strategy.ts
  44. 9 0
      packages/pub-sub-plugin/tsconfig.build.json
  45. 10 0
      packages/pub-sub-plugin/tsconfig.json
  46. 1 1
      packages/testing/src/test-server.ts
  47. 5 0
      scripts/publish-to-verdaccio.sh
  48. 307 4
      yarn.lock

+ 8 - 9
packages/core/e2e/fixtures/test-plugins/with-job-queue.ts

@@ -1,6 +1,7 @@
 import { Controller, Get, OnModuleInit } from '@nestjs/common';
 import { JobQueue, JobQueueService, PluginCommonModule, VendurePlugin } from '@vendure/core';
 import { Subject } from 'rxjs';
+import { take } from 'rxjs/operators';
 
 @Controller('run-job')
 class TestController implements OnModuleInit {
@@ -11,22 +12,20 @@ class TestController implements OnModuleInit {
     onModuleInit(): any {
         this.queue = this.jobQueueService.createQueue({
             name: 'test',
-            concurrency: 1,
             process: job => {
-                PluginWithJobQueue.jobSubject.subscribe({
-                    next: () => {
+                return PluginWithJobQueue.jobSubject
+                    .pipe(take(1))
+                    .toPromise()
+                    .then(() => {
                         PluginWithJobQueue.jobHasDoneWork = true;
-                        job.complete();
-                    },
-                    error: err => job.fail(err),
-                });
+                    });
             },
         });
     }
 
     @Get()
-    runJob() {
-        this.queue.add({});
+    async runJob() {
+        await this.queue.add({});
         return true;
     }
 }

+ 3 - 0
packages/core/e2e/job-queue.e2e-spec.ts

@@ -24,6 +24,9 @@ describe('JobQueue', () => {
     const { server, adminClient } = createTestEnvironment(
         mergeConfig(testConfig, {
             plugins: [DefaultJobQueuePlugin, PluginWithJobQueue],
+            workerOptions: {
+                runInMainProcess: true,
+            },
         }),
     );
 

+ 2 - 2
packages/core/src/api/config/generate-permissions.ts

@@ -2,8 +2,8 @@ import { stitchSchemas } from '@graphql-tools/stitch';
 import { GraphQLEnumType, GraphQLInputObjectType, GraphQLSchema } from 'graphql';
 import { GraphQLEnumValueConfigMap } from 'graphql/type/definition';
 
-import { DEFAULT_PERMISSIONS } from '../../common/constants';
-import { getAllPermissionsMetadata, PermissionDefinition } from '../../common/permission-definition';
+import { getAllPermissionsMetadata } from '../../common/constants';
+import { PermissionDefinition } from '../../common/permission-definition';
 
 const PERMISSION_DESCRIPTION = `@description
 Permissions for administrators and customers. Used to control access to

+ 1 - 1
packages/core/src/api/resolvers/admin/global-settings.resolver.ts

@@ -19,10 +19,10 @@ import {
     TypeNode,
 } from 'graphql';
 
+import { getAllPermissionsMetadata } from '../../../common/constants';
 import { ErrorResultUnion } from '../../../common/error/error-result';
 import { UserInputError } from '../../../common/error/errors';
 import { ChannelDefaultLanguageError } from '../../../common/error/generated-graphql-admin-errors';
-import { getAllPermissionsMetadata } from '../../../common/permission-definition';
 import { ConfigService } from '../../../config/config.service';
 import {
     CustomFieldConfig,

+ 46 - 13
packages/core/src/api/resolvers/admin/job.resolver.ts

@@ -1,5 +1,6 @@
 import { Args, Mutation, Query, Resolver } from '@nestjs/graphql';
 import {
+    JobQueue,
     MutationCancelJobArgs,
     MutationRemoveSettledJobsArgs,
     Permission,
@@ -8,46 +9,78 @@ import {
     QueryJobsByIdArgs,
 } from '@vendure/common/lib/generated-types';
 
-import { JobQueueService } from '../../../job-queue/job-queue.service';
+import { ConfigService, InspectableJobQueueStrategy, isInspectableJobQueueStrategy } from '../../../config';
+import { JobQueueService } from '../../../job-queue';
 import { Allow } from '../../decorators/allow.decorator';
 
 @Resolver()
 export class JobResolver {
-    constructor(private jobService: JobQueueService) {}
+    constructor(private configService: ConfigService, private jobService: JobQueueService) {}
 
     @Query()
     @Allow(Permission.ReadSettings)
-    job(@Args() args: QueryJobArgs) {
-        return this.jobService.getJob(args.jobId);
+    async job(@Args() args: QueryJobArgs) {
+        const strategy = this.requireInspectableJobQueueStrategy();
+        if (!strategy) {
+            return;
+        }
+        return strategy.findOne(args.jobId);
     }
 
     @Query()
     @Allow(Permission.ReadSettings)
-    jobs(@Args() args: QueryJobsArgs) {
-        return this.jobService.getJobs(args.options || undefined);
+    async jobs(@Args() args: QueryJobsArgs) {
+        const strategy = this.requireInspectableJobQueueStrategy();
+        if (!strategy) {
+            return {
+                items: [],
+                totalItems: 0,
+            };
+        }
+        return strategy.findMany(args.options || undefined);
     }
 
     @Query()
     @Allow(Permission.ReadSettings)
-    jobsById(@Args() args: QueryJobsByIdArgs) {
-        return this.jobService.getJobsById(args.jobIds || undefined);
+    async jobsById(@Args() args: QueryJobsByIdArgs) {
+        const strategy = this.requireInspectableJobQueueStrategy();
+        if (!strategy) {
+            return [];
+        }
+        return strategy.findManyById(args.jobIds || undefined);
     }
 
     @Query()
     @Allow(Permission.ReadSettings)
-    jobQueues() {
+    jobQueues(): JobQueue[] {
         return this.jobService.getJobQueues();
     }
 
     @Mutation()
     @Allow(Permission.DeleteSettings)
-    removeSettledJobs(@Args() args: MutationRemoveSettledJobsArgs) {
-        return this.jobService.removeSettledJobs(args.queueNames || [], args.olderThan);
+    async removeSettledJobs(@Args() args: MutationRemoveSettledJobsArgs) {
+        const strategy = this.requireInspectableJobQueueStrategy();
+        if (!strategy) {
+            return 0;
+        }
+        return strategy.removeSettledJobs(args.queueNames || [], args.olderThan);
     }
 
     @Mutation()
     @Allow(Permission.DeleteSettings)
-    cancelJob(@Args() args: MutationCancelJobArgs) {
-        return this.jobService.cancelJob(args.jobId);
+    async cancelJob(@Args() args: MutationCancelJobArgs) {
+        const strategy = this.requireInspectableJobQueueStrategy();
+        if (!strategy) {
+            return;
+        }
+        return strategy.cancelJob(args.jobId);
+    }
+
+    private requireInspectableJobQueueStrategy(): InspectableJobQueueStrategy | undefined {
+        if (!isInspectableJobQueueStrategy(this.configService.jobQueueOptions.jobQueueStrategy)) {
+            return;
+        }
+
+        return this.configService.jobQueueOptions.jobQueueStrategy;
     }
 }

+ 6 - 1
packages/core/src/common/constants.ts

@@ -1,6 +1,6 @@
 import { LanguageCode } from '@vendure/common/lib/generated-types';
 
-import { CrudPermissionDefinition, PermissionDefinition } from './permission-definition';
+import { CrudPermissionDefinition, PermissionDefinition, PermissionMetadata } from './permission-definition';
 
 /**
  * This value should be rarely used - only in those contexts where we have no access to the
@@ -41,3 +41,8 @@ export const DEFAULT_PERMISSIONS: PermissionDefinition[] = [
     new CrudPermissionDefinition('Promotion'),
     new CrudPermissionDefinition('Settings'),
 ];
+
+export function getAllPermissionsMetadata(customPermissions: PermissionDefinition[]): PermissionMetadata[] {
+    const allPermissions = [...DEFAULT_PERMISSIONS, ...customPermissions];
+    return allPermissions.reduce((all, def) => [...all, ...def.getMetadata()], [] as PermissionMetadata[]);
+}

+ 0 - 7
packages/core/src/common/permission-definition.ts

@@ -1,7 +1,5 @@
 import { Permission } from '@vendure/common/lib/generated-types';
 
-import { DEFAULT_PERMISSIONS } from './constants';
-
 /**
  * @description
  * Configures a {@link PermissionDefinition}
@@ -196,8 +194,3 @@ export class CrudPermissionDefinition extends PermissionDefinition {
         return `Delete${this.config.name}` as Permission;
     }
 }
-
-export function getAllPermissionsMetadata(customPermissions: PermissionDefinition[]): PermissionMetadata[] {
-    const allPermissions = [...DEFAULT_PERMISSIONS, ...customPermissions];
-    return allPermissions.reduce((all, def) => [...all, ...def.getMetadata()], [] as PermissionMetadata[]);
-}

+ 1 - 1
packages/core/src/config/default-config.ts

@@ -139,7 +139,7 @@ export const defaultConfig: RuntimeVendureConfig = {
     },
     jobQueueOptions: {
         jobQueueStrategy: new InMemoryJobQueueStrategy(),
-        pollInterval: 200,
+        activeQueues: [],
     },
     customFields: {
         Address: [],

+ 2 - 0
packages/core/src/config/index.ts

@@ -16,6 +16,8 @@ export * from './entity-id-strategy/uuid-id-strategy';
 export * from './fulfillment/custom-fulfillment-process';
 export * from './fulfillment/fulfillment-handler';
 export * from './fulfillment/manual-fulfillment-handler';
+export * from './job-queue/inspectable-job-queue-strategy';
+export * from './job-queue/job-queue-strategy';
 export * from './logger/default-logger';
 export * from './logger/noop-logger';
 export * from './logger/vendure-logger';

+ 55 - 0
packages/core/src/config/job-queue/inspectable-job-queue-strategy.ts

@@ -0,0 +1,55 @@
+import { JobListOptions } from '@vendure/common/lib/generated-types';
+import { ID, PaginatedList } from '@vendure/common/lib/shared-types';
+
+import { Job } from '../../job-queue';
+
+import { JobQueueStrategy } from './job-queue-strategy';
+
+/**
+ * @description
+ * Defines a job queue strategy that can be inspected using the default admin ui
+ *
+ * @docsCategory JobQueue
+ */
+export interface InspectableJobQueueStrategy extends JobQueueStrategy {
+    /**
+     * @description
+     * Returns a job by its id.
+     */
+    findOne(id: ID): Promise<Job | undefined>;
+
+    /**
+     * @description
+     * Returns a list of jobs according to the specified options.
+     */
+    findMany(options?: JobListOptions): Promise<PaginatedList<Job>>;
+
+    /**
+     * @description
+     * Returns an array of jobs for the given ids.
+     */
+    findManyById(ids: ID[]): Promise<Job[]>;
+
+    /**
+     * @description
+     * Remove all settled jobs in the specified queues older than the given date.
+     * If no queueName is passed, all queues will be considered. If no olderThan
+     * date is passed, all jobs older than the current time will be removed.
+     *
+     * Returns a promise of the number of jobs removed.
+     */
+    removeSettledJobs(queueNames?: string[], olderThan?: Date): Promise<number>;
+
+    cancelJob(jobId: ID): Promise<Job | undefined>;
+}
+
+export function isInspectableJobQueueStrategy(
+    strategy: JobQueueStrategy,
+): strategy is InspectableJobQueueStrategy {
+    return (
+        (strategy as InspectableJobQueueStrategy).findOne !== undefined &&
+        (strategy as InspectableJobQueueStrategy).findMany !== undefined &&
+        (strategy as InspectableJobQueueStrategy).findManyById !== undefined &&
+        (strategy as InspectableJobQueueStrategy).removeSettledJobs !== undefined
+    );
+}

+ 14 - 38
packages/core/src/config/job-queue/job-queue-strategy.ts

@@ -1,9 +1,9 @@
-import { ModuleRef } from '@nestjs/core';
 import { JobListOptions } from '@vendure/common/lib/generated-types';
 import { ID, PaginatedList } from '@vendure/common/lib/shared-types';
 
-import { InjectableStrategy } from '../../common/types/injectable-strategy';
-import { Job } from '../../job-queue/job';
+import { InjectableStrategy } from '../../common';
+import { JobData } from '../../job-queue';
+import { Job } from '../../job-queue';
 
 /**
  * @description
@@ -18,47 +18,23 @@ export interface JobQueueStrategy extends InjectableStrategy {
      * @description
      * Add a new job to the queue.
      */
-    add(job: Job): Promise<Job>;
+    add<Data extends JobData<Data> = {}>(job: Job<Data>): Promise<Job<Data>>;
 
     /**
      * @description
-     * Should return the next job in the given queue. The implementation is
-     * responsible for returning the correct job according to the time of
-     * creation.
+     * Start the job queue
      */
-    next(queueName: string): Promise<Job | undefined>;
+    start<Data extends JobData<Data> = {}>(
+        queueName: string,
+        process: (job: Job<Data>) => Promise<any>,
+    ): void;
 
     /**
      * @description
-     * Update the job details in the store.
+     * Stops a queue from running. Its not guaranteed to stop immediately.
      */
-    update(job: Job): Promise<void>;
-
-    /**
-     * @description
-     * Returns a job by its id.
-     */
-    findOne(id: ID): Promise<Job | undefined>;
-
-    /**
-     * @description
-     * Returns a list of jobs according to the specified options.
-     */
-    findMany(options?: JobListOptions): Promise<PaginatedList<Job>>;
-
-    /**
-     * @description
-     * Returns an array of jobs for the given ids.
-     */
-    findManyById(ids: ID[]): Promise<Job[]>;
-
-    /**
-     * @description
-     * Remove all settled jobs in the specified queues older than the given date.
-     * If no queueName is passed, all queues will be considered. If no olderThan
-     * date is passed, all jobs older than the current time will be removed.
-     *
-     * Returns a promise of the number of jobs removed.
-     */
-    removeSettledJobs(queueNames?: string[], olderThan?: Date): Promise<number>;
+    stop<Data extends JobData<Data> = {}>(
+        queueName: string,
+        process: (job: Job<Data>) => Promise<any>,
+    ): Promise<void>;
 }

+ 4 - 5
packages/core/src/config/vendure-config.ts

@@ -731,12 +731,11 @@ export interface JobQueueOptions {
     jobQueueStrategy?: JobQueueStrategy;
     /**
      * @description
-     * Defines the interval in ms used by the {@link JobQueueService} to poll for new
-     * jobs in the queue to process.
-     *
-     * @default 200
+     * Defines the queues that will run in this process.
+     * This can be used to configure only certain queues to run in this process.
+     * If its empty all queues will be run
      */
-    pollInterval?: number;
+    activeQueues?: string[];
 }
 
 /**

+ 1 - 9
packages/core/src/health-check/health-check.module.ts

@@ -18,16 +18,8 @@ export class HealthCheckModule {
         private configService: ConfigService,
         private healthCheckRegistryService: HealthCheckRegistryService,
         private typeOrm: TypeOrmHealthIndicator,
-        private microservice: MicroserviceHealthIndicator,
     ) {
         // Register the default health checks for database and worker
-        this.healthCheckRegistryService.registerIndicatorFunction([
-            () => this.typeOrm.pingCheck('database'),
-            () =>
-                this.microservice.pingCheck('worker', {
-                    transport: this.configService.workerOptions.transport,
-                    options: this.configService.workerOptions.options,
-                }),
-        ]);
+        this.healthCheckRegistryService.registerIndicatorFunction([() => this.typeOrm.pingCheck('database')]);
     }
 }

+ 9 - 6
packages/core/src/job-queue/in-memory-job-queue-strategy.ts

@@ -10,11 +10,12 @@ import {
 import { ID, PaginatedList } from '@vendure/common/lib/shared-types';
 import { notNullOrUndefined } from '@vendure/common/lib/shared-utils';
 
-import { generatePublicId } from '../common/generate-public-id';
-import { JobQueueStrategy } from '../config/job-queue/job-queue-strategy';
-import { Logger } from '../config/logger/vendure-logger';
+import { Injector } from '../common';
+import { InspectableJobQueueStrategy } from '../config';
 
 import { Job } from './job';
+import { PollingJobQueueStrategy } from './polling-job-queue-strategy';
+import { JobData } from './types';
 
 /**
  * @description
@@ -25,21 +26,23 @@ import { Job } from './job';
  *
  * @docsCategory JobQueue
  */
-export class InMemoryJobQueueStrategy implements JobQueueStrategy {
+export class InMemoryJobQueueStrategy extends PollingJobQueueStrategy implements InspectableJobQueueStrategy {
     protected jobs = new Map<ID, Job>();
     protected unsettledJobs: { [queueName: string]: Job[] } = {};
     private timer: any;
     private evictJobsAfterMs = 1000 * 60 * 60 * 2; // 2 hours
 
-    init() {
+    init(injector: Injector) {
+        super.init(injector);
         this.timer = setTimeout(this.evictSettledJobs, this.evictJobsAfterMs);
     }
 
     destroy() {
+        super.destroy();
         clearTimeout(this.timer);
     }
 
-    async add(job: Job): Promise<Job> {
+    async add<Data extends JobData<Data> = {}>(job: Job<Data>): Promise<Job<Data>> {
         if (!job.id) {
             (job as any).id = Math.floor(Math.random() * 1000000000)
                 .toString()

+ 3 - 0
packages/core/src/job-queue/index.ts

@@ -1,4 +1,7 @@
+export * from './injectable-job-queue-strategy';
 export * from './job';
 export * from './job-queue';
 export * from './job-queue.service';
+export * from './polling-job-queue-strategy';
+export * from './queue-name-process-storage';
 export * from './types';

+ 43 - 0
packages/core/src/job-queue/injectable-job-queue-strategy.ts

@@ -0,0 +1,43 @@
+import { Injector } from '../common';
+
+import { Job } from './job';
+import { JobData } from './types';
+
+type ProcessFunc<Data extends JobData<Data> = {}> = (job: Job<Data>) => Promise<any>;
+
+/**
+ * @description
+ * This is a helper class for implementations of {@link JobQueueStrategy} that need to
+ * have init called before they can start a queue.
+ * It simply collects calls to {@link JobQueueStrategy} `start()` and calls `start()` again after init.
+ * When using the class `start()` should start with this snippet
+ *
+ * ```
+ * Typescript
+ * if (!this.hasInitialized) {
+ *   this.started.set(queueName, process);
+ *   return;
+ * }
+ * ```
+ */
+export abstract class InjectableJobQueueStrategy {
+    protected started = new Map<string, ProcessFunc<any>>();
+    protected hasInitialized = false;
+
+    init(injector: Injector) {
+        this.hasInitialized = true;
+        for (const [queueName, process] of this.started) {
+            this.start(queueName, process);
+        }
+        this.started.clear();
+    }
+
+    destroy() {
+        this.hasInitialized = false;
+    }
+
+    abstract start<Data extends JobData<Data> = {}>(
+        queueName: string,
+        process: (job: Job<Data>) => Promise<any>,
+    ): void;
+}

+ 141 - 81
packages/core/src/job-queue/job-queue.service.spec.ts

@@ -1,10 +1,14 @@
 /* tslint:disable:no-non-null-assertion */
+import { Injectable, OnApplicationBootstrap, OnModuleDestroy } from '@nestjs/common';
+import { ModuleRef } from '@nestjs/core';
 import { Test, TestingModule } from '@nestjs/testing';
 import { JobState } from '@vendure/common/lib/generated-types';
 import { Subject } from 'rxjs';
+import { take } from 'rxjs/operators';
 
+import { Injector } from '../common';
 import { ConfigService } from '../config/config.service';
-import { ProcessContext, ServerProcessContext } from '../process-context/process-context';
+import { ProcessContext, WorkerProcessContext } from '../process-context/process-context';
 
 import { Job } from './job';
 import { JobQueueService } from './job-queue.service';
@@ -20,7 +24,7 @@ describe('JobQueueService', () => {
         module = await Test.createTestingModule({
             providers: [
                 { provide: ConfigService, useClass: MockConfigService },
-                { provide: ProcessContext, useClass: ServerProcessContext },
+                { provide: ProcessContext, useClass: WorkerProcessContext },
                 JobQueueService,
             ],
         }).compile();
@@ -33,29 +37,27 @@ describe('JobQueueService', () => {
         await module.close();
     });
 
-    it('data is passed into job', (cb) => {
+    it('data is passed into job', async () => {
+        const subject = new Subject<string>();
+        const subNext = subject.pipe(take(1)).toPromise();
         const testQueue = jobQueueService.createQueue<string>({
             name: 'test',
-            concurrency: 1,
-            process: (job) => {
-                job.complete();
-                expect(job.data).toBe('hello');
-                cb();
+            process: async job => {
+                subject.next(job.data);
             },
         });
 
-        testQueue.add('hello');
+        await testQueue.add('hello');
+        const data = await subNext;
+        expect(data).toBe('hello');
     });
 
     it('job marked as complete', async () => {
-        const subject = new Subject();
+        const subject = new Subject<string>();
         const testQueue = jobQueueService.createQueue<string>({
             name: 'test',
-            concurrency: 1,
-            process: (job) => {
-                subject.subscribe(() => {
-                    job.complete('yay');
-                });
+            process: job => {
+                return subject.toPromise();
             },
         });
 
@@ -65,22 +67,22 @@ describe('JobQueueService', () => {
         await tick(queuePollInterval);
         expect(testJob.state).toBe(JobState.RUNNING);
 
-        subject.next();
+        subject.next('yay');
+        subject.complete();
+
+        await tick();
+
         expect(testJob.state).toBe(JobState.COMPLETED);
         expect(testJob.result).toBe('yay');
-
-        subject.complete();
     });
 
-    it('job marked as failed when .fail() called', async () => {
+    it('job marked as failed when exception thrown', async () => {
         const subject = new Subject();
         const testQueue = jobQueueService.createQueue<string>({
             name: 'test',
-            concurrency: 1,
-            process: (job) => {
-                subject.subscribe(() => {
-                    job.fail('uh oh');
-                });
+            process: async job => {
+                const result = await subject.toPromise();
+                throw result;
             },
         });
 
@@ -90,41 +92,19 @@ describe('JobQueueService', () => {
         await tick(queuePollInterval);
         expect(testJob.state).toBe(JobState.RUNNING);
 
-        subject.next();
-        expect(testJob.state).toBe(JobState.FAILED);
-        expect(testJob.error).toBe('uh oh');
-
+        subject.next('uh oh');
         subject.complete();
-    });
-
-    it('job marked as failed when sync error thrown', async () => {
-        const subject = new Subject();
-        const err = new Error('something bad happened');
-        const testQueue = jobQueueService.createQueue<string>({
-            name: 'test',
-            concurrency: 1,
-            process: (job) => {
-                throw err;
-            },
-        });
+        await tick();
 
-        const testJob = await testQueue.add('hello');
-        expect(testJob.state).toBe(JobState.PENDING);
-
-        await tick(queuePollInterval);
         expect(testJob.state).toBe(JobState.FAILED);
-        expect(testJob.error).toBe(err.message);
-
-        subject.complete();
+        expect(testJob.error).toBe('uh oh');
     });
 
     it('job marked as failed when async error thrown', async () => {
-        const subject = new Subject();
         const err = new Error('something bad happened');
         const testQueue = jobQueueService.createQueue<string>({
             name: 'test',
-            concurrency: 1,
-            process: async (job) => {
+            process: async job => {
                 throw err;
             },
         });
@@ -135,19 +115,14 @@ describe('JobQueueService', () => {
         await tick(queuePollInterval);
         expect(testJob.state).toBe(JobState.FAILED);
         expect(testJob.error).toBe(err.message);
-
-        subject.complete();
     });
 
     it('jobs processed in FIFO queue', async () => {
         const subject = new Subject();
         const testQueue = jobQueueService.createQueue<string>({
             name: 'test',
-            concurrency: 1,
-            process: (job) => {
-                subject.subscribe(() => {
-                    job.complete();
-                });
+            process: job => {
+                return subject.pipe(take(1)).toPromise();
             },
         });
 
@@ -162,32 +137,37 @@ describe('JobQueueService', () => {
         expect(getStates()).toEqual([JobState.RUNNING, JobState.PENDING, JobState.PENDING]);
 
         subject.next();
+        await tick();
         expect(getStates()).toEqual([JobState.COMPLETED, JobState.PENDING, JobState.PENDING]);
 
         await tick(queuePollInterval);
         expect(getStates()).toEqual([JobState.COMPLETED, JobState.RUNNING, JobState.PENDING]);
 
         subject.next();
+        await tick();
         expect(getStates()).toEqual([JobState.COMPLETED, JobState.COMPLETED, JobState.PENDING]);
 
         await tick(queuePollInterval);
         expect(getStates()).toEqual([JobState.COMPLETED, JobState.COMPLETED, JobState.RUNNING]);
 
         subject.next();
+        await tick();
         expect(getStates()).toEqual([JobState.COMPLETED, JobState.COMPLETED, JobState.COMPLETED]);
 
         subject.complete();
     });
 
     it('with concurrency', async () => {
+        const testingJobQueueStrategy = module.get(ConfigService).jobQueueOptions
+            .jobQueueStrategy as TestingJobQueueStrategy;
+
+        testingJobQueueStrategy.concurrency = 2;
+
         const subject = new Subject();
         const testQueue = jobQueueService.createQueue<string>({
             name: 'test',
-            concurrency: 2,
-            process: (job) => {
-                subject.subscribe(() => {
-                    job.complete();
-                });
+            process: job => {
+                return subject.pipe(take(1)).toPromise();
             },
         });
 
@@ -202,12 +182,14 @@ describe('JobQueueService', () => {
         expect(getStates()).toEqual([JobState.RUNNING, JobState.RUNNING, JobState.PENDING]);
 
         subject.next();
+        await tick();
         expect(getStates()).toEqual([JobState.COMPLETED, JobState.COMPLETED, JobState.PENDING]);
 
         await tick(queuePollInterval);
         expect(getStates()).toEqual([JobState.COMPLETED, JobState.COMPLETED, JobState.RUNNING]);
 
         subject.next();
+        await tick();
         expect(getStates()).toEqual([JobState.COMPLETED, JobState.COMPLETED, JobState.COMPLETED]);
 
         subject.complete();
@@ -217,7 +199,7 @@ describe('JobQueueService', () => {
         const testingJobQueueStrategy = module.get(ConfigService).jobQueueOptions
             .jobQueueStrategy as TestingJobQueueStrategy;
 
-        testingJobQueueStrategy.prePopulate([
+        await testingJobQueueStrategy.prePopulate([
             new Job<any>({
                 queueName: 'test',
                 data: {},
@@ -232,14 +214,15 @@ describe('JobQueueService', () => {
 
         const testQueue = jobQueueService.createQueue<string>({
             name: 'test',
-            concurrency: 1,
-            process: (job) => {
-                job.complete();
+            process: async job => {
+                return;
             },
         });
 
-        const job1 = await jobQueueService.getJob('job-1');
-        const job2 = await jobQueueService.getJob('job-2');
+        await tick();
+
+        const job1 = await testingJobQueueStrategy.findOne('job-1');
+        const job2 = await testingJobQueueStrategy.findOne('job-2');
         expect(job1?.state).toBe(JobState.COMPLETED);
         expect(job2?.state).toBe(JobState.PENDING);
 
@@ -251,9 +234,15 @@ describe('JobQueueService', () => {
         const subject = new Subject<boolean>();
         const testQueue = jobQueueService.createQueue<string>({
             name: 'test',
-            concurrency: 1,
-            process: (job) => {
-                subject.subscribe((success) => (success ? job.complete() : job.fail()));
+            process: job => {
+                return subject
+                    .pipe(take(1))
+                    .toPromise()
+                    .then(success => {
+                        if (!success) {
+                            throw new Error();
+                        }
+                    });
             },
         });
 
@@ -264,27 +253,32 @@ describe('JobQueueService', () => {
         expect(testJob.isSettled).toBe(false);
 
         subject.next(false);
+        await tick();
         expect(testJob.state).toBe(JobState.RETRYING);
         expect(testJob.isSettled).toBe(false);
 
         await tick(queuePollInterval);
         subject.next(false);
+        await tick();
         expect(testJob.state).toBe(JobState.RETRYING);
         expect(testJob.isSettled).toBe(false);
 
         await tick(queuePollInterval);
         subject.next(false);
+        await tick();
         expect(testJob.state).toBe(JobState.FAILED);
         expect(testJob.isSettled).toBe(true);
     });
 
     it('sets long-running jobs to pending on destroy', async () => {
+        const testingJobQueueStrategy = module.get(ConfigService).jobQueueOptions
+            .jobQueueStrategy as TestingJobQueueStrategy;
+
         const subject = new Subject<boolean>();
         const testQueue = jobQueueService.createQueue<string>({
             name: 'test',
-            concurrency: 1,
-            process: (job) => {
-                subject.subscribe((success) => (success ? job.complete() : job.fail()));
+            process: job => {
+                return subject.pipe(take(1)).toPromise();
             },
         });
 
@@ -292,21 +286,87 @@ describe('JobQueueService', () => {
 
         await tick(queuePollInterval);
 
-        expect((await jobQueueService.getJob(testJob.id!))?.state).toBe(JobState.RUNNING);
+        expect((await testingJobQueueStrategy.findOne(testJob.id!))?.state).toBe(JobState.RUNNING);
 
         await testQueue.destroy();
 
-        expect((await jobQueueService.getJob(testJob.id!))?.state).toBe(JobState.PENDING);
+        expect((await testingJobQueueStrategy.findOne(testJob.id!))?.state).toBe(JobState.PENDING);
     }, 10000);
+
+    it('should start a queue if its name is in the active list', async () => {
+        module.get(ConfigService).jobQueueOptions.activeQueues = ['test'];
+
+        const subject = new Subject();
+        const testQueue = jobQueueService.createQueue<string>({
+            name: 'test',
+            process: job => {
+                return subject.toPromise();
+            },
+        });
+
+        const testJob = await testQueue.add('hello');
+        expect(testJob.state).toBe(JobState.PENDING);
+
+        await tick(queuePollInterval);
+        expect(testJob.state).toBe(JobState.RUNNING);
+
+        subject.next('yay');
+        subject.complete();
+        await tick();
+
+        expect(testJob.state).toBe(JobState.COMPLETED);
+        expect(testJob.result).toBe('yay');
+    });
+
+    it('should not start a queue if its name is in the active list', async () => {
+        module.get(ConfigService).jobQueueOptions.activeQueues = ['another'];
+
+        const subject = new Subject();
+        const testQueue = jobQueueService.createQueue<string>({
+            name: 'test',
+            process: job => {
+                return subject.toPromise();
+            },
+        });
+
+        const testJob = await testQueue.add('hello');
+        expect(testJob.state).toBe(JobState.PENDING);
+
+        await tick(queuePollInterval);
+        expect(testJob.state).toBe(JobState.PENDING);
+
+        subject.next('yay');
+        subject.complete();
+
+        expect(testJob.state).toBe(JobState.PENDING);
+    });
 });
 
-function tick(ms: number): Promise<void> {
-    return new Promise<void>((resolve) => setTimeout(resolve, ms));
+function tick(ms: number = 0): Promise<void> {
+    return new Promise<void>(resolve => {
+        if (ms > 0) {
+            setTimeout(resolve, ms);
+        } else {
+            process.nextTick(resolve);
+        }
+    });
 }
 
-class MockConfigService {
+@Injectable()
+class MockConfigService implements OnApplicationBootstrap, OnModuleDestroy {
+    constructor(private moduleRef: ModuleRef) {}
+
     jobQueueOptions = {
-        jobQueueStrategy: new TestingJobQueueStrategy(),
-        pollInterval: queuePollInterval,
+        jobQueueStrategy: new TestingJobQueueStrategy(1, queuePollInterval),
+        activeQueues: [],
     };
+
+    async onApplicationBootstrap() {
+        const injector = new Injector(this.moduleRef);
+        await this.jobQueueOptions.jobQueueStrategy.init(injector);
+    }
+
+    async onModuleDestroy() {
+        await this.jobQueueOptions.jobQueueStrategy.destroy();
+    }
 }

+ 24 - 74
packages/core/src/job-queue/job-queue.service.ts

@@ -1,13 +1,9 @@
-import { Injectable, OnApplicationBootstrap, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
-import { JobListOptions, JobQueue as GraphQlJobQueue } from '@vendure/common/lib/generated-types';
-import { ID, PaginatedList } from '@vendure/common/lib/shared-types';
+import { Injectable, OnApplicationBootstrap, OnModuleDestroy } from '@nestjs/common';
+import { JobQueue as GraphQlJobQueue } from '@vendure/common/lib/generated-types';
 
-import { ConfigService } from '../config/config.service';
-import { JobQueueStrategy } from '../config/job-queue/job-queue-strategy';
-import { Logger } from '../config/logger/vendure-logger';
-import { ProcessContext } from '../process-context/process-context';
+import { ConfigService, JobQueueStrategy } from '../config';
+import { ProcessContext } from '../process-context';
 
-import { Job } from './job';
 import { JobQueue } from './job-queue';
 import { CreateQueueOptions, JobData } from './types';
 
@@ -27,14 +23,8 @@ import { CreateQueueOptions, JobData } from './types';
  *     // The JobQueue is created on initialization
  *     this.jobQueue = this.jobQueueService.createQueue({
  *       name: 'transcode-video',
- *       concurrency: 5,
  *       process: async job => {
- *         try {
- *           const result = await this.transcodeVideo(job.data.videoId);
- *           job.complete(result);
- *         } catch (e) {
- *           job.fail(e);
- *         }
+ *         return await this.transcodeVideo(job.data.videoId);
  *       },
  *     });
  *   }
@@ -43,7 +33,7 @@ import { CreateQueueOptions, JobData } from './types';
  *     this.jobQueue.add({ videoId, })
  *   }
  *
- *   private transcodeVideo(videoId: string) {
+ *   private async transcodeVideo(videoId: string) {
  *     // e.g. call some external transcoding service
  *   }
  *
@@ -54,7 +44,6 @@ import { CreateQueueOptions, JobData } from './types';
  */
 @Injectable()
 export class JobQueueService implements OnApplicationBootstrap, OnModuleDestroy {
-    private cleanJobsTimer: NodeJS.Timeout;
     private queues: Array<JobQueue<any>> = [];
     private hasInitialized = false;
 
@@ -66,25 +55,17 @@ export class JobQueueService implements OnApplicationBootstrap, OnModuleDestroy
 
     /** @internal */
     async onApplicationBootstrap() {
-        if (this.processContext.isServer) {
-            const { pollInterval } = this.configService.jobQueueOptions;
-            if (pollInterval < 100) {
-                Logger.warn(
-                    `jobQueueOptions.pollInterval is set to ${pollInterval}ms. It is not recommended to set this lower than 100ms`,
-                );
-            }
-            await new Promise(resolve => setTimeout(resolve, 1000));
-            this.hasInitialized = true;
-            for (const queue of this.queues) {
-                if (!queue.started) {
-                    queue.start();
-                }
+        this.hasInitialized = true;
+        for (const queue of this.queues) {
+            if (!queue.started && this.shouldStartQueue(queue.name)) {
+                queue.start();
             }
         }
     }
 
     /** @internal */
     onModuleDestroy() {
+        this.hasInitialized = false;
         return Promise.all(this.queues.map(q => q.destroy()));
     }
 
@@ -93,42 +74,15 @@ export class JobQueueService implements OnApplicationBootstrap, OnModuleDestroy
      * Configures and creates a new {@link JobQueue} instance.
      */
     createQueue<Data extends JobData<Data>>(options: CreateQueueOptions<Data>): JobQueue<Data> {
-        const { jobQueueStrategy, pollInterval } = this.configService.jobQueueOptions;
-        const queue = new JobQueue(options, jobQueueStrategy, pollInterval);
-        if (this.processContext.isServer && this.hasInitialized) {
+        const { jobQueueStrategy } = this.configService.jobQueueOptions;
+        const queue = new JobQueue(options, jobQueueStrategy);
+        if (this.hasInitialized && this.shouldStartQueue(queue.name)) {
             queue.start();
         }
         this.queues.push(queue);
         return queue;
     }
 
-    /**
-     * @description
-     * Gets a job by id. The implementation is handled by the configured
-     * {@link JobQueueStrategy}.
-     */
-    getJob(id: ID): Promise<Job | undefined> {
-        return this.jobQueueStrategy.findOne(id);
-    }
-
-    /**
-     * @description
-     * Gets jobs according to the supplied options. The implementation is handled by the configured
-     * {@link JobQueueStrategy}.
-     */
-    getJobs(options?: JobListOptions): Promise<PaginatedList<Job>> {
-        return this.jobQueueStrategy.findMany(options);
-    }
-
-    /**
-     * @description
-     * Gets jobs by ids. The implementation is handled by the configured
-     * {@link JobQueueStrategy}.
-     */
-    getJobsById(ids: ID[]): Promise<Job[]> {
-        return this.jobQueueStrategy.findManyById(ids);
-    }
-
     /**
      * @description
      * Returns an array of `{ name: string; running: boolean; }` for each
@@ -141,21 +95,17 @@ export class JobQueueService implements OnApplicationBootstrap, OnModuleDestroy
         }));
     }
 
-    /**
-     * @description
-     * Removes settled jobs (completed or failed). The implementation is handled by the configured
-     * {@link JobQueueStrategy}.
-     */
-    removeSettledJobs(queueNames: string[], olderThan?: Date) {
-        return this.jobQueueStrategy.removeSettledJobs(queueNames, olderThan);
-    }
+    private shouldStartQueue(queueName: string): boolean {
+        if (this.processContext.isServer) {
+            return false;
+        }
 
-    async cancelJob(jobId: ID) {
-        const job = await this.jobQueueStrategy.findOne(jobId);
-        if (job) {
-            job.cancel();
-            await this.jobQueueStrategy.update(job);
-            return job;
+        if (this.configService.jobQueueOptions.activeQueues.length > 0) {
+            if (!this.configService.jobQueueOptions.activeQueues.includes(queueName)) {
+                return false;
+            }
         }
+
+        return true;
     }
 }

+ 11 - 90
packages/core/src/job-queue/job-queue.ts

@@ -2,7 +2,7 @@ import { JobState } from '@vendure/common/lib/generated-types';
 import { Subject, Subscription } from 'rxjs';
 import { throttleTime } from 'rxjs/operators';
 
-import { JobQueueStrategy } from '../config/job-queue/job-queue-strategy';
+import { JobQueueStrategy } from '../config';
 import { Logger } from '../config/logger/vendure-logger';
 
 import { Job } from './job';
@@ -21,16 +21,7 @@ import { CreateQueueOptions, JobConfig, JobData } from './types';
  * @docsCategory JobQueue
  */
 export class JobQueue<Data extends JobData<Data> = {}> {
-    private activeJobs: Array<Job<Data>> = [];
-    private timer: any;
-    private fooId: number;
     private running = false;
-    private errorNotifier$ = new Subject<[string, string]>();
-    private subscription: Subscription;
-
-    get concurrency(): number {
-        return this.options.concurrency;
-    }
 
     get name(): string {
         return this.options.name;
@@ -40,94 +31,34 @@ export class JobQueue<Data extends JobData<Data> = {}> {
         return this.running;
     }
 
-    constructor(
-        private options: CreateQueueOptions<Data>,
-        private jobQueueStrategy: JobQueueStrategy,
-        private pollInterval: number,
-    ) {
-        this.subscription = this.errorNotifier$.pipe(throttleTime(3000)).subscribe(([message, stack]) => {
-            Logger.error(message);
-            Logger.debug(stack);
-        });
-    }
+    constructor(private options: CreateQueueOptions<Data>, private jobQueueStrategy: JobQueueStrategy) {}
 
     /** @internal */
     start() {
         if (this.running) {
             return;
         }
-        Logger.debug(`Starting JobQueue "${this.options.name}"`);
         this.running = true;
-        const concurrency = this.options.concurrency;
-        const runNextJobs = async () => {
-            try {
-                const runningJobsCount = this.activeJobs.length;
-                for (let i = runningJobsCount; i < concurrency; i++) {
-                    const nextJob: Job<Data> | undefined = await this.jobQueueStrategy.next(
-                        this.options.name,
-                    );
-                    if (nextJob) {
-                        this.activeJobs.push(nextJob);
-                        await this.jobQueueStrategy.update(nextJob);
-                        nextJob.on('complete', job => this.onFailOrComplete(job));
-                        nextJob.on('progress', job => this.jobQueueStrategy.update(job));
-                        nextJob.on('fail', job => this.onFailOrComplete(job));
-                        try {
-                            const returnVal = this.options.process(nextJob);
-                            if (returnVal instanceof Promise) {
-                                returnVal.catch(err => nextJob.fail(err));
-                            }
-                        } catch (err) {
-                            nextJob.fail(err);
-                        }
-                    }
-                }
-            } catch (e) {
-                this.errorNotifier$.next([
-                    `Job queue "${this.options.name}" encountered an error (set log level to Debug for trace): ${e.message}`,
-                    e.stack,
-                ]);
-            }
-            if (this.running) {
-                this.timer = setTimeout(runNextJobs, this.pollInterval);
-            }
-        };
-
-        runNextJobs();
+        this.jobQueueStrategy.start<Data>(this.options.name, this.options.process);
     }
 
     /** @internal */
     pause() {
         Logger.debug(`Pausing JobQueue "${this.options.name}"`);
+        if (!this.running) {
+            return;
+        }
         this.running = false;
-        clearTimeout(this.timer);
+        this.jobQueueStrategy.stop(this.options.name, this.options.process);
     }
 
     /** @internal */
     async destroy(): Promise<void> {
+        if (!this.running) {
+            return;
+        }
         this.running = false;
-        clearTimeout(this.timer);
-        const start = +new Date();
-        // Wait for 2 seconds to allow running jobs to complete
-        const maxTimeout = 2000;
-        return new Promise(resolve => {
-            const pollActiveJobs = async () => {
-                const timedOut = +new Date() - start > maxTimeout;
-                if (this.activeJobs.length === 0 || timedOut) {
-                    // if there are any incomplete jobs after the 2 second
-                    // wait period, set them back to "pending" so they can
-                    // be re-run on next bootstrap.
-                    for (const job of this.activeJobs) {
-                        job.defer();
-                        await this.jobQueueStrategy.update(job);
-                    }
-                    resolve();
-                } else {
-                    setTimeout(pollActiveJobs, 50);
-                }
-            };
-            pollActiveJobs();
-        });
+        return this.jobQueueStrategy.stop(this.options.name, this.options.process);
     }
 
     /**
@@ -142,14 +73,4 @@ export class JobQueue<Data extends JobData<Data> = {}> {
         });
         return this.jobQueueStrategy.add(job);
     }
-
-    private async onFailOrComplete(job: Job<Data>) {
-        await this.jobQueueStrategy.update(job);
-        this.removeJobFromActive(job);
-    }
-
-    private removeJobFromActive(job: Job<Data>) {
-        const index = this.activeJobs.indexOf(job);
-        this.activeJobs.splice(index, 1);
-    }
 }

+ 9 - 11
packages/core/src/job-queue/job.ts

@@ -1,5 +1,4 @@
 import { JobState } from '@vendure/common/lib/generated-types';
-import { ID } from '@vendure/common/lib/shared-types';
 import { isClassInstance, isObject } from '@vendure/common/lib/shared-utils';
 
 import { JobConfig, JobData } from './types';
@@ -11,7 +10,7 @@ import { JobConfig, JobData } from './types';
  * @docsCategory JobQueue
  * @docsPage Job
  */
-export type JobEventType = 'start' | 'progress' | 'complete' | 'fail' | 'cancel';
+export type JobEventType = 'progress';
 
 /**
  * @description
@@ -34,7 +33,7 @@ export type JobEventListener<T extends JobData<T>> = (job: Job<T>) => void;
  * @docsWeight 0
  */
 export class Job<T extends JobData<T> = any> {
-    readonly id: ID | null;
+    readonly id: number | string | null;
     readonly queueName: string;
     readonly retries: number;
     readonly createdAt: Date;
@@ -47,11 +46,7 @@ export class Job<T extends JobData<T> = any> {
     private _startedAt?: Date;
     private _settledAt?: Date;
     private readonly eventListeners: { [type in JobEventType]: Array<JobEventListener<T>> } = {
-        start: [],
         progress: [],
-        complete: [],
-        fail: [],
-        cancel: [],
     };
 
     get name(): string {
@@ -124,7 +119,6 @@ export class Job<T extends JobData<T> = any> {
             this._state = JobState.RUNNING;
             this._startedAt = new Date();
             this._attempts++;
-            this.fireEvent('start');
         }
     }
 
@@ -147,7 +141,6 @@ export class Job<T extends JobData<T> = any> {
         this._progress = 100;
         this._state = JobState.COMPLETED;
         this._settledAt = new Date();
-        this.fireEvent('complete');
     }
 
     /**
@@ -163,14 +156,12 @@ export class Job<T extends JobData<T> = any> {
             this._state = JobState.FAILED;
             this._settledAt = new Date();
         }
-        this.fireEvent('fail');
     }
 
     cancel() {
         this._progress = 0;
         this._settledAt = new Date();
         this._state = JobState.CANCELLED;
-        this.fireEvent('cancel');
     }
 
     /**
@@ -193,6 +184,13 @@ export class Job<T extends JobData<T> = any> {
         this.eventListeners[eventType].push(listener);
     }
 
+    off(eventType: JobEventType, listener: JobEventListener<T>) {
+        const idx = this.eventListeners[eventType].indexOf(listener);
+        if (idx !== -1) {
+            this.eventListeners[eventType].splice(idx, 1);
+        }
+    }
+
     private fireEvent(eventType: JobEventType) {
         for (const listener of this.eventListeners[eventType]) {
             listener(this);

+ 188 - 0
packages/core/src/job-queue/polling-job-queue-strategy.ts

@@ -0,0 +1,188 @@
+import { ID } from '@vendure/common/lib/shared-types';
+import { Subject, Subscription } from 'rxjs';
+import { throttleTime } from 'rxjs/operators';
+
+import { Logger } from '../config/logger/vendure-logger';
+
+import { InjectableJobQueueStrategy } from './injectable-job-queue-strategy';
+import { Job } from './job';
+import { QueueNameProcessStorage } from './queue-name-process-storage';
+import { JobData } from './types';
+
+class ActiveQueue<Data extends JobData<Data> = {}> {
+    private timer: any;
+    private running = false;
+    private activeJobs: Array<Job<Data>> = [];
+
+    private errorNotifier$ = new Subject<[string, string]>();
+    private subscription: Subscription;
+
+    constructor(
+        private readonly queueName: string,
+        private readonly process: (job: Job<Data>) => Promise<any>,
+        private readonly jobQueueStrategy: PollingJobQueueStrategy,
+    ) {
+        this.subscription = this.errorNotifier$.pipe(throttleTime(3000)).subscribe(([message, stack]) => {
+            Logger.error(message);
+            Logger.debug(stack);
+        });
+    }
+
+    start() {
+        Logger.debug(`Starting JobQueue "${this.queueName}"`);
+        this.running = true;
+        const runNextJobs = async () => {
+            try {
+                const runningJobsCount = this.activeJobs.length;
+                for (let i = runningJobsCount; i < this.jobQueueStrategy.concurrency; i++) {
+                    const nextJob = await this.jobQueueStrategy.next(this.queueName);
+                    if (nextJob) {
+                        this.activeJobs.push(nextJob);
+                        await this.jobQueueStrategy.update(nextJob);
+                        const onProgress = (job: Job) => this.jobQueueStrategy.update(job);
+                        nextJob.on('progress', onProgress);
+                        this.process(nextJob)
+                            .then(
+                                result => {
+                                    nextJob.complete(result);
+                                },
+                                err => {
+                                    nextJob.fail(err);
+                                },
+                            )
+                            .finally(() => {
+                                if (!this.running) {
+                                    return;
+                                }
+                                nextJob.off('progress', onProgress);
+                                return this.onFailOrComplete(nextJob);
+                            })
+                            .catch(err => {
+                                Logger.warn(`Error updating job info: ${err}`);
+                            });
+                    }
+                }
+            } catch (e) {
+                this.errorNotifier$.next([
+                    `Job queue "${this.queueName}" encountered an error (set log level to Debug for trace): ${e.message}`,
+                    e.stack,
+                ]);
+            }
+            if (this.running) {
+                this.timer = setTimeout(runNextJobs, this.jobQueueStrategy.pollInterval);
+            }
+        };
+
+        runNextJobs();
+    }
+
+    stop(): Promise<void> {
+        this.running = false;
+        clearTimeout(this.timer);
+
+        const start = +new Date();
+        // Wait for 2 seconds to allow running jobs to complete
+        const maxTimeout = 2000;
+        return new Promise(resolve => {
+            const pollActiveJobs = async () => {
+                const timedOut = +new Date() - start > maxTimeout;
+                if (this.activeJobs.length === 0 || timedOut) {
+                    // if there are any incomplete jobs after the 2 second
+                    // wait period, set them back to "pending" so they can
+                    // be re-run on next bootstrap.
+                    for (const job of this.activeJobs) {
+                        job.defer();
+                        try {
+                            await this.jobQueueStrategy.update(job);
+                        } catch (err) {
+                            Logger.info(`Error stopping job queue: ${err}`);
+                        }
+                    }
+                    resolve();
+                } else {
+                    setTimeout(pollActiveJobs, 50);
+                }
+            };
+            pollActiveJobs();
+        });
+    }
+
+    private async onFailOrComplete(job: Job<Data>) {
+        await this.jobQueueStrategy.update(job);
+        this.removeJobFromActive(job);
+    }
+
+    private removeJobFromActive(job: Job<Data>) {
+        const index = this.activeJobs.indexOf(job);
+        this.activeJobs.splice(index, 1);
+    }
+}
+
+/**
+ * @description
+ * This class allows easier implementation of {@link JobQueueStrategy} in a polling style.
+ * Instead of providing {@link JobQueueStrategy} `start()` you should provide a `next` method.
+ */
+export abstract class PollingJobQueueStrategy extends InjectableJobQueueStrategy {
+    private activeQueues = new QueueNameProcessStorage<ActiveQueue<any>>();
+
+    constructor(public concurrency: number = 1, public pollInterval: number = 200) {
+        super();
+    }
+
+    start<Data extends JobData<Data> = {}>(
+        queueName: string,
+        process: (job: Job<Data>) => Promise<any>,
+    ): void {
+        if (!this.hasInitialized) {
+            this.started.set(queueName, process);
+            return;
+        }
+        if (this.activeQueues.has(queueName, process)) {
+            return;
+        }
+        const active = new ActiveQueue<Data>(queueName, process, this);
+        active.start();
+        this.activeQueues.set(queueName, process, active);
+    }
+
+    async stop<Data extends JobData<Data> = {}>(
+        queueName: string,
+        process: (job: Job<Data>) => Promise<any>,
+    ) {
+        const active = this.activeQueues.getAndDelete(queueName, process);
+        if (!active) {
+            return;
+        }
+        await active.stop();
+    }
+
+    async cancelJob(jobId: ID): Promise<Job | undefined> {
+        const job = await this.findOne(jobId);
+        if (job) {
+            job.cancel();
+            await this.update(job);
+            return job;
+        }
+    }
+
+    /**
+     * @description
+     * Should return the next job in the given queue. The implementation is
+     * responsible for returning the correct job according to the time of
+     * creation.
+     */
+    abstract next(queueName: string): Promise<Job | undefined>;
+
+    /**
+     * @description
+     * Update the job details in the store.
+     */
+    abstract update(job: Job): Promise<void>;
+
+    /**
+     * @description
+     * Returns a job by its id.
+     */
+    abstract findOne(id: ID): Promise<Job | undefined>;
+}

+ 38 - 0
packages/core/src/job-queue/queue-name-process-storage.ts

@@ -0,0 +1,38 @@
+import { Job } from './job';
+
+export class QueueNameProcessStorage<T> {
+    private items = new Map<string, Map<(job: Job) => Promise<any>, T>>();
+
+    set(queueName: string, process: (job: Job) => Promise<any>, listener: T) {
+        let items = this.items.get(queueName);
+        if (!items) {
+            items = new Map();
+            this.items.set(queueName, items);
+        }
+        items.set(process, listener);
+    }
+
+    has(queueName: string, process: (job: Job) => Promise<any>): boolean {
+        const items = this.items.get(queueName);
+        if (!items) {
+            return false;
+        }
+        return items.has(process);
+    }
+
+    getAndDelete(queueName: string, process: (job: Job) => Promise<any>): T | undefined {
+        const items = this.items.get(queueName);
+        if (!items) {
+            return;
+        }
+        const item = items.get(process);
+        if (!item) {
+            return;
+        }
+        items.delete(process);
+        if (items.size === 0) {
+            this.items.delete(queueName);
+        }
+        return item;
+    }
+}

+ 2 - 2
packages/core/src/job-queue/testing-job-queue-strategy.ts

@@ -6,9 +6,9 @@ import { Job } from './job';
  * An in-memory {@link JobQueueStrategy} design for testing purposes.
  */
 export class TestingJobQueueStrategy extends InMemoryJobQueueStrategy {
-    prePopulate(jobs: Job[]) {
+    async prePopulate(jobs: Job[]) {
         for (const job of jobs) {
-            this.add(job);
+            await this.add(job);
         }
     }
 }

+ 3 - 9
packages/core/src/job-queue/types.ts

@@ -18,16 +18,10 @@ export interface CreateQueueOptions<T extends JobData<T>> {
     name: string;
     /**
      * @description
-     * How many jobs of this type may be run concurrently.
+     * Defines the work to be done for each job in the queue. The returned promise
+     * should resolve when the job is complete, or be rejected in case of an error.
      */
-    concurrency: number;
-    /**
-     * @description
-     * Defines the work to be done for each job in the queue. When the work is complete,
-     * `job.complete()` should be called, and for any errors, `job.fail()` should be called.
-     * Unhandled exceptions will automatically call `job.fail()`.
-     */
-    process: (job: Job<T>) => any | Promise<any>;
+    process: (job: Job<T>) => Promise<any>;
 }
 
 /**

+ 21 - 20
packages/core/src/plugin/default-job-queue-plugin/sql-job-queue-strategy.ts

@@ -3,9 +3,10 @@ import { ID, PaginatedList } from '@vendure/common/lib/shared-types';
 import { Brackets, Connection, FindConditions, In, LessThan } from 'typeorm';
 
 import { Injector } from '../../common/injector';
-import { JobQueueStrategy } from '../../config/job-queue/job-queue-strategy';
-import { Job } from '../../job-queue/job';
-import { ProcessContext } from '../../process-context/process-context';
+import { InspectableJobQueueStrategy, JobQueueStrategy } from '../../config';
+import { Job, JobData } from '../../job-queue';
+import { PollingJobQueueStrategy } from '../../job-queue/polling-job-queue-strategy';
+import { TransactionalConnection } from '../../service';
 import { ListQueryBuilder } from '../../service/helpers/list-query-builder/list-query-builder';
 
 import { JobRecord } from './job-record.entity';
@@ -17,21 +18,24 @@ import { JobRecord } from './job-record.entity';
  *
  * @docsCategory JobQueue
  */
-export class SqlJobQueueStrategy implements JobQueueStrategy {
+export class SqlJobQueueStrategy extends PollingJobQueueStrategy implements InspectableJobQueueStrategy {
     private connection: Connection | undefined;
     private listQueryBuilder: ListQueryBuilder;
 
     init(injector: Injector) {
-        const processContext = injector.get(ProcessContext);
-        if (processContext.isServer) {
-            this.connection = injector.getConnection();
-            this.listQueryBuilder = injector.get(ListQueryBuilder);
-        }
+        this.connection = injector.get(TransactionalConnection).rawConnection;
+        this.listQueryBuilder = injector.get(ListQueryBuilder);
+        super.init(injector);
+    }
+
+    destroy() {
+        this.connection = undefined;
+        super.destroy();
     }
 
-    async add(job: Job): Promise<Job> {
+    async add<Data extends JobData<Data> = {}>(job: Job<Data>): Promise<Job<Data>> {
         if (!this.connectionAvailable(this.connection)) {
-            return job;
+            throw new Error('Connection not available');
         }
         const newRecord = this.toRecord(job);
         const record = await this.connection.getRepository(JobRecord).save(newRecord);
@@ -40,7 +44,7 @@ export class SqlJobQueueStrategy implements JobQueueStrategy {
 
     async next(queueName: string): Promise<Job | undefined> {
         if (!this.connectionAvailable(this.connection)) {
-            return;
+            throw new Error('Connection not available');
         }
         const record = await this.connection
             .getRepository(JobRecord)
@@ -64,17 +68,14 @@ export class SqlJobQueueStrategy implements JobQueueStrategy {
 
     async update(job: Job<any>): Promise<void> {
         if (!this.connectionAvailable(this.connection)) {
-            return;
+            throw new Error('Connection not available');
         }
         await this.connection.getRepository(JobRecord).save(this.toRecord(job));
     }
 
     async findMany(options?: JobListOptions): Promise<PaginatedList<Job>> {
         if (!this.connectionAvailable(this.connection)) {
-            return {
-                items: [],
-                totalItems: 0,
-            };
+            throw new Error('Connection not available');
         }
         return this.listQueryBuilder
             .build(JobRecord, options)
@@ -87,7 +88,7 @@ export class SqlJobQueueStrategy implements JobQueueStrategy {
 
     async findOne(id: ID): Promise<Job | undefined> {
         if (!this.connectionAvailable(this.connection)) {
-            return;
+            throw new Error('Connection not available');
         }
         const record = await this.connection.getRepository(JobRecord).findOne(id);
         if (record) {
@@ -97,7 +98,7 @@ export class SqlJobQueueStrategy implements JobQueueStrategy {
 
     async findManyById(ids: ID[]): Promise<Job[]> {
         if (!this.connectionAvailable(this.connection)) {
-            return [];
+            throw new Error('Connection not available');
         }
         return this.connection
             .getRepository(JobRecord)
@@ -107,7 +108,7 @@ export class SqlJobQueueStrategy implements JobQueueStrategy {
 
     async removeSettledJobs(queueNames: string[] = [], olderThan?: Date) {
         if (!this.connectionAvailable(this.connection)) {
-            return 0;
+            throw new Error('Connection not available');
         }
         const findOptions: FindConditions<JobRecord> = {
             ...(0 < queueNames.length ? { queueName: In(queueNames) } : {}),

+ 4 - 3
packages/core/src/plugin/default-search-plugin/default-search-plugin.ts

@@ -1,3 +1,4 @@
+import { OnApplicationBootstrap } from '@nestjs/common';
 import { SearchReindexResponse } from '@vendure/common/lib/generated-types';
 import { ID } from '@vendure/common/lib/shared-types';
 import { buffer, debounceTime, delay, filter, map } from 'rxjs/operators';
@@ -12,7 +13,7 @@ import { ProductVariantChannelEvent } from '../../event-bus/events/product-varia
 import { ProductVariantEvent } from '../../event-bus/events/product-variant-event';
 import { TaxRateModificationEvent } from '../../event-bus/events/tax-rate-modification-event';
 import { PluginCommonModule } from '../plugin-common.module';
-import { OnVendureBootstrap, VendurePlugin } from '../vendure-plugin';
+import { VendurePlugin } from '../vendure-plugin';
 
 import { AdminFulltextSearchResolver, ShopFulltextSearchResolver } from './fulltext-search.resolver';
 import { FulltextSearchService } from './fulltext-search.service';
@@ -62,12 +63,12 @@ export interface DefaultSearchReindexResponse extends SearchReindexResponse {
     entities: [SearchIndexItem],
     workers: [IndexerController],
 })
-export class DefaultSearchPlugin implements OnVendureBootstrap {
+export class DefaultSearchPlugin implements OnApplicationBootstrap {
     /** @internal */
     constructor(private eventBus: EventBus, private searchIndexService: SearchIndexService) {}
 
     /** @internal */
-    async onVendureBootstrap() {
+    async onApplicationBootstrap() {
         this.searchIndexService.initJobQueue();
 
         this.eventBus.ofType(ProductEvent).subscribe(event => {

+ 49 - 57
packages/core/src/plugin/default-search-plugin/indexer/search-index.service.ts

@@ -41,49 +41,37 @@ export class SearchIndexService {
     initJobQueue() {
         updateIndexQueue = this.jobService.createQueue({
             name: 'update-search-index',
-            concurrency: 1,
             process: job => {
                 const data = job.data;
                 switch (data.type) {
                     case 'reindex':
                         Logger.verbose(`sending ReindexMessage`);
-                        this.sendMessageWithProgress(job, new ReindexMessage(data));
-                        break;
+                        return this.sendMessageWithProgress(job, new ReindexMessage(data));
                     case 'update-product':
-                        this.sendMessage(job, new UpdateProductMessage(data));
-                        break;
+                        return this.sendMessage(job, new UpdateProductMessage(data));
                     case 'update-variants':
-                        this.sendMessage(job, new UpdateVariantMessage(data));
-                        break;
+                        return this.sendMessage(job, new UpdateVariantMessage(data));
                     case 'delete-product':
-                        this.sendMessage(job, new DeleteProductMessage(data));
-                        break;
+                        return this.sendMessage(job, new DeleteProductMessage(data));
                     case 'delete-variant':
-                        this.sendMessage(job, new DeleteVariantMessage(data));
-                        break;
+                        return this.sendMessage(job, new DeleteVariantMessage(data));
                     case 'update-variants-by-id':
-                        this.sendMessageWithProgress(job, new UpdateVariantsByIdMessage(data));
-                        break;
+                        return this.sendMessageWithProgress(job, new UpdateVariantsByIdMessage(data));
                     case 'update-asset':
-                        this.sendMessage(job, new UpdateAssetMessage(data));
-                        break;
+                        return this.sendMessage(job, new UpdateAssetMessage(data));
                     case 'delete-asset':
-                        this.sendMessage(job, new DeleteAssetMessage(data));
-                        break;
+                        return this.sendMessage(job, new DeleteAssetMessage(data));
                     case 'assign-product-to-channel':
-                        this.sendMessage(job, new AssignProductToChannelMessage(data));
-                        break;
+                        return this.sendMessage(job, new AssignProductToChannelMessage(data));
                     case 'remove-product-from-channel':
-                        this.sendMessage(job, new RemoveProductFromChannelMessage(data));
-                        break;
+                        return this.sendMessage(job, new RemoveProductFromChannelMessage(data));
                     case 'assign-variant-to-channel':
-                        this.sendMessage(job, new AssignVariantToChannelMessage(data));
-                        break;
+                        return this.sendMessage(job, new AssignVariantToChannelMessage(data));
                     case 'remove-variant-from-channel':
-                        this.sendMessage(job, new RemoveVariantFromChannelMessage(data));
-                        break;
+                        return this.sendMessage(job, new RemoveVariantFromChannelMessage(data));
                     default:
                         assertNever(data);
+                        return Promise.resolve();
                 }
             },
         });
@@ -165,41 +153,45 @@ export class SearchIndexService {
         }
     }
 
-    private sendMessage(job: Job<any>, message: WorkerMessage<any, any>) {
-        this.workerService.send(message).subscribe({
-            complete: () => job.complete(true),
-            error: err => {
-                Logger.error(err);
-                job.fail(err);
-            },
+    private sendMessage(job: Job<any>, message: WorkerMessage<any, any>): Promise<any> {
+        return new Promise((resolve, reject) => {
+            this.workerService.send(message).subscribe({
+                complete: () => resolve(),
+                error: err => {
+                    Logger.error(err);
+                    reject(err);
+                },
+            });
         });
     }
 
-    private sendMessageWithProgress(job: Job<any>, message: ReindexMessage | UpdateVariantsByIdMessage) {
-        let total: number | undefined;
-        let duration = 0;
-        let completed = 0;
-        this.workerService.send(message).subscribe({
-            next: (response: ReindexMessageResponse) => {
-                if (!total) {
-                    total = response.total;
-                }
-                duration = response.duration;
-                completed = response.completed;
-                const progress = total === 0 ? 100 : Math.ceil((completed / total) * 100);
-                job.setProgress(progress);
-            },
-            complete: () => {
-                job.complete({
-                    success: true,
-                    indexedItemCount: total,
-                    timeTaken: duration,
-                });
-            },
-            error: (err: any) => {
-                Logger.error(JSON.stringify(err));
-                job.fail();
-            },
+    private sendMessageWithProgress(job: Job<any>, message: ReindexMessage | UpdateVariantsByIdMessage): Promise<any> {
+        return new Promise((resolve, reject) => {
+            let total: number | undefined;
+            let duration = 0;
+            let completed = 0;
+            this.workerService.send(message).subscribe({
+                next: (response: ReindexMessageResponse) => {
+                    if (!total) {
+                        total = response.total;
+                    }
+                    duration = response.duration;
+                    completed = response.completed;
+                    const progress = total === 0 ? 100 : Math.ceil((completed / total) * 100);
+                    job.setProgress(progress);
+                },
+                complete: () => {
+                    resolve({
+                        success: true,
+                        indexedItemCount: total,
+                        timeTaken: duration,
+                    });
+                },
+                error: (err: any) => {
+                    Logger.error(JSON.stringify(err));
+                    reject(err);
+                },
+            });
         });
     }
 }

+ 25 - 24
packages/core/src/service/services/collection.service.ts

@@ -75,7 +75,7 @@ export class CollectionService implements OnModuleInit {
             .pipe(debounceTime(50))
             .subscribe(async event => {
                 const collections = await this.connection.getRepository(Collection).find();
-                this.applyFiltersQueue.add({
+                await this.applyFiltersQueue.add({
                     ctx: event.ctx.serialize(),
                     collectionIds: collections.map(c => c.id),
                 });
@@ -83,12 +83,11 @@ export class CollectionService implements OnModuleInit {
 
         this.applyFiltersQueue = this.jobQueueService.createQueue({
             name: 'apply-collection-filters',
-            concurrency: 1,
             process: async job => {
                 const collections = await this.connection
                     .getRepository(Collection)
                     .findByIds(job.data.collectionIds);
-                this.applyCollectionFilters(job.data.ctx, collections, job);
+                return this.applyCollectionFilters(job.data.ctx, collections, job);
             },
         });
     }
@@ -300,7 +299,7 @@ export class CollectionService implements OnModuleInit {
         });
         await this.assetService.updateEntityAssets(ctx, collection, input);
         await this.customFieldRelationService.updateRelations(ctx, Collection, input, collection);
-        this.applyFiltersQueue.add({
+        await this.applyFiltersQueue.add({
             ctx: ctx.serialize(),
             collectionIds: [collection.id],
         });
@@ -324,7 +323,7 @@ export class CollectionService implements OnModuleInit {
         });
         await this.customFieldRelationService.updateRelations(ctx, Collection, input, collection);
         if (input.filters) {
-            this.applyFiltersQueue.add({
+            await this.applyFiltersQueue.add({
                 ctx: ctx.serialize(),
                 collectionIds: [collection.id],
             });
@@ -374,7 +373,7 @@ export class CollectionService implements OnModuleInit {
         siblings = moveToIndex(input.index, target, siblings);
 
         await this.connection.getRepository(ctx, Collection).save(siblings);
-        this.applyFiltersQueue.add({
+        await this.applyFiltersQueue.add({
             ctx: ctx.serialize(),
             collectionIds: [target.id],
         });
@@ -404,24 +403,26 @@ export class CollectionService implements OnModuleInit {
         const collectionIds = collections.map(c => c.id);
         const requestContext = RequestContext.deserialize(ctx);
 
-        this.workerService.send(new ApplyCollectionFiltersMessage({ collectionIds })).subscribe({
-            next: ({ total, completed, duration, collectionId, affectedVariantIds }) => {
-                const progress = Math.ceil((completed / total) * 100);
-                const collection = collections.find(c => idsAreEqual(c.id, collectionId));
-                if (collection) {
-                    this.eventBus.publish(
-                        new CollectionModificationEvent(requestContext, collection, affectedVariantIds),
-                    );
-                }
-                job.setProgress(progress);
-            },
-            complete: () => {
-                job.complete();
-            },
-            error: err => {
-                Logger.error(err);
-                job.fail(err);
-            },
+        return new Promise<void>((resolve, reject) => {
+            this.workerService.send(new ApplyCollectionFiltersMessage({ collectionIds })).subscribe({
+                next: ({ total, completed, duration, collectionId, affectedVariantIds }) => {
+                    const progress = Math.ceil((completed / total) * 100);
+                    const collection = collections.find(c => idsAreEqual(c.id, collectionId));
+                    if (collection) {
+                        this.eventBus.publish(
+                            new CollectionModificationEvent(requestContext, collection, affectedVariantIds),
+                        );
+                    }
+                    job.setProgress(progress);
+                },
+                complete: () => {
+                    resolve();
+                },
+                error: err => {
+                    Logger.error(err);
+                    reject(err);
+                },
+            });
         });
     }
 

+ 1 - 1
packages/core/src/service/services/role.service.ts

@@ -16,13 +16,13 @@ import { ID, PaginatedList } from '@vendure/common/lib/shared-types';
 import { unique } from '@vendure/common/lib/unique';
 
 import { RequestContext } from '../../api/common/request-context';
+import { getAllPermissionsMetadata } from '../../common/constants';
 import {
     EntityNotFoundError,
     ForbiddenError,
     InternalServerError,
     UserInputError,
 } from '../../common/error/errors';
-import { getAllPermissionsMetadata } from '../../common/permission-definition';
 import { ListQueryOptions } from '../../common/types/common-types';
 import { assertFound, idsAreEqual } from '../../common/utils';
 import { ConfigService } from '../../config/config.service';

+ 47 - 55
packages/elasticsearch-plugin/src/elasticsearch-index.service.ts

@@ -40,49 +40,37 @@ export class ElasticsearchIndexService {
     initJobQueue() {
         updateIndexQueue = this.jobService.createQueue({
             name: 'update-search-index',
-            concurrency: 1,
             process: job => {
                 const data = job.data;
                 switch (data.type) {
                     case 'reindex':
                         Logger.verbose(`sending ReindexMessage`);
-                        this.sendMessageWithProgress(job, new ReindexMessage(data));
-                        break;
+                        return this.sendMessageWithProgress(job, new ReindexMessage(data));
                     case 'update-product':
-                        this.sendMessage(job, new UpdateProductMessage(data));
-                        break;
+                        return this.sendMessage(job, new UpdateProductMessage(data));
                     case 'update-variants':
-                        this.sendMessage(job, new UpdateVariantMessage(data));
-                        break;
+                        return this.sendMessage(job, new UpdateVariantMessage(data));
                     case 'delete-product':
-                        this.sendMessage(job, new DeleteProductMessage(data));
-                        break;
+                        return this.sendMessage(job, new DeleteProductMessage(data));
                     case 'delete-variant':
-                        this.sendMessage(job, new DeleteVariantMessage(data));
-                        break;
+                        return this.sendMessage(job, new DeleteVariantMessage(data));
                     case 'update-variants-by-id':
-                        this.sendMessageWithProgress(job, new UpdateVariantsByIdMessage(data));
-                        break;
+                        return this.sendMessageWithProgress(job, new UpdateVariantsByIdMessage(data));
                     case 'update-asset':
-                        this.sendMessage(job, new UpdateAssetMessage(data));
-                        break;
+                        return this.sendMessage(job, new UpdateAssetMessage(data));
                     case 'delete-asset':
-                        this.sendMessage(job, new DeleteAssetMessage(data));
-                        break;
+                        return this.sendMessage(job, new DeleteAssetMessage(data));
                     case 'assign-product-to-channel':
-                        this.sendMessage(job, new AssignProductToChannelMessage(data));
-                        break;
+                        return this.sendMessage(job, new AssignProductToChannelMessage(data));
                     case 'remove-product-from-channel':
-                        this.sendMessage(job, new RemoveProductFromChannelMessage(data));
-                        break;
+                        return this.sendMessage(job, new RemoveProductFromChannelMessage(data));
                     case 'assign-variant-to-channel':
-                        this.sendMessage(job, new AssignVariantToChannelMessage(data));
-                        break;
+                        return this.sendMessage(job, new AssignVariantToChannelMessage(data));
                     case 'remove-variant-from-channel':
-                        this.sendMessage(job, new RemoveVariantFromChannelMessage(data));
-                        break;
+                        return this.sendMessage(job, new RemoveVariantFromChannelMessage(data));
                     default:
                         assertNever(data);
+                        return Promise.resolve();
                 }
             },
         });
@@ -165,40 +153,44 @@ export class ElasticsearchIndexService {
     }
 
     private sendMessage(job: Job<any>, message: WorkerMessage<any, any>) {
-        this.workerService.send(message).subscribe({
-            complete: () => job.complete(true),
-            error: err => {
-                Logger.error(err);
-                job.fail(err);
-            },
+        return new Promise((resolve, reject) => {
+            this.workerService.send(message).subscribe({
+                complete: () => resolve(true),
+                error: err => {
+                    Logger.error(err);
+                    reject(err);
+                },
+            });
         });
     }
 
     private sendMessageWithProgress(job: Job<any>, message: WorkerMessage<any, ReindexMessageResponse>) {
-        let total: number | undefined;
-        let duration = 0;
-        let completed = 0;
-        this.workerService.send(message).subscribe({
-            next: (response: ReindexMessageResponse) => {
-                if (!total) {
-                    total = response.total;
-                }
-                duration = response.duration;
-                completed = response.completed;
-                const progress = total === 0 ? 100 : Math.ceil((completed / total) * 100);
-                job.setProgress(progress);
-            },
-            complete: () => {
-                job.complete({
-                    success: true,
-                    indexedItemCount: total,
-                    timeTaken: duration,
-                });
-            },
-            error: (err: any) => {
-                Logger.error(JSON.stringify(err));
-                job.fail();
-            },
+        return new Promise((resolve, reject) => {
+            let total: number | undefined;
+            let duration = 0;
+            let completed = 0;
+            this.workerService.send(message).subscribe({
+                next: (response: ReindexMessageResponse) => {
+                    if (!total) {
+                        total = response.total;
+                    }
+                    duration = response.duration;
+                    completed = response.completed;
+                    const progress = total === 0 ? 100 : Math.ceil((completed / total) * 100);
+                    job.setProgress(progress);
+                },
+                complete: () => {
+                    resolve({
+                        success: true,
+                        indexedItemCount: total,
+                        timeTaken: duration,
+                    });
+                },
+                error: (err: any) => {
+                    Logger.error(JSON.stringify(err));
+                    reject(err);
+                },
+            });
         });
     }
 }

+ 3 - 3
packages/elasticsearch-plugin/src/plugin.ts

@@ -1,4 +1,5 @@
 import { NodeOptions } from '@elastic/elasticsearch';
+import { OnApplicationBootstrap } from '@nestjs/common';
 import {
     AssetEvent,
     CollectionModificationEvent,
@@ -7,7 +8,6 @@ import {
     ID,
     idsAreEqual,
     Logger,
-    OnVendureBootstrap,
     PluginCommonModule,
     ProductChannelEvent,
     ProductEvent,
@@ -215,7 +215,7 @@ import { ElasticsearchOptions, ElasticsearchRuntimeOptions, mergeWithDefaults }
     },
     workers: [ElasticsearchIndexerController],
 })
-export class ElasticsearchPlugin implements OnVendureBootstrap {
+export class ElasticsearchPlugin implements OnApplicationBootstrap {
     private static options: ElasticsearchRuntimeOptions;
 
     /** @internal */
@@ -236,7 +236,7 @@ export class ElasticsearchPlugin implements OnVendureBootstrap {
     }
 
     /** @internal */
-    async onVendureBootstrap(): Promise<void> {
+    async onApplicationBootstrap(): Promise<void> {
         const { host, port } = ElasticsearchPlugin.options;
         const nodeName = this.nodeName();
         try {

+ 1 - 1
packages/email-plugin/src/plugin.spec.ts

@@ -60,7 +60,7 @@ describe('EmailPlugin', () => {
 
         const plugin = module.get(EmailPlugin);
         eventBus = module.get(EventBus);
-        await plugin.onVendureBootstrap();
+        await plugin.onApplicationBootstrap();
         return module;
     }
 

+ 13 - 7
packages/email-plugin/src/plugin.ts

@@ -1,3 +1,4 @@
+import { OnApplicationBootstrap } from '@nestjs/common';
 import { ModuleRef } from '@nestjs/core';
 import {
     createProxyHandler,
@@ -172,7 +173,7 @@ import {
     workers: [EmailProcessorController],
     configuration: config => EmailPlugin.configure(config),
 })
-export class EmailPlugin implements OnVendureBootstrap, OnVendureClose {
+export class EmailPlugin implements OnApplicationBootstrap, OnVendureBootstrap, OnVendureClose {
     private static options: EmailPluginOptions | EmailPluginDevModeOptions;
     private devMailbox: DevMailbox | undefined;
     private jobQueue: JobQueue<IntermediateEmailDetails> | undefined;
@@ -207,7 +208,7 @@ export class EmailPlugin implements OnVendureBootstrap, OnVendureClose {
     }
 
     /** @internal */
-    async onVendureBootstrap(): Promise<void> {
+    async onVendureBootstrap() {
         const options = EmailPlugin.options;
 
         if (isDevModeOptions(options) && options.mailboxPort !== undefined) {
@@ -215,9 +216,13 @@ export class EmailPlugin implements OnVendureBootstrap, OnVendureClose {
             this.devMailbox.serve(options);
             this.devMailbox.handleMockEvent((handler, event) => this.handleEvent(handler, event));
         }
+    }
 
-        await this.setupEventSubscribers();
+    /** @internal */
+    async onApplicationBootstrap(): Promise<void> {
+        const options = EmailPlugin.options;
 
+        await this.setupEventSubscribers();
         if (!isDevModeOptions(options) && options.transport.type === 'testing') {
             // When running tests, we don't want to go through the JobQueue system,
             // so we just call the email sending logic directly.
@@ -226,11 +231,12 @@ export class EmailPlugin implements OnVendureBootstrap, OnVendureClose {
         } else {
             this.jobQueue = this.jobQueueService.createQueue({
                 name: 'send-email',
-                concurrency: 5,
                 process: job => {
-                    this.workerService.send(new EmailWorkerMessage(job.data)).subscribe({
-                        complete: () => job.complete(),
-                        error: err => job.fail(err),
+                    return new Promise((resolve, reject) => {
+                        this.workerService.send(new EmailWorkerMessage(job.data)).subscribe({
+                            complete: () => resolve(),
+                            error: err => reject(err),
+                        });
                     });
                 },
             });

+ 4 - 0
packages/pub-sub-plugin/.gitignore

@@ -0,0 +1,4 @@
+preview/output
+yarn-error.log
+lib
+e2e/__data__/*.sqlite

+ 7 - 0
packages/pub-sub-plugin/README.md

@@ -0,0 +1,7 @@
+# Vendure Pub/Sub Plugin
+
+The `PubSubPlugin` uses Google Cloud Pub/Sub to power the Vendure job queue. 
+
+`npm install @vendure/pub-sub-plugin`
+
+For documentation, see [www.vendure.io/docs/typescript-api/pub-sub-plugin/](https://www.vendure.io/docs/typescript-api/pub-sub-plugin/)

+ 2 - 0
packages/pub-sub-plugin/index.ts

@@ -0,0 +1,2 @@
+export * from './src/plugin';
+export * from './src/options';

+ 25 - 0
packages/pub-sub-plugin/jest.config.js

@@ -0,0 +1,25 @@
+module.exports = {
+    coverageDirectory: 'coverage',
+    moduleFileExtensions: [
+        'js',
+        'json',
+        'ts',
+    ],
+    preset: 'ts-jest',
+    rootDir: __dirname,
+    roots: [
+        '<rootDir>/src',
+    ],
+    transform: {
+        '^.+\\.(t|j)s$': 'ts-jest',
+    },
+    globals: {
+        'ts-jest': {
+            tsConfig: {
+                allowJs: true,
+                skipLibCheck: true,
+            },
+        },
+    },
+    testEnvironment: 'node',
+};

+ 29 - 0
packages/pub-sub-plugin/package.json

@@ -0,0 +1,29 @@
+{
+  "name": "@vendure/pub-sub-plugin",
+  "version": "0.18.2",
+  "license": "MIT",
+  "main": "lib/index.js",
+  "types": "lib/index.d.ts",
+  "files": [
+    "lib/**/*"
+  ],
+  "scripts": {
+    "watch": "tsc -p ./tsconfig.build.json --watch",
+    "build": "rimraf lib && tsc -p ./tsconfig.build.json",
+    "lint": "tslint --fix --project ./",
+    "test": "jest --config ./jest.config.js",
+    "ci": "yarn build"
+  },
+  "publishConfig": {
+    "access": "public"
+  },
+  "dependencies": {
+    "@google-cloud/pubsub": "^2.8.0"
+  },
+  "devDependencies": {
+    "@vendure/common": "^0.18.1",
+    "@vendure/core": "^0.18.2",
+    "rimraf": "^3.0.2",
+    "typescript": "4.0.3"
+  }
+}

+ 2 - 0
packages/pub-sub-plugin/src/constants.ts

@@ -0,0 +1,2 @@
+export const PUB_SUB_OPTIONS = Symbol('PUB_SUB_OPTIONS');
+export const loggerCtx = 'PubSubPlugin';

+ 13 - 0
packages/pub-sub-plugin/src/options.ts

@@ -0,0 +1,13 @@
+export interface PubSubOptions {
+    /**
+     * @description
+     * Number of jobs that can be inflight at the same time.
+     */
+    concurrency?: number;
+    /**
+     * @description
+     * This is the mapping of Vendure queue names to PubSub Topics and Subscriptions
+     * For each queue a topic and subscription is required to exist.
+     */
+    queueNamePubSubPair?: Map<string, [string, string]>;
+}

+ 26 - 0
packages/pub-sub-plugin/src/plugin.ts

@@ -0,0 +1,26 @@
+import { PluginCommonModule, Type, VendurePlugin } from '@vendure/core';
+
+import { PubSubOptions } from './options';
+import { PUB_SUB_OPTIONS } from './constants';
+import { PubSubJobQueueStrategy } from './pub-sub-job-queue-strategy';
+import { PubSub } from '@google-cloud/pubsub';
+
+@VendurePlugin({
+    imports: [PluginCommonModule],
+    providers: [
+        { provide: PUB_SUB_OPTIONS, useFactory: () => PubSubPlugin.options },
+        { provide: PubSub, useFactory: () => new PubSub() },
+    ],
+    configuration: config => {
+        config.jobQueueOptions.jobQueueStrategy = new PubSubJobQueueStrategy();
+        return config;
+    },
+})
+export class PubSubPlugin {
+    private static options: PubSubOptions;
+
+    static init(options: PubSubOptions): Type<PubSubPlugin> {
+        this.options = options;
+        return PubSubPlugin;
+    }
+}

+ 70 - 0
packages/pub-sub-plugin/src/pub-sub-job-queue-strategy.spec.ts

@@ -0,0 +1,70 @@
+import { PubSub } from '@google-cloud/pubsub';
+import { ModuleRef } from '@nestjs/core';
+import { Test } from '@nestjs/testing';
+import { Injector, Job } from '@vendure/core';
+
+import { PubSubOptions } from '../lib';
+
+import { PUB_SUB_OPTIONS } from './constants';
+import { PubSubJobQueueStrategy } from './pub-sub-job-queue-strategy';
+
+describe('PubSubJobQueueStrategy', () => {
+    let strategy: PubSubJobQueueStrategy;
+    let pubsub: any;
+    let topic: any;
+
+    beforeEach(async () => {
+        topic = {
+            publish: jest.fn(),
+        };
+        pubsub = {
+            topic: jest.fn(() => {
+                return topic;
+            }),
+        };
+
+        const options = {
+            concurrency: 1,
+            queueNamePubSubPair: new Map([['test-queue', ['test-topic', 'test-subscription']]]),
+        } as PubSubOptions;
+
+        const moduleRef = await Test.createTestingModule({
+            providers: [
+                { provide: PubSub, useValue: pubsub },
+                { provide: PUB_SUB_OPTIONS, useValue: options },
+            ],
+        }).compile();
+
+        strategy = new PubSubJobQueueStrategy();
+        strategy.init(new Injector(moduleRef.get(ModuleRef)));
+    });
+
+    it('cannot publish to not configured queue', async () => {
+        expect.assertions(2);
+        try {
+            await strategy.add(
+                new Job({
+                    queueName: 'some-queue',
+                    data: {},
+                }),
+            );
+        } catch (err) {
+            expect(err).toEqual(new Error('Topic name not set for queue: some-queue'));
+        }
+        expect(pubsub.topic).not.toHaveBeenCalled();
+    });
+
+    it('publishes new jobs to topic', async () => {
+        const data = {
+            some: 'data',
+        };
+        await strategy.add(
+            new Job({
+                queueName: 'test-queue',
+                data,
+            }),
+        );
+        expect(pubsub.topic).toHaveBeenCalledWith('test-topic');
+        expect(topic.publish).toHaveBeenCalledWith(Buffer.from(JSON.stringify(data)));
+    });
+});

+ 146 - 0
packages/pub-sub-plugin/src/pub-sub-job-queue-strategy.ts

@@ -0,0 +1,146 @@
+import { Message, PubSub, Subscription, Topic } from '@google-cloud/pubsub';
+import { JobState } from '@vendure/common/lib/generated-types';
+import {
+    InjectableJobQueueStrategy,
+    Injector,
+    Job,
+    JobData,
+    JobQueueStrategy,
+    Logger,
+    QueueNameProcessStorage,
+} from '@vendure/core';
+
+import { loggerCtx, PUB_SUB_OPTIONS } from './constants';
+import { PubSubOptions } from './options';
+
+export class PubSubJobQueueStrategy extends InjectableJobQueueStrategy implements JobQueueStrategy {
+    private concurrency: number;
+    private queueNamePubSubPair: Map<string, [string, string]>;
+    private pubSubClient: PubSub;
+    private topics = new Map<string, Topic>();
+    private subscriptions = new Map<string, Subscription>();
+    private listeners = new QueueNameProcessStorage<(message: Message) => void>();
+
+    init(injector: Injector) {
+        this.pubSubClient = injector.get(PubSub);
+        const options = injector.get<PubSubOptions>(PUB_SUB_OPTIONS);
+        this.concurrency = options.concurrency ?? 20;
+        this.queueNamePubSubPair = options.queueNamePubSubPair ?? new Map();
+
+        super.init(injector);
+    }
+
+    destroy() {
+        super.destroy();
+        for (const subscription of this.subscriptions.values()) {
+            subscription.removeAllListeners('message');
+        }
+        this.subscriptions.clear();
+        this.topics.clear();
+    }
+
+    async add<Data extends JobData<Data> = {}>(job: Job<Data>): Promise<Job<Data>> {
+        if (!this.hasInitialized) {
+            throw new Error('Cannot add job before init');
+        }
+
+        const id = await this.topic(job.queueName).publish(Buffer.from(JSON.stringify(job.data)));
+        Logger.debug(`Sent message ${job.queueName}: ${id}`);
+
+        return new Job<Data>({
+            id,
+            queueName: job.queueName,
+            data: job.data,
+            attempts: 0,
+            state: JobState.PENDING,
+            createdAt: new Date(),
+        });
+    }
+
+    start<Data extends JobData<Data> = {}>(queueName: string, process: (job: Job<Data>) => Promise<any>) {
+        if (!this.hasInitialized) {
+            this.started.set(queueName, process);
+            return;
+        }
+
+        if (this.listeners.has(queueName, process)) {
+            return;
+        }
+
+        const subscription = this.subscription(queueName);
+        const listener = (message: Message) => {
+            Logger.debug(`Received message: ${queueName}: ${message.id}`, loggerCtx);
+
+            const job = new Job<Data>({
+                id: message.id,
+                queueName,
+                data: JSON.parse(message.data.toString()),
+                attempts: message.deliveryAttempt,
+                state: JobState.RUNNING,
+                startedAt: new Date(),
+                createdAt: message.publishTime,
+            });
+
+            process(job)
+                .then(() => {
+                    message.ack();
+                })
+                .catch(err => {
+                    message.nack();
+                });
+        };
+        this.listeners.set(queueName, process, listener);
+        subscription.on('message', listener);
+    }
+
+    async stop<Data extends JobData<Data> = {}>(
+        queueName: string,
+        process: (job: Job<Data>) => Promise<any>,
+    ) {
+        const listener = this.listeners.getAndDelete(queueName, process);
+        if (!listener) {
+            return;
+        }
+        this.subscription(queueName).off('message', listener);
+    }
+
+    private topic(queueName: string): Topic {
+        let topic = this.topics.get(queueName);
+        if (topic) {
+            return topic;
+        }
+
+        const pair = this.queueNamePubSubPair.get(queueName);
+        if (!pair) {
+            throw new Error(`Topic name not set for queue: ${queueName}`);
+        }
+
+        const [topicName, subscriptionName] = pair;
+        topic = this.pubSubClient.topic(topicName);
+        this.topics.set(queueName, topic);
+
+        return topic;
+    }
+
+    private subscription(queueName: string): Subscription {
+        let subscription = this.subscriptions.get(queueName);
+        if (subscription) {
+            return subscription;
+        }
+
+        const pair = this.queueNamePubSubPair.get(queueName);
+        if (!pair) {
+            throw new Error(`Subscription name not set for queue: ${queueName}`);
+        }
+
+        const [topicName, subscriptionName] = pair;
+        subscription = this.topic(queueName).subscription(subscriptionName, {
+            flowControl: {
+                maxMessages: this.concurrency,
+            },
+        });
+        this.subscriptions.set(queueName, subscription);
+
+        return subscription;
+    }
+}

+ 9 - 0
packages/pub-sub-plugin/tsconfig.build.json

@@ -0,0 +1,9 @@
+{
+  "extends": "./tsconfig.json",
+  "compilerOptions": {
+    "outDir": "./lib"
+  },
+  "files": [
+    "./index.ts"
+  ]
+}

+ 10 - 0
packages/pub-sub-plugin/tsconfig.json

@@ -0,0 +1,10 @@
+{
+  "extends": "../../tsconfig.json",
+  "compilerOptions": {
+    "declaration": true,
+    "removeComments": false,
+    "noLib": false,
+    "skipLibCheck": true,
+    "sourceMap": true
+  }
+}

+ 1 - 1
packages/testing/src/test-server.ts

@@ -75,10 +75,10 @@ export class TestServer {
     async destroy() {
         // allow a grace period of any outstanding async tasks to complete
         await new Promise(resolve => global.setTimeout(resolve, 500));
-        await this.app.close();
         if (this.worker) {
             await this.worker.close();
         }
+        await this.app.close();
     }
 
     private getCallerFilename(depth: number): string {

+ 5 - 0
scripts/publish-to-verdaccio.sh

@@ -1,5 +1,9 @@
 #!/bin/bash
 
+# Move into the project root
+dir=$(cd -P -- "$(dirname -- "$0")" && pwd -P)
+cd "$dir"
+
 # A shell script which publishes all packages to a local Verdaccio registry for testing / local dev purposes
 
 if [[ -z "${VERDACCIO_URL}" ]]; then
@@ -17,6 +21,7 @@ cd ../core && npm publish -reg $VERDACCIO &&\
 cd ../create && npm publish -reg $VERDACCIO &&\
 cd ../elasticsearch-plugin && npm publish -reg $VERDACCIO &&\
 cd ../email-plugin && npm publish -reg $VERDACCIO &&\
+cd ../pub-sub-plugin && npm publish -reg $VERDACCIO &&\
 cd ../testing && npm publish -reg $VERDACCIO &&\
 cd ../ui-devkit && npm publish -reg $VERDACCIO &&\
 cd ../admin-ui/package && npm publish -reg $VERDACCIO

+ 307 - 4
yarn.lock

@@ -1581,6 +1581,50 @@
     unique-filename "^1.1.1"
     which "^1.3.1"
 
+"@google-cloud/paginator@^3.0.0":
+  version "3.0.5"
+  resolved "https://registry.yarnpkg.com/@google-cloud/paginator/-/paginator-3.0.5.tgz#9d6b96c421a89bd560c1bc2c197c7611ef21db6c"
+  integrity sha512-N4Uk4BT1YuskfRhKXBs0n9Lg2YTROZc6IMpkO/8DIHODtm5s3xY8K5vVBo23v/2XulY3azwITQlYWgT4GdLsUw==
+  dependencies:
+    arrify "^2.0.0"
+    extend "^3.0.2"
+
+"@google-cloud/precise-date@^2.0.0":
+  version "2.0.3"
+  resolved "https://registry.yarnpkg.com/@google-cloud/precise-date/-/precise-date-2.0.3.tgz#14f6f28ce35dabf3882e7aeab1c9d51bd473faed"
+  integrity sha512-+SDJ3ZvGkF7hzo6BGa8ZqeK3F6Z4+S+KviC9oOK+XCs3tfMyJCh/4j93XIWINgMMDIh9BgEvlw4306VxlXIlYA==
+
+"@google-cloud/projectify@^2.0.0":
+  version "2.0.1"
+  resolved "https://registry.yarnpkg.com/@google-cloud/projectify/-/projectify-2.0.1.tgz#13350ee609346435c795bbfe133a08dfeab78d65"
+  integrity sha512-ZDG38U/Yy6Zr21LaR3BTiiLtpJl6RkPS/JwoRT453G+6Q1DhlV0waNf8Lfu+YVYGIIxgKnLayJRfYlFJfiI8iQ==
+
+"@google-cloud/promisify@^2.0.0":
+  version "2.0.3"
+  resolved "https://registry.yarnpkg.com/@google-cloud/promisify/-/promisify-2.0.3.tgz#f934b5cdc939e3c7039ff62b9caaf59a9d89e3a8"
+  integrity sha512-d4VSA86eL/AFTe5xtyZX+ePUjE8dIFu2T8zmdeNBSa5/kNgXPCx/o/wbFNHAGLJdGnk1vddRuMESD9HbOC8irw==
+
+"@google-cloud/pubsub@^2.8.0":
+  version "2.8.0"
+  resolved "https://registry.yarnpkg.com/@google-cloud/pubsub/-/pubsub-2.8.0.tgz#143f6c5f7dda1d22bf1ffd98104caf8c2611fca6"
+  integrity sha512-AoSKAbpHCoLq6jO9vMX+K6hJhkayafan24Rs2RKHU8Y0qF6IGSm1+ly0OG12TgziHWg818/6dljWWKgwDcp8KA==
+  dependencies:
+    "@google-cloud/paginator" "^3.0.0"
+    "@google-cloud/precise-date" "^2.0.0"
+    "@google-cloud/projectify" "^2.0.0"
+    "@google-cloud/promisify" "^2.0.0"
+    "@opentelemetry/api" "^0.12.0"
+    "@opentelemetry/tracing" "^0.12.0"
+    "@types/duplexify" "^3.6.0"
+    "@types/long" "^4.0.0"
+    arrify "^2.0.0"
+    extend "^3.0.2"
+    google-auth-library "^6.1.2"
+    google-gax "^2.9.2"
+    is-stream-ended "^0.1.4"
+    lodash.snakecase "^4.1.1"
+    p-defer "^3.0.0"
+
 "@graphql-codegen/add@2.0.1":
   version "2.0.1"
   resolved "https://registry.npmjs.org/@graphql-codegen/add/-/add-2.0.1.tgz#b2cf2ef0e2c83f49dfa1f6a52fee94dfb47e296f"
@@ -2027,6 +2071,23 @@
   resolved "https://registry.npmjs.org/@graphql-typed-document-node/core/-/core-3.1.0.tgz#0eee6373e11418bfe0b5638f654df7a4ca6a3950"
   integrity sha512-wYn6r8zVZyQJ6rQaALBEln5B1pzxb9shV5Ef97kTvn6yVGrqyXVnDqnU24MXnFubR+rZjBY9NWuxX3FB2sTsjg==
 
+"@grpc/grpc-js@~1.2.0":
+  version "1.2.4"
+  resolved "https://registry.yarnpkg.com/@grpc/grpc-js/-/grpc-js-1.2.4.tgz#04f0bbefb2636296d17e821f3d52152fbe2f6989"
+  integrity sha512-z+EI20HYHLd3/uERtwOqP8Q4EPhGbz5RKUpiyo6xPWfR3pcjpf8sfNvY9XytDQ4xo1wNz7NqH1kh2UBonwzbfg==
+  dependencies:
+    "@types/node" "^12.12.47"
+    google-auth-library "^6.1.1"
+    semver "^6.2.0"
+
+"@grpc/proto-loader@^0.5.1":
+  version "0.5.6"
+  resolved "https://registry.yarnpkg.com/@grpc/proto-loader/-/proto-loader-0.5.6.tgz#1dea4b8a6412b05e2d58514d507137b63a52a98d"
+  integrity sha512-DT14xgw3PSzPxwS13auTEwxhMMOoz33DPUKNtmYK/QYbBSpLXJy78FGGs5yVoxVobEqPm4iW9MOIoz0A3bLTRQ==
+  dependencies:
+    lodash.camelcase "^4.3.0"
+    protobufjs "^6.8.6"
+
 "@istanbuljs/load-nyc-config@^1.0.0":
   version "1.1.0"
   resolved "https://registry.npmjs.org/@istanbuljs/load-nyc-config/-/load-nyc-config-1.1.0.tgz#fd3db1d59ecf7cf121e80650bb86712f9b55eced"
@@ -3211,6 +3272,51 @@
   dependencies:
     "@types/node" ">= 8"
 
+"@opentelemetry/api@^0.12.0":
+  version "0.12.0"
+  resolved "https://registry.yarnpkg.com/@opentelemetry/api/-/api-0.12.0.tgz#0359c3926e8f16fdcd8c78f196bd1e9fc4e66777"
+  integrity sha512-Dn4vU5GlaBrIWzLpsM6xbJwKHdlpwBQ4Bd+cL9ofJP3hKT8jBXpBpribmyaqAzrajzzl2Yt8uTa9rFVLfjDAvw==
+  dependencies:
+    "@opentelemetry/context-base" "^0.12.0"
+
+"@opentelemetry/context-base@^0.12.0":
+  version "0.12.0"
+  resolved "https://registry.yarnpkg.com/@opentelemetry/context-base/-/context-base-0.12.0.tgz#4906ae27359d3311e3dea1b63770a16f60848550"
+  integrity sha512-UXwSsXo3F3yZ1dIBOG9ID8v2r9e+bqLWoizCtTb8rXtwF+N5TM7hzzvQz72o3nBU+zrI/D5e+OqAYK8ZgDd3DA==
+
+"@opentelemetry/core@^0.12.0":
+  version "0.12.0"
+  resolved "https://registry.yarnpkg.com/@opentelemetry/core/-/core-0.12.0.tgz#a888badc9a408fa1f13976a574e69d14be32488e"
+  integrity sha512-oLZIkmTNWTJXzo1eA4dGu/S7wOVtylsgnEsCmhSJGhrJVDXm1eW/aGuNs3DVBeuxp0ZvQLAul3/PThsC3YrnzA==
+  dependencies:
+    "@opentelemetry/api" "^0.12.0"
+    "@opentelemetry/context-base" "^0.12.0"
+    semver "^7.1.3"
+
+"@opentelemetry/resources@^0.12.0":
+  version "0.12.0"
+  resolved "https://registry.yarnpkg.com/@opentelemetry/resources/-/resources-0.12.0.tgz#5eb287c3032a2bebb2bb9f69b44bd160d2a7d591"
+  integrity sha512-8cYvIKB68cyupc7D6SWzkLtt13mbjgxMahL4JKCM6hWPyiGSJlPFEAey4XFXI5LLpPZRYTPHLVoLqI/xwCFZZA==
+  dependencies:
+    "@opentelemetry/api" "^0.12.0"
+    "@opentelemetry/core" "^0.12.0"
+
+"@opentelemetry/semantic-conventions@^0.12.0":
+  version "0.12.0"
+  resolved "https://registry.yarnpkg.com/@opentelemetry/semantic-conventions/-/semantic-conventions-0.12.0.tgz#7e392aecdbdbd5d737d3995998b120dc17589ab0"
+  integrity sha512-BuCcDW0uLNYYTns0/LwXkJ8lp8aDm7kpS+WunEmPAPRSCe6ciOYRvzn5reqJfX93rf+6A3U2SgrBnCTH+0qoQQ==
+
+"@opentelemetry/tracing@^0.12.0":
+  version "0.12.0"
+  resolved "https://registry.yarnpkg.com/@opentelemetry/tracing/-/tracing-0.12.0.tgz#769927721d417bfac85eef50c2af068bedce8873"
+  integrity sha512-2TUGhTGkhgnxTciHCNAILPSeyXageJewRqfP9wOrx65sKd/jgvNYoY8nYf4EVWVMirDOxKDsmYgUkjdQrwb2dg==
+  dependencies:
+    "@opentelemetry/api" "^0.12.0"
+    "@opentelemetry/context-base" "^0.12.0"
+    "@opentelemetry/core" "^0.12.0"
+    "@opentelemetry/resources" "^0.12.0"
+    "@opentelemetry/semantic-conventions" "^0.12.0"
+
 "@phenomnomnominal/tsquery@^4.0.0":
   version "4.1.1"
   resolved "https://registry.npmjs.org/@phenomnomnominal/tsquery/-/tsquery-4.1.1.tgz#42971b83590e9d853d024ddb04a18085a36518df"
@@ -3509,6 +3615,13 @@
   resolved "https://registry.npmjs.org/@types/detect-port/-/detect-port-1.3.0.tgz#3e9cbd049ec29e84a2ff7852dbc629c81245774c"
   integrity sha512-NnDUDk1Ry5cHljTFetg0BNT79FaJSddTh9RsGOS2v/97DwOUJ+hBkgxtQHF6T8IarZD4i+bFEL787Nz+xpodfA==
 
+"@types/duplexify@^3.6.0":
+  version "3.6.0"
+  resolved "https://registry.yarnpkg.com/@types/duplexify/-/duplexify-3.6.0.tgz#dfc82b64bd3a2168f5bd26444af165bf0237dcd8"
+  integrity sha512-5zOA53RUlzN74bvrSGwjudssD9F3a797sDZQkiYpUOxW+WHaXTCPz4/d5Dgi6FKnOqZ2CpaTo0DhgIfsXAOE/A==
+  dependencies:
+    "@types/node" "*"
+
 "@types/estree@*":
   version "0.0.45"
   resolved "https://registry.npmjs.org/@types/estree/-/estree-0.0.45.tgz#e9387572998e5ecdac221950dab3e8c3b16af884"
@@ -3785,7 +3898,7 @@
     "@types/node" "*"
     rxjs "^6.5.1"
 
-"@types/long@^4.0.0":
+"@types/long@^4.0.0", "@types/long@^4.0.1":
   version "4.0.1"
   resolved "https://registry.npmjs.org/@types/long/-/long-4.0.1.tgz#459c65fa1867dafe6a8f322c4c51695663cc55e9"
   integrity sha512-5tXH6Bx/kNGd3MgffdmP4dy2Z+G4eaXw0SE81Tq3BNadtnMR5/ySMzX4SLEzHJzSmPNn4HIdpQsBvXMUykr58w==
@@ -3864,6 +3977,16 @@
   resolved "https://registry.npmjs.org/@types/node/-/node-12.12.62.tgz#733923d73669188d35950253dd18a21570085d2b"
   integrity sha512-qAfo81CsD7yQIM9mVyh6B/U47li5g7cfpVQEDMfQeF8pSZVwzbhwU3crc0qG4DmpsebpJPR49AKOExQyJ05Cpg==
 
+"@types/node@^12.12.47":
+  version "12.19.15"
+  resolved "https://registry.yarnpkg.com/@types/node/-/node-12.19.15.tgz#0de7e978fb43db62da369db18ea088a63673c182"
+  integrity sha512-lowukE3GUI+VSYSu6VcBXl14d61Rp5hA1D+61r16qnwC0lYNSqdxcvRh0pswejorHfS+HgwBasM8jLXz0/aOsw==
+
+"@types/node@^13.7.0":
+  version "13.13.40"
+  resolved "https://registry.yarnpkg.com/@types/node/-/node-13.13.40.tgz#f655ef327362cc83912f2e69336ddc62a24a9f88"
+  integrity sha512-eKaRo87lu1yAXrzEJl0zcJxfUMDT5/mZalFyOkT44rnQps41eS2pfWzbaulSPpQLFNy29bFqn+Y5lOTL8ATlEQ==
+
 "@types/nodemailer@^6.4.0":
   version "6.4.0"
   resolved "https://registry.npmjs.org/@types/nodemailer/-/nodemailer-6.4.0.tgz#d8c039be3ed685c4719a026455555be82c124b74"
@@ -4328,6 +4451,13 @@ abbrev@1:
   resolved "https://registry.npmjs.org/abbrev/-/abbrev-1.1.1.tgz#f8f2c887ad10bf67f634f005b6987fed3179aac8"
   integrity sha512-nne9/IiQ/hzIhY6pdDnbBtz7DjPTKrY00P/zvPSm5pOFkl6xuGrGnXn/VtTNNfNtAfZ9/1RtehkszU9qcTii0Q==
 
+abort-controller@^3.0.0:
+  version "3.0.0"
+  resolved "https://registry.yarnpkg.com/abort-controller/-/abort-controller-3.0.0.tgz#eaf54d53b62bae4138e809ca225c8439a6efb392"
+  integrity sha512-h8lQ8tacZYnR3vNQTgibj+tODHI5/+l06Au2Pcriv/Gmet0eaj4TwWH41sO9wnHDiQsEj19q0drzdWdeAHtweg==
+  dependencies:
+    event-target-shim "^5.0.0"
+
 accepts@^1.3.5, accepts@~1.3.4, accepts@~1.3.5, accepts@~1.3.7:
   version "1.3.7"
   resolved "https://registry.npmjs.org/accepts/-/accepts-1.3.7.tgz#531bc726517a3b2b41f850021c6cc15eaab507cd"
@@ -5066,7 +5196,7 @@ arrify@^1.0.0, arrify@^1.0.1:
   resolved "https://registry.npmjs.org/arrify/-/arrify-1.0.1.tgz#898508da2226f380df904728456849c1501a4b0d"
   integrity sha1-iYUI2iIm84DfkEcoRWhJwVAaSw0=
 
-arrify@^2.0.1:
+arrify@^2.0.0, arrify@^2.0.1:
   version "2.0.1"
   resolved "https://registry.npmjs.org/arrify/-/arrify-2.0.1.tgz#c9655e9331e0abcd588d2a7cad7e9956f66701fa"
   integrity sha512-3duEwti880xqi4eAMN8AyR4a0ByT90zoYdLlevfrvU43vb0YZwZVfxOgxWrLXXXpyugL0hNZc9G6BiB5B3nUug==
@@ -5402,6 +5532,11 @@ base64-js@^1.0.2:
   resolved "https://registry.npmjs.org/base64-js/-/base64-js-1.3.1.tgz#58ece8cb75dd07e71ed08c736abc5fac4dbf8df1"
   integrity sha512-mLQ4i2QO1ytvGWFWmcngKO//JXAQueZvwEKtjgQFM4jIK0kU+ytMfplL8j+n5mspOfjHwoAg+9yhb7BwAHm36g==
 
+base64-js@^1.3.0:
+  version "1.5.1"
+  resolved "https://registry.yarnpkg.com/base64-js/-/base64-js-1.5.1.tgz#1b1b440160a5bf7ad40b650f095963481903930a"
+  integrity sha512-AKpaYlHn8t4SVbOHCy+b5+KKgvR4vrsD8vbvrbiQJps7fKDTkjkDry6ji0rUJjC0kzbNePLwzxq8iypo41qeWA==
+
 base64id@2.0.0:
   version "2.0.0"
   resolved "https://registry.npmjs.org/base64id/-/base64id-2.0.0.tgz#2770ac6bc47d312af97a8bf9a634342e0cd25cb6"
@@ -5471,6 +5606,11 @@ bignumber.js@9.0.0:
   resolved "https://registry.npmjs.org/bignumber.js/-/bignumber.js-9.0.0.tgz#805880f84a329b5eac6e7cb6f8274b6d82bdf075"
   integrity sha512-t/OYhhJ2SD+YGBQcjY8GzzDHEk9f3nerxjtfa6tlMXfe7frs/WozhvCNoGvpM0P3bNf3Gq5ZRMlGr5f3r4/N8A==
 
+bignumber.js@^9.0.0:
+  version "9.0.1"
+  resolved "https://registry.yarnpkg.com/bignumber.js/-/bignumber.js-9.0.1.tgz#8d7ba124c882bfd8e43260c67475518d0689e4e5"
+  integrity sha512-IdZR9mh6ahOBv/hYGiXyVuyCetmGJhtYkqLBpTStdhEGjegpPlUawydyaF3pbIOFynJTpllEs+NP+CS9jKFLjA==
+
 binary-extensions@^1.0.0:
   version "1.13.1"
   resolved "https://registry.npmjs.org/binary-extensions/-/binary-extensions-1.13.1.tgz#598afe54755b2868a5330d2aff9d4ebb53209b65"
@@ -7889,6 +8029,16 @@ duplexify@^3.4.2, duplexify@^3.6.0:
     readable-stream "^2.0.0"
     stream-shift "^1.0.0"
 
+duplexify@^4.0.0:
+  version "4.1.1"
+  resolved "https://registry.yarnpkg.com/duplexify/-/duplexify-4.1.1.tgz#7027dc374f157b122a8ae08c2d3ea4d2d953aa61"
+  integrity sha512-DY3xVEmVHTv1wSzKNbwoU6nVjzI369Y6sPoqfYr0/xlx3IdX2n94xIszTcjPO8W8ZIv0Wb0PXNcjuZyT4wiICA==
+  dependencies:
+    end-of-stream "^1.4.1"
+    inherits "^2.0.3"
+    readable-stream "^3.1.1"
+    stream-shift "^1.0.0"
+
 each-props@^1.3.0:
   version "1.3.2"
   resolved "https://registry.npmjs.org/each-props/-/each-props-1.3.2.tgz#ea45a414d16dd5cfa419b1a81720d5ca06892333"
@@ -7905,7 +8055,7 @@ ecc-jsbn@~0.1.1:
     jsbn "~0.1.0"
     safer-buffer "^2.1.0"
 
-ecdsa-sig-formatter@1.0.11:
+ecdsa-sig-formatter@1.0.11, ecdsa-sig-formatter@^1.0.11:
   version "1.0.11"
   resolved "https://registry.npmjs.org/ecdsa-sig-formatter/-/ecdsa-sig-formatter-1.0.11.tgz#ae0f0fa2d85045ef14a817daa3ce9acd0489e5bf"
   integrity sha512-nagl3RYrbNv6kQkeJIpt6NJZy8twLB/2vtz6yN9Z4vRKHN4/QZJIEbqohALSgwKdnksuY3k5Addp5lg8sVoVcQ==
@@ -8293,6 +8443,11 @@ event-emitter@^0.3.5:
     d "1"
     es5-ext "~0.10.14"
 
+event-target-shim@^5.0.0:
+  version "5.0.1"
+  resolved "https://registry.yarnpkg.com/event-target-shim/-/event-target-shim-5.0.1.tgz#5d4d3ebdf9583d63a5333ce2deb7480ab2b05789"
+  integrity sha512-i/2XbnSz/uxRCU6+NdVJgKWDTM427+MqYbkQzD321DuCQJUqOuJKIA0IM2+W2xtYHdKOmZ4dR6fExsd4SXL+WQ==
+
 eventemitter3@^3.1.0:
   version "3.1.2"
   resolved "https://registry.npmjs.org/eventemitter3/-/eventemitter3-3.1.2.tgz#2d3d48f9c346698fce83a85d7d664e98535df6e7"
@@ -8461,7 +8616,7 @@ extend-shallow@^3.0.0, extend-shallow@^3.0.2:
     assign-symbols "^1.0.0"
     is-extendable "^1.0.1"
 
-extend@^3.0.0, extend@~3.0.2:
+extend@^3.0.0, extend@^3.0.2, extend@~3.0.2:
   version "3.0.2"
   resolved "https://registry.npmjs.org/extend/-/extend-3.0.2.tgz#f8b1136b4071fbd8eb140aff858b1019ec2915fa"
   integrity sha512-fjquC59cD7CyW6urNXK0FBufkZcoiGG80wTuPujX590cB5Ttln20E2UB4S/WARVqhXffZl2LNgS+gQdPIIim/g==
@@ -8583,6 +8738,11 @@ fast-safe-stringify@2.0.7:
   resolved "https://registry.npmjs.org/fast-safe-stringify/-/fast-safe-stringify-2.0.7.tgz#124aa885899261f68aedb42a7c080de9da608743"
   integrity sha512-Utm6CdzT+6xsDk2m8S6uL8VHxNwI6Jub+e9NYTcAms28T84pTa25GJQV9j0CY0N1rM8hK4x6grpF2BQf+2qwVA==
 
+fast-text-encoding@^1.0.0, fast-text-encoding@^1.0.3:
+  version "1.0.3"
+  resolved "https://registry.yarnpkg.com/fast-text-encoding/-/fast-text-encoding-1.0.3.tgz#ec02ac8e01ab8a319af182dae2681213cfe9ce53"
+  integrity sha512-dtm4QZH9nZtcDt8qJiOH9fcQd1NAgi+K1O2DbE6GG1PPCK/BWfOH3idCTRQ4ImXRUOyopDEgDEnVEE7Y/2Wrig==
+
 fastparse@^1.1.2:
   version "1.1.2"
   resolved "https://registry.npmjs.org/fastparse/-/fastparse-1.1.2.tgz#91728c5a5942eced8531283c79441ee4122c35a9"
@@ -9038,6 +9198,25 @@ gauge@~2.7.3:
     strip-ansi "^3.0.1"
     wide-align "^1.1.0"
 
+gaxios@^4.0.0:
+  version "4.1.0"
+  resolved "https://registry.yarnpkg.com/gaxios/-/gaxios-4.1.0.tgz#e8ad466db5a4383c70b9d63bfd14dfaa87eb0099"
+  integrity sha512-vb0to8xzGnA2qcgywAjtshOKKVDf2eQhJoiL6fHhgW5tVN7wNk7egnYIO9zotfn3lQ3De1VPdf7V5/BWfCtCmg==
+  dependencies:
+    abort-controller "^3.0.0"
+    extend "^3.0.2"
+    https-proxy-agent "^5.0.0"
+    is-stream "^2.0.0"
+    node-fetch "^2.3.0"
+
+gcp-metadata@^4.2.0:
+  version "4.2.1"
+  resolved "https://registry.yarnpkg.com/gcp-metadata/-/gcp-metadata-4.2.1.tgz#31849fbcf9025ef34c2297c32a89a1e7e9f2cd62"
+  integrity sha512-tSk+REe5iq/N+K+SK1XjZJUrFPuDqGZVzCy2vocIHIGmPlTGsa8owXMJwGkrXr73NO0AzhPW4MF2DEHz7P2AVw==
+  dependencies:
+    gaxios "^4.0.0"
+    json-bigint "^1.0.0"
+
 genfun@^5.0.0:
   version "5.0.0"
   resolved "https://registry.npmjs.org/genfun/-/genfun-5.0.0.tgz#9dd9710a06900a5c4a5bf57aca5da4e52fe76537"
@@ -9352,6 +9531,45 @@ glogg@^1.0.0:
   dependencies:
     sparkles "^1.0.0"
 
+google-auth-library@^6.1.1, google-auth-library@^6.1.2, google-auth-library@^6.1.3:
+  version "6.1.4"
+  resolved "https://registry.yarnpkg.com/google-auth-library/-/google-auth-library-6.1.4.tgz#bc70c4f3b6681ae5273343466bcef37577b7ee44"
+  integrity sha512-q0kYtGWnDd9XquwiQGAZeI2Jnglk7NDi0cChE4tWp6Kpo/kbqnt9scJb0HP+/xqt03Beqw/xQah1OPrci+pOxw==
+  dependencies:
+    arrify "^2.0.0"
+    base64-js "^1.3.0"
+    ecdsa-sig-formatter "^1.0.11"
+    fast-text-encoding "^1.0.0"
+    gaxios "^4.0.0"
+    gcp-metadata "^4.2.0"
+    gtoken "^5.0.4"
+    jws "^4.0.0"
+    lru-cache "^6.0.0"
+
+google-gax@^2.9.2:
+  version "2.10.0"
+  resolved "https://registry.yarnpkg.com/google-gax/-/google-gax-2.10.0.tgz#7918e5194fe3c5f4330cccb3a3b66e368ac70d8f"
+  integrity sha512-K+1JK5ofNl5k30LsI8UQb/DeLMEbhL/SWirCx0L9pnMcApSfAjRAO7yajXT5X1vicxDBnNSwKs+cu4elxpYraw==
+  dependencies:
+    "@grpc/grpc-js" "~1.2.0"
+    "@grpc/proto-loader" "^0.5.1"
+    "@types/long" "^4.0.0"
+    abort-controller "^3.0.0"
+    duplexify "^4.0.0"
+    fast-text-encoding "^1.0.3"
+    google-auth-library "^6.1.3"
+    is-stream-ended "^0.1.4"
+    node-fetch "^2.6.1"
+    protobufjs "^6.10.2"
+    retry-request "^4.0.0"
+
+google-p12-pem@^3.0.3:
+  version "3.0.3"
+  resolved "https://registry.yarnpkg.com/google-p12-pem/-/google-p12-pem-3.0.3.tgz#673ac3a75d3903a87f05878f3c75e06fc151669e"
+  integrity sha512-wS0ek4ZtFx/ACKYF3JhyGe5kzH7pgiQ7J5otlumqR9psmWMYc+U9cErKlCYVYHoUaidXHdZ2xbo34kB+S+24hA==
+  dependencies:
+    node-forge "^0.10.0"
+
 graceful-fs@^4.0.0, graceful-fs@^4.1.11, graceful-fs@^4.1.15, graceful-fs@^4.1.2, graceful-fs@^4.1.6, graceful-fs@^4.2.0, graceful-fs@^4.2.2, graceful-fs@^4.2.4:
   version "4.2.4"
   resolved "https://registry.npmjs.org/graceful-fs/-/graceful-fs-4.2.4.tgz#2256bde14d3632958c465ebc96dc467ca07a29fb"
@@ -9451,6 +9669,16 @@ growly@^1.3.0:
   resolved "https://registry.npmjs.org/growly/-/growly-1.3.0.tgz#f10748cbe76af964b7c96c93c6bcc28af120c081"
   integrity sha1-8QdIy+dq+WS3yWyTxrzCivEgwIE=
 
+gtoken@^5.0.4:
+  version "5.2.0"
+  resolved "https://registry.yarnpkg.com/gtoken/-/gtoken-5.2.0.tgz#7f1e029f9472bb8899d6911c03c66f7ad985c849"
+  integrity sha512-qbf6JWEYFMj3WMAluvYXl8GAiji6w8d9OmAGCbBg0xF4xD/yu6ZaO6BhoXNddRjKcOUpZD81iea1H5B45gAo1g==
+  dependencies:
+    gaxios "^4.0.0"
+    google-p12-pem "^3.0.3"
+    jws "^4.0.0"
+    mime "^2.2.0"
+
 gulp-cli@^2.2.0:
   version "2.3.0"
   resolved "https://registry.npmjs.org/gulp-cli/-/gulp-cli-2.3.0.tgz#ec0d380e29e52aa45e47977f0d32e18fd161122f"
@@ -10614,6 +10842,11 @@ is-ssh@^1.3.0:
   dependencies:
     protocols "^1.1.0"
 
+is-stream-ended@^0.1.4:
+  version "0.1.4"
+  resolved "https://registry.yarnpkg.com/is-stream-ended/-/is-stream-ended-0.1.4.tgz#f50224e95e06bce0e356d440a4827cd35b267eda"
+  integrity sha512-xj0XPvmr7bQFTvirqnFr50o0hQIh6ZItDqloxt5aJrR4NQsYeSsyFQERYGCAzfindAcnKjINnwEEgLx4IqVzQw==
+
 is-stream@^1.0.1, is-stream@^1.1.0:
   version "1.1.0"
   resolved "https://registry.npmjs.org/is-stream/-/is-stream-1.1.0.tgz#12d4a3dd4e68e0b79ceb8dbc84173ae80d91ca44"
@@ -11343,6 +11576,13 @@ jsesc@~0.5.0:
   resolved "https://registry.npmjs.org/jsesc/-/jsesc-0.5.0.tgz#e7dee66e35d6fc16f710fe91d5cf69f70f08911d"
   integrity sha1-597mbjXW/Bb3EP6R1c9p9w8IkR0=
 
+json-bigint@^1.0.0:
+  version "1.0.0"
+  resolved "https://registry.yarnpkg.com/json-bigint/-/json-bigint-1.0.0.tgz#ae547823ac0cad8398667f8cd9ef4730f5b01ff1"
+  integrity sha512-SiPv/8VpZuWbvLSMtTDU8hEfrZWg/mH/nV/b4o0CYbSxu1UIQPLdwKOCIyLQX+VIPO5vrLX3i8qtqFyhdPSUSQ==
+  dependencies:
+    bignumber.js "^9.0.0"
+
 json-parse-better-errors@^1.0.0, json-parse-better-errors@^1.0.1, json-parse-better-errors@^1.0.2:
   version "1.0.2"
   resolved "https://registry.npmjs.org/json-parse-better-errors/-/json-parse-better-errors-1.0.2.tgz#bb867cfb3450e69107c131d1c514bab3dc8bcaa9"
@@ -11511,6 +11751,15 @@ jwa@^1.4.1:
     ecdsa-sig-formatter "1.0.11"
     safe-buffer "^5.0.1"
 
+jwa@^2.0.0:
+  version "2.0.0"
+  resolved "https://registry.yarnpkg.com/jwa/-/jwa-2.0.0.tgz#a7e9c3f29dae94027ebcaf49975c9345593410fc"
+  integrity sha512-jrZ2Qx916EA+fq9cEAeCROWPTfCwi1IVHqT2tapuqLEVVDKFDENFw1oL+MwrTvH6msKxsd1YTDVw6uKEcsrLEA==
+  dependencies:
+    buffer-equal-constant-time "1.0.1"
+    ecdsa-sig-formatter "1.0.11"
+    safe-buffer "^5.0.1"
+
 jws@^3.2.2:
   version "3.2.2"
   resolved "https://registry.npmjs.org/jws/-/jws-3.2.2.tgz#001099f3639468c9414000e99995fa52fb478304"
@@ -11519,6 +11768,14 @@ jws@^3.2.2:
     jwa "^1.4.1"
     safe-buffer "^5.0.1"
 
+jws@^4.0.0:
+  version "4.0.0"
+  resolved "https://registry.yarnpkg.com/jws/-/jws-4.0.0.tgz#2d4e8cf6a318ffaa12615e9dec7e86e6c97310f4"
+  integrity sha512-KDncfTmOZoOMTFG4mBlG0qUIOlc03fmzH+ru6RgYVZhPkyiy/92Owlt/8UEN+a4TXR1FQetfIpJE8ApdvdVxTg==
+  dependencies:
+    jwa "^2.0.0"
+    safe-buffer "^5.0.1"
+
 karma-chrome-launcher@~3.1.0:
   version "3.1.0"
   resolved "https://registry.npmjs.org/karma-chrome-launcher/-/karma-chrome-launcher-3.1.0.tgz#805a586799a4d05f4e54f72a204979f3f3066738"
@@ -11975,6 +12232,11 @@ lodash._reinterpolate@^3.0.0:
   resolved "https://registry.npmjs.org/lodash._reinterpolate/-/lodash._reinterpolate-3.0.0.tgz#0ccf2d89166af03b3663c796538b75ac6e114d9d"
   integrity sha1-DM8tiRZq8Ds2Y8eWU4t1rG4RTZ0=
 
+lodash.camelcase@^4.3.0:
+  version "4.3.0"
+  resolved "https://registry.yarnpkg.com/lodash.camelcase/-/lodash.camelcase-4.3.0.tgz#b28aa6288a2b9fc651035c7711f65ab6190331a6"
+  integrity sha1-soqmKIorn8ZRA1x3EfZathkDMaY=
+
 lodash.clonedeep@^4.5.0:
   version "4.5.0"
   resolved "https://registry.npmjs.org/lodash.clonedeep/-/lodash.clonedeep-4.5.0.tgz#e23f3f9c4f8fbdde872529c1071857a086e5ccef"
@@ -12035,6 +12297,11 @@ lodash.set@^4.3.2:
   resolved "https://registry.npmjs.org/lodash.set/-/lodash.set-4.3.2.tgz#d8757b1da807dde24816b0d6a84bea1a76230b23"
   integrity sha1-2HV7HagH3eJIFrDWqEvqGnYjCyM=
 
+lodash.snakecase@^4.1.1:
+  version "4.1.1"
+  resolved "https://registry.yarnpkg.com/lodash.snakecase/-/lodash.snakecase-4.1.1.tgz#39d714a35357147837aefd64b5dcbb16becd8f8d"
+  integrity sha1-OdcUo1NXFHg3rv1ktdy7Fr7Nj40=
+
 lodash.sortby@^4.7.0:
   version "4.7.0"
   resolved "https://registry.npmjs.org/lodash.sortby/-/lodash.sortby-4.7.0.tgz#edd14c824e2cc9c1e0b0a1b42bb5210516a42438"
@@ -12542,6 +12809,11 @@ mime@^2.0.3, mime@^2.3.1, mime@^2.4.4, mime@^2.4.5, mime@^2.4.6:
   resolved "https://registry.npmjs.org/mime/-/mime-2.4.6.tgz#e5b407c90db442f2beb5b162373d07b69affa4d1"
   integrity sha512-RZKhC3EmpBchfTGBVb8fb+RL2cWyw/32lshnsETttkBAyAUXSGHxbEJWWRXc751DrIxG1q04b8QwMbAwkRPpUA==
 
+mime@^2.2.0:
+  version "2.5.0"
+  resolved "https://registry.yarnpkg.com/mime/-/mime-2.5.0.tgz#2b4af934401779806ee98026bb42e8c1ae1876b1"
+  integrity sha512-ft3WayFSFUVBuJj7BMLKAQcSlItKtfjsKDDsii3rqFDAZ7t11zRe8ASw/GlmivGwVUYtwkQrxiGGpL6gFvB0ag==
+
 mimic-fn@^1.0.0:
   version "1.2.0"
   resolved "https://registry.npmjs.org/mimic-fn/-/mimic-fn-1.2.0.tgz#820c86a39334640e99516928bd03fca88057d022"
@@ -13987,6 +14259,11 @@ osenv@0, osenv@^0.1.4, osenv@^0.1.5:
     os-homedir "^1.0.0"
     os-tmpdir "^1.0.0"
 
+p-defer@^3.0.0:
+  version "3.0.0"
+  resolved "https://registry.yarnpkg.com/p-defer/-/p-defer-3.0.0.tgz#d1dceb4ee9b2b604b1d94ffec83760175d4e6f83"
+  integrity sha512-ugZxsxmtTln604yeYd29EGrNhazN2lywetzpKhfmQjW/VJmhpDmWbiX+h0zL8V91R0UXkhb3KtPmyq9PZw3aYw==
+
 p-each-series@^2.1.0:
   version "2.1.0"
   resolved "https://registry.npmjs.org/p-each-series/-/p-each-series-2.1.0.tgz#961c8dd3f195ea96c747e636b262b800a6b1af48"
@@ -15234,6 +15511,25 @@ proto-list@~1.2.1:
   resolved "https://registry.npmjs.org/proto-list/-/proto-list-1.2.4.tgz#212d5bfe1318306a420f6402b8e26ff39647a849"
   integrity sha1-IS1b/hMYMGpCD2QCuOJv85ZHqEk=
 
+protobufjs@^6.10.2, protobufjs@^6.8.6:
+  version "6.10.2"
+  resolved "https://registry.yarnpkg.com/protobufjs/-/protobufjs-6.10.2.tgz#b9cb6bd8ec8f87514592ba3fdfd28e93f33a469b"
+  integrity sha512-27yj+04uF6ya9l+qfpH187aqEzfCF4+Uit0I9ZBQVqK09hk/SQzKa2MUqUpXaVa7LOFRg1TSSr3lVxGOk6c0SQ==
+  dependencies:
+    "@protobufjs/aspromise" "^1.1.2"
+    "@protobufjs/base64" "^1.1.2"
+    "@protobufjs/codegen" "^2.0.4"
+    "@protobufjs/eventemitter" "^1.1.0"
+    "@protobufjs/fetch" "^1.1.0"
+    "@protobufjs/float" "^1.0.2"
+    "@protobufjs/inquire" "^1.1.0"
+    "@protobufjs/path" "^1.1.2"
+    "@protobufjs/pool" "^1.1.0"
+    "@protobufjs/utf8" "^1.1.0"
+    "@types/long" "^4.0.1"
+    "@types/node" "^13.7.0"
+    long "^4.0.0"
+
 protocols@^1.1.0, protocols@^1.4.0:
   version "1.4.8"
   resolved "https://registry.npmjs.org/protocols/-/protocols-1.4.8.tgz#48eea2d8f58d9644a4a32caae5d5db290a075ce8"
@@ -16059,6 +16355,13 @@ ret@~0.1.10:
   resolved "https://registry.npmjs.org/ret/-/ret-0.1.15.tgz#b8a4825d5bdb1fc3f6f53c2bc33f81388681c7bc"
   integrity sha512-TTlYpa+OL+vMMNG24xSlQGEJ3B/RzEfUlLct7b5G/ytav+wPrplCpVMFuwzXbkecJrb6IYo1iFb0S9v37754mg==
 
+retry-request@^4.0.0:
+  version "4.1.3"
+  resolved "https://registry.yarnpkg.com/retry-request/-/retry-request-4.1.3.tgz#d5f74daf261372cff58d08b0a1979b4d7cab0fde"
+  integrity sha512-QnRZUpuPNgX0+D1xVxul6DbJ9slvo4Rm6iV/dn63e048MvGbUZiKySVt6Tenp04JqmchxjiLltGerOJys7kJYQ==
+  dependencies:
+    debug "^4.1.1"
+
 retry@0.12.0, retry@^0.12.0:
   version "0.12.0"
   resolved "https://registry.npmjs.org/retry/-/retry-0.12.0.tgz#1b42a6266a21f07421d1b0b54b7dc167b01c013b"