Browse Source

feat(core): Create WorkerService for simpler communication to worker

Abstracts away the internals of NestJS microservice communication into a type-safe interface.
Michael Bromley 6 years ago
parent
commit
16ab03d9bf

+ 1 - 1
packages/core/src/index.ts

@@ -7,7 +7,7 @@ export * from './plugin/index';
 export * from './entity/index';
 export * from './data-import/index';
 export * from './service/index';
-export * from './worker/constants';
+export * from './worker/index';
 export * from '@vendure/common/lib/shared-types';
 export {
     Permission,

+ 0 - 6
packages/core/src/plugin/default-search-plugin/constants.ts

@@ -1,6 +0,0 @@
-export const workerLoggerCtx = 'DefaultSearchPlugin Worker';
-export enum Message {
-    Reindex = 'Reindex',
-    UpdateVariantsById = 'UpdateVariantsById',
-    UpdateProductOrVariant = 'UpdateProductOrVariant',
-}

+ 11 - 25
packages/core/src/plugin/default-search-plugin/indexer/indexer.controller.ts

@@ -17,8 +17,8 @@ import { translateDeep } from '../../../service/helpers/utils/translate-entity';
 import { ProductVariantService } from '../../../service/services/product-variant.service';
 import { TaxRateService } from '../../../service/services/tax-rate.service';
 import { AsyncQueue } from '../async-queue';
-import { Message, workerLoggerCtx } from '../constants';
 import { SearchIndexItem } from '../search-index-item.entity';
+import { ReindexMessage, UpdateProductOrVariantMessage, UpdateVariantsByIdMessage } from '../types';
 
 export const BATCH_SIZE = 1000;
 export const variantRelations = [
@@ -33,11 +33,7 @@ export const variantRelations = [
     'taxCategory',
 ];
 
-export interface ReindexMessageResponse {
-    total: number;
-    completed: number;
-    duration: number;
-}
+export const workerLoggerCtx = 'DefaultSearchPlugin Worker';
 
 @Controller()
 export class IndexerController {
@@ -49,8 +45,8 @@ export class IndexerController {
         private taxRateService: TaxRateService,
     ) {}
 
-    @MessagePattern(Message.Reindex)
-    reindex({ ctx: rawContext }: { ctx: any }): Observable<ReindexMessageResponse> {
+    @MessagePattern(ReindexMessage.pattern)
+    reindex({ ctx: rawContext }: ReindexMessage['data']): Observable<ReindexMessage['response']> {
         const ctx = RequestContext.fromObject(rawContext);
         return new Observable(observer => {
             (async () => {
@@ -95,14 +91,11 @@ export class IndexerController {
         });
     }
 
-    @MessagePattern(Message.UpdateVariantsById)
+    @MessagePattern(UpdateVariantsByIdMessage.pattern)
     updateVariantsById({
         ctx: rawContext,
         ids,
-    }: {
-        ctx: any;
-        ids: ID[];
-    }): Observable<ReindexMessageResponse> {
+    }: UpdateVariantsByIdMessage['data']): Observable<UpdateVariantsByIdMessage['response']> {
         const ctx = RequestContext.fromObject(rawContext);
 
         return new Observable(observer => {
@@ -145,21 +138,14 @@ export class IndexerController {
     /**
      * Updates the search index only for the affected entities.
      */
-    @MessagePattern(Message.UpdateProductOrVariant)
-    updateProductOrVariant({
-        ctx: rawContext,
-        productId,
-        variantId,
-    }: {
-        ctx: any;
-        productId?: ID;
-        variantId?: ID;
-    }): Observable<boolean> {
-        const ctx = RequestContext.fromObject(rawContext);
+    @MessagePattern(UpdateProductOrVariantMessage.pattern)
+    updateProductOrVariant(data: UpdateProductOrVariantMessage['data']): Observable<boolean> {
+        const ctx = RequestContext.fromObject(data.ctx);
+        const { productId, variantId } = data;
         let updatedVariants: ProductVariant[] = [];
         let removedVariantIds: ID[] = [];
         return defer(async () => {
-            if (productId) {
+            if (data.productId) {
                 const product = await this.connection.getRepository(Product).findOne(productId, {
                     relations: ['variants'],
                 });

+ 60 - 91
packages/core/src/plugin/default-search-plugin/indexer/search-index.service.ts

@@ -1,5 +1,4 @@
-import { Inject, Injectable, OnModuleDestroy } from '@nestjs/common';
-import { ClientProxy } from '@nestjs/microservices';
+import { Injectable } from '@nestjs/common';
 import { ID } from '@vendure/common/lib/shared-types';
 
 import { RequestContext } from '../../../api/common/request-context';
@@ -7,59 +6,29 @@ import { Logger } from '../../../config/logger/vendure-logger';
 import { ProductVariant } from '../../../entity/product-variant/product-variant.entity';
 import { Product } from '../../../entity/product/product.entity';
 import { Job } from '../../../service/helpers/job-manager/job';
-import { JobService } from '../../../service/services/job.service';
-import { VENDURE_WORKER_CLIENT } from '../../../worker/constants';
-import { Message } from '../constants';
-
-import { ReindexMessageResponse } from './indexer.controller';
+import { JobReporter, JobService } from '../../../service/services/job.service';
+import { WorkerService } from '../../../worker/worker.service';
+import {
+    ReindexMessage,
+    ReindexMessageResponse,
+    UpdateProductOrVariantMessage,
+    UpdateVariantsByIdMessage,
+} from '../types';
 
 /**
  * This service is responsible for messaging the {@link IndexerController} with search index updates.
  */
 @Injectable()
-export class SearchIndexService implements OnModuleDestroy {
-
-    constructor(@Inject(VENDURE_WORKER_CLIENT) private readonly client: ClientProxy,
-                private jobService: JobService) {}
+export class SearchIndexService {
+    constructor(private workerService: WorkerService, private jobService: JobService) {}
 
     reindex(ctx: RequestContext): Job {
         return this.jobService.createJob({
             name: 'reindex',
             singleInstance: true,
             work: async reporter => {
-                return new Promise((resolve, reject) => {
-                    Logger.verbose(`sending reindex message`);
-                    let total: number | undefined;
-                    let duration = 0;
-                    let completed = 0;
-                    this.client.send<ReindexMessageResponse>(Message.Reindex, { ctx })
-                        .subscribe({
-                            next: response => {
-                                if (!total) {
-                                    total = response.total;
-                                }
-                                duration = response.duration;
-                                completed = response.completed;
-                                const progress = Math.ceil((completed / total) * 100);
-                                reporter.setProgress(progress);
-                            },
-                            complete: () => {
-                                resolve({
-                                    success: true,
-                                    indexedItemCount: total,
-                                    timeTaken: duration,
-                                });
-                            },
-                            error: (err) => {
-                                Logger.error(JSON.stringify(err));
-                                resolve({
-                                    success: false,
-                                    indexedItemCount: 0,
-                                    timeTaken: 0,
-                                });
-                            },
-                        });
-                });
+                Logger.verbose(`sending reindex message`);
+                this.workerService.send(new ReindexMessage({ ctx })).subscribe(this.createObserver(reporter));
             },
         });
     }
@@ -70,63 +39,63 @@ export class SearchIndexService implements OnModuleDestroy {
     updateProductOrVariant(ctx: RequestContext, updatedEntity: Product | ProductVariant) {
         return this.jobService.createJob({
             name: 'update-index',
-            work: async () => {
-                if (updatedEntity instanceof Product) {
-                    return this.client.send(Message.UpdateProductOrVariant, { ctx, productId: updatedEntity.id })
-                        .toPromise()
-                        .catch(err => Logger.error(err));
-                } else {
-                    return this.client.send(Message.UpdateProductOrVariant, { ctx, variantId: updatedEntity.id })
-                        .toPromise()
-                        .catch(err => Logger.error(err));
-                }
+            work: reporter => {
+                const data =
+                    updatedEntity instanceof Product
+                        ? { ctx, productId: updatedEntity.id }
+                        : { ctx, variantId: updatedEntity.id };
+                this.workerService.send(new UpdateProductOrVariantMessage(data)).subscribe({
+                    complete: () => reporter.complete(true),
+                    error: err => {
+                        Logger.error(err);
+                        reporter.complete(false);
+                    },
+                });
             },
         });
-
     }
 
     updateVariantsById(ctx: RequestContext, ids: ID[]) {
         return this.jobService.createJob({
             name: 'update-index',
-            work: async reporter => {
-                return new Promise((resolve, reject) => {
-                    Logger.verbose(`sending reindex message`);
-                    let total: number | undefined;
-                    let duration = 0;
-                    let completed = 0;
-                    this.client.send<ReindexMessageResponse>(Message.UpdateVariantsById, { ctx, ids })
-                        .subscribe({
-                            next: response => {
-                                if (!total) {
-                                    total = response.total;
-                                }
-                                duration = response.duration;
-                                completed = response.completed;
-                                const progress = Math.ceil((completed / total) * 100);
-                                reporter.setProgress(progress);
-                            },
-                            complete: () => {
-                                resolve({
-                                    success: true,
-                                    indexedItemCount: total,
-                                    timeTaken: duration,
-                                });
-                            },
-                            error: (err) => {
-                                Logger.error(JSON.stringify(err));
-                                resolve({
-                                    success: false,
-                                    indexedItemCount: 0,
-                                    timeTaken: 0,
-                                });
-                            },
-                        });
-                });
+            work: reporter => {
+                Logger.verbose(`sending reindex message`);
+                this.workerService
+                    .send(new UpdateVariantsByIdMessage({ ctx, ids }))
+                    .subscribe(this.createObserver(reporter));
             },
         });
     }
 
-    onModuleDestroy(): any {
-        this.client.close();
+    private createObserver(reporter: JobReporter) {
+        let total: number | undefined;
+        let duration = 0;
+        let completed = 0;
+        return {
+            next: (response: ReindexMessageResponse) => {
+                if (!total) {
+                    total = response.total;
+                }
+                duration = response.duration;
+                completed = response.completed;
+                const progress = Math.ceil((completed / total) * 100);
+                reporter.setProgress(progress);
+            },
+            complete: () => {
+                reporter.complete({
+                    success: true,
+                    indexedItemCount: total,
+                    timeTaken: duration,
+                });
+            },
+            error: (err: any) => {
+                Logger.error(JSON.stringify(err));
+                reporter.complete({
+                    success: false,
+                    indexedItemCount: 0,
+                    timeTaken: 0,
+                });
+            },
+        };
     }
 }

+ 34 - 0
packages/core/src/plugin/default-search-plugin/types.ts

@@ -0,0 +1,34 @@
+import { ID } from '@vendure/common/lib/shared-types';
+
+import { RequestContext } from '../../api/common/request-context';
+import { WorkerMessage } from '../../worker/types';
+
+export interface ReindexMessageResponse {
+    total: number;
+    completed: number;
+    duration: number;
+}
+
+export type UpdateProductOrVariantMessageData = {
+    ctx: RequestContext;
+    productId?: ID;
+    variantId?: ID;
+};
+
+export interface UpdateVariantsByIdMessageData {
+    ctx: RequestContext;
+    ids: ID[];
+}
+
+export class ReindexMessage extends WorkerMessage<{ ctx: RequestContext }, ReindexMessageResponse> {
+    static readonly pattern = 'Reindex';
+}
+export class UpdateProductOrVariantMessage extends WorkerMessage<UpdateProductOrVariantMessageData, boolean> {
+    static readonly pattern = 'UpdateProductOrVariant';
+}
+export class UpdateVariantsByIdMessage extends WorkerMessage<
+    UpdateVariantsByIdMessageData,
+    ReindexMessageResponse
+> {
+    static readonly pattern = 'UpdateVariantsById';
+}

+ 4 - 16
packages/core/src/plugin/plugin-common.module.ts

@@ -1,11 +1,9 @@
 import { Module } from '@nestjs/common';
-import { ClientProxyFactory } from '@nestjs/microservices';
 
 import { ConfigModule } from '../config/config.module';
-import { ConfigService } from '../config/config.service';
 import { EventBusModule } from '../event-bus/event-bus.module';
 import { ServiceModule } from '../service/service.module';
-import { VENDURE_WORKER_CLIENT } from '../worker/constants';
+import { WorkerServiceModule } from '../worker/worker-service.module';
 
 /**
  * @description
@@ -18,25 +16,15 @@ import { VENDURE_WORKER_CLIENT } from '../worker/constants';
  * * EventBusModule, allowing the injection of the {@link EventBus} service.
  * * ServiceModule allowing the injection of any of the various entity services such as ProductService, OrderService etc.
  * * ConfigModule, allowing the injection of the ConfigService.
- * * The `VENDURE_WORKER_CLIENT` token, allowing the injection of the Nest microservice ClientProxy.
+ * * WorkerServiceModule, allowing the injection of the {@link WorkerService}.
  *
  * @docsCategory plugin
  */
 @Module({
-    imports: [EventBusModule, ConfigModule, ServiceModule.forPlugin()],
+    imports: [EventBusModule, ConfigModule, ServiceModule.forPlugin(), WorkerServiceModule],
     providers: [
-        {
-            provide: VENDURE_WORKER_CLIENT,
-            useFactory: (configService: ConfigService) => {
-                return ClientProxyFactory.create({
-                    transport: configService.workerOptions.transport as any,
-                    options: configService.workerOptions.options as any,
-                });
-            },
-            inject: [ConfigService],
-        },
         // TODO: Provide an injectable which defines whether in main or worker context
     ],
-    exports: [EventBusModule, ConfigModule, ServiceModule.forPlugin(), VENDURE_WORKER_CLIENT],
+    exports: [EventBusModule, ConfigModule, ServiceModule.forPlugin(), WorkerServiceModule],
 })
 export class PluginCommonModule {}

+ 42 - 29
packages/core/src/service/helpers/job-manager/job-manager.spec.ts

@@ -3,10 +3,15 @@ import { JobState } from '@vendure/common/lib/generated-types';
 import { pick } from '@vendure/common/lib/pick';
 import { Subject } from 'rxjs';
 
+import { JobReporter, PartialJobReporter } from '../../services/job.service';
+
 import { JobManager } from './job-manager';
 
 describe('JobManager', () => {
-    const noop = () => {};
+    const noop = () => Promise.resolve();
+    const mockReporter: PartialJobReporter = {
+        complete(result?: any): void {},
+    };
 
     it('getOne() returns null for invalid id', () => {
         const jm = new JobManager();
@@ -15,13 +20,13 @@ describe('JobManager', () => {
 
     it('createJob() returns a job', () => {
         const jm = new JobManager();
-        const job = jm.createJob('test', noop);
+        const job = jm.createJob('test', noop, mockReporter);
         expect(job.name).toBe('test');
     });
 
     it('getOne() returns job by id', () => {
         const jm = new JobManager();
-        const job1 = jm.createJob('test', noop);
+        const job1 = jm.createJob('test', noop, mockReporter);
         const job2 = jm.getOne(job1.id);
 
         expect(job1.id).toBe(job2!.id);
@@ -30,7 +35,7 @@ describe('JobManager', () => {
     it('job completes once work fn returns', async () => {
         const jm = new JobManager();
         const subject = new Subject();
-        const job = jm.createJob('test', () => subject.toPromise());
+        const job = jm.createJob('test', () => subject.toPromise(), mockReporter);
         job.start();
         await tick();
 
@@ -48,7 +53,7 @@ describe('JobManager', () => {
     it('job fails if work fn throws', async () => {
         const jm = new JobManager();
         const subject = new Subject();
-        const job = jm.createJob('test', () => subject.toPromise());
+        const job = jm.createJob('test', () => subject.toPromise(), mockReporter);
         job.start();
         await tick();
 
@@ -64,12 +69,22 @@ describe('JobManager', () => {
 
     it('reporter.setProgress updates job progress', async () => {
         const jm = new JobManager();
-        const subject = new Subject();
         const progressSubject = new Subject<number>();
-        const job = jm.createJob('test', (reporter => {
-            progressSubject.subscribe(val => reporter.setProgress(val));
-            return subject.toPromise();
-        }));
+        const testReporter: PartialJobReporter = {
+            complete(r?: any): void {},
+        };
+        const wrappedWork = () => {
+            return new Promise(async (resolve, reject) => {
+                testReporter.complete = res => resolve(res);
+                progressSubject.subscribe(
+                    val => testReporter.setProgress!(val),
+                    () => testReporter.complete(),
+                    () => testReporter.complete(),
+                );
+                const r = await progressSubject.toPromise();
+            });
+        };
+        const job = jm.createJob('test', wrappedWork, testReporter);
         job.start();
         await tick();
         expect(jm.getOne(job.id)!.progress).toBe(0);
@@ -86,7 +101,7 @@ describe('JobManager', () => {
         progressSubject.next(88);
         expect(jm.getOne(job.id)!.progress).toBe(88);
 
-        subject.complete();
+        progressSubject.complete();
         await tick();
 
         const result = jm.getOne(job.id)!;
@@ -95,36 +110,36 @@ describe('JobManager', () => {
 
     it('getAll() returns all jobs', () => {
         const jm = new JobManager();
-        const job1 = jm.createJob('job1', noop);
-        const job2 = jm.createJob('job2', noop);
-        const job3 = jm.createJob('job3', noop);
+        const job1 = jm.createJob('job1', noop, mockReporter);
+        const job2 = jm.createJob('job2', noop, mockReporter);
+        const job3 = jm.createJob('job3', noop, mockReporter);
 
         expect(jm.getAll().map(j => j.id)).toEqual([job1.id, job2.id, job3.id]);
     });
 
     it('getAll() filters by id', () => {
         const jm = new JobManager();
-        const job1 = jm.createJob('job1', noop);
-        const job2 = jm.createJob('job2', noop);
-        const job3 = jm.createJob('job3', noop);
+        const job1 = jm.createJob('job1', noop, mockReporter);
+        const job2 = jm.createJob('job2', noop, mockReporter);
+        const job3 = jm.createJob('job3', noop, mockReporter);
 
-        expect(jm.getAll({ ids: [job1.id, job3.id]}).map(j => j.id)).toEqual([job1.id, job3.id]);
+        expect(jm.getAll({ ids: [job1.id, job3.id] }).map(j => j.id)).toEqual([job1.id, job3.id]);
     });
 
     it('getAll() filters by state', async () => {
         const jm = new JobManager();
         const subject = new Subject();
-        const job1 = jm.createJob('job1', noop);
-        const job2 = jm.createJob('job2', noop);
-        const job3 = jm.createJob('job3', () => subject.toPromise());
+        const job1 = jm.createJob('job1', noop, mockReporter);
+        const job2 = jm.createJob('job2', noop, mockReporter);
+        const job3 = jm.createJob('job3', () => subject.toPromise(), mockReporter);
         job1.start();
         job2.start();
         job3.start();
 
         await tick();
 
-        expect(jm.getAll({ state: JobState.COMPLETED}).map(j => j.id)).toEqual([job1.id, job2.id]);
-        expect(jm.getAll({ state: JobState.RUNNING}).map(j => j.id)).toEqual([job3.id]);
+        expect(jm.getAll({ state: JobState.COMPLETED }).map(j => j.id)).toEqual([job1.id, job2.id]);
+        expect(jm.getAll({ state: JobState.RUNNING }).map(j => j.id)).toEqual([job3.id]);
     });
 
     it('clean() removes completed jobs older than maxAge', async () => {
@@ -132,8 +147,8 @@ describe('JobManager', () => {
         const subject1 = new Subject();
         const subject2 = new Subject();
 
-        const job1 = jm.createJob('job1', () => subject1.toPromise());
-        const job2 = jm.createJob('job2', () => subject2.toPromise());
+        const job1 = jm.createJob('job1', () => subject1.toPromise(), mockReporter);
+        const job2 = jm.createJob('job2', () => subject2.toPromise(), mockReporter);
         job1.start();
         job2.start();
 
@@ -151,16 +166,14 @@ describe('JobManager', () => {
 
         jm.clean();
 
-        expect(jm.getAll().map(pick(['name', 'state']))).toEqual([
-            { name: 'job2', state: JobState.RUNNING },
-        ]);
+        expect(jm.getAll().map(pick(['name', 'state']))).toEqual([{ name: 'job2', state: JobState.RUNNING }]);
     });
 
     it('findRunningJob() works', async () => {
         const jm = new JobManager();
         const subject1 = new Subject();
 
-        const job1 = jm.createJob('job1', () => subject1.toPromise());
+        const job1 = jm.createJob('job1', () => subject1.toPromise(), mockReporter);
         job1.start();
 
         expect(jm.findRunningJob('job1')).toBe(job1);

+ 24 - 27
packages/core/src/service/helpers/job-manager/job-manager.ts

@@ -2,16 +2,9 @@ import { JobInfo, JobListInput, JobState } from '@vendure/common/lib/generated-t
 import { pick } from '@vendure/common/lib/pick';
 import ms = require('ms');
 
-import { Job } from './job';
+import { PartialJobReporter } from '../../services/job.service';
 
-/**
- * The JobReporter allows a long-running job to update its progress during the
- * duration of the work function. This can then be used in the client application
- * to display a progress indication to the user.
- */
-export interface JobReporter {
-    setProgress(percentage: number): void;
-}
+import { Job } from './job';
 
 /**
  * The JobManager is responsible for creating and monitoring {@link Job} instances.
@@ -36,27 +29,29 @@ export class JobManager {
      * property of the job. If the function throws, the job will fail and the `result` property
      * will be the error thrown.
      */
-    createJob(name: string, work: (reporter: JobReporter) => any | Promise<any>): Job {
-        const job = new Job(name, work);
+    createJob(name: string, work: () => Promise<any>, reporter: PartialJobReporter): Job {
+        const job = new Job(name, work, reporter);
         this.jobs.set(job.id, job);
         return job;
     }
 
     getAll(input?: JobListInput): JobInfo[] {
-        return Array.from(this.jobs.values()).map(this.toJobInfo).filter(job => {
-            if (input) {
-                let match = false;
-                if (input.state) {
-                    match = job.state === input.state;
-                }
-                if (input.ids) {
-                    match = input.ids.includes(job.id);
+        return Array.from(this.jobs.values())
+            .map(this.toJobInfo)
+            .filter(job => {
+                if (input) {
+                    let match = false;
+                    if (input.state) {
+                        match = job.state === input.state;
+                    }
+                    if (input.ids) {
+                        match = input.ids.includes(job.id);
+                    }
+                    return match;
+                } else {
+                    return true;
                 }
-                return match;
-            } else {
-                return true;
-            }
-        });
+            });
     }
 
     getOne(jobId: string): JobInfo | null {
@@ -71,7 +66,7 @@ export class JobManager {
      * Removes all completed jobs which are older than the maxAge.
      */
     clean() {
-        const nowMs = +(new Date());
+        const nowMs = +new Date();
         Array.from(this.jobs.values()).forEach(job => {
             if (job.ended) {
                 const delta = nowMs - +job.ended;
@@ -83,11 +78,13 @@ export class JobManager {
     }
 
     findRunningJob(name: string): Job | undefined {
-        return Array.from(this.jobs.values()).find(job => job.name === name && job.state === JobState.RUNNING);
+        return Array.from(this.jobs.values()).find(
+            job => job.name === name && job.state === JobState.RUNNING,
+        );
     }
 
     private toJobInfo(job: Job): JobInfo {
-        const info =  pick(job, ['id', 'name', 'state', 'progress', 'result', 'started', 'ended']);
+        const info = pick(job, ['id', 'name', 'state', 'progress', 'result', 'started', 'ended']);
         const duration = job.ended ? +job.ended - +info.started : Date.now() - +info.started;
         return { ...info, duration };
     }

+ 17 - 4
packages/core/src/service/helpers/job-manager/job.spec.ts

@@ -1,12 +1,25 @@
+import { PartialJobReporter } from '../../services/job.service';
+
 import { Job } from './job';
 
 describe('Job', () => {
     it('does not run work more than once', () => {
         let counter = 0;
-        const job = new Job('test', () => {
-            counter++;
-            return new Promise(() => {});
-        });
+        const mockReporter: PartialJobReporter = {
+            complete: (result?: any) => {
+                /**/
+            },
+        };
+        const job = new Job(
+            'test',
+            () => {
+                counter++;
+                return new Promise(() => {
+                    /**/
+                });
+            },
+            mockReporter,
+        );
         job.start();
 
         expect(counter).toBe(1);

+ 9 - 8
packages/core/src/service/helpers/job-manager/job.ts

@@ -1,8 +1,7 @@
 import { JobState } from '@vendure/common/lib/generated-types';
 
 import { generatePublicId } from '../../../common/generate-public-id';
-
-import { JobReporter } from './job-manager';
+import { PartialJobReporter } from '../../services/job.service';
 
 /**
  * A Job represents a piece of work to be run in the background, i.e. outside the request-response cycle.
@@ -17,7 +16,11 @@ export class Job {
     started: Date;
     ended: Date;
 
-    constructor(public name: string, public work: (reporter: JobReporter) => any | Promise<any>) {
+    constructor(
+        public name: string,
+        public work: () => any | Promise<any>,
+        private reporter: PartialJobReporter,
+    ) {
         this.id = generatePublicId();
         this.started = new Date();
     }
@@ -26,15 +29,13 @@ export class Job {
         if (this.state !== JobState.PENDING) {
             return;
         }
-        const reporter: JobReporter = {
-            setProgress: (percentage: number) => {
-                this.progress = Math.max(Math.min(percentage, 100), 0);
-            },
+        this.reporter.setProgress = (percentage: number) => {
+            this.progress = Math.max(Math.min(percentage, 100), 0);
         };
         let result: any;
         try {
             this.state = JobState.RUNNING;
-            result = await this.work(reporter);
+            result = await this.work();
             this.progress = 100;
             this.result = result;
             this.state = JobState.COMPLETED;

+ 0 - 1
packages/core/src/service/index.ts

@@ -13,7 +13,6 @@ export * from './services/facet.service';
 export * from './services/facet-value.service';
 export * from './services/global-settings.service';
 export * from './services/job.service';
-export * from './services/job.service';
 export * from './services/order.service';
 export * from './services/payment-method.service';
 export * from './services/product.service';

+ 2 - 1
packages/core/src/service/service.module.ts

@@ -4,6 +4,7 @@ import { TypeOrmModule } from '@nestjs/typeorm';
 import { ConfigModule } from '../config/config.module';
 import { ConfigService } from '../config/config.service';
 import { EventBusModule } from '../event-bus/event-bus.module';
+import { WorkerServiceModule } from '../worker/worker-service.module';
 
 import { AssetUpdater } from './helpers/asset-updater/asset-updater';
 import { ListQueryBuilder } from './helpers/list-query-builder/list-query-builder';
@@ -90,7 +91,7 @@ let workerTypeOrmModule: DynamicModule;
  * only run a single time.
  */
 @Module({
-    imports: [ConfigModule, EventBusModule],
+    imports: [ConfigModule, EventBusModule, WorkerServiceModule],
     providers: [
         ...exportedProviders,
         PasswordCiper,

+ 39 - 10
packages/core/src/service/services/job.service.ts

@@ -1,9 +1,28 @@
 import { Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
-import { JobInfo, JobListInput, JobState } from '@vendure/common/lib/generated-types';
+import { JobInfo, JobListInput } from '@vendure/common/lib/generated-types';
+import ms = require('ms');
 
 import { Job } from '../helpers/job-manager/job';
-import { JobManager, JobReporter } from '../helpers/job-manager/job-manager';
-import ms = require('ms');
+import { JobManager } from '../helpers/job-manager/job-manager';
+
+/**
+ * The JobReporter allows a long-running job to update its progress during the
+ * duration of the work function. This can then be used in the client application
+ * to display a progress indication to the user.
+ */
+export interface JobReporter {
+    setProgress(percentage: number): void;
+    complete(result?: any): void;
+}
+
+export type PartialJobReporter = Omit<JobReporter, 'setProgress'> & Partial<Pick<JobReporter, 'setProgress'>>;
+
+export interface CreateJobOptions {
+    name: string;
+    work: (reporter: JobReporter) => any | Promise<any>;
+    /** Limit this job to a single instance at a time */
+    singleInstance?: boolean;
+}
 
 @Injectable()
 export class JobService implements OnModuleInit, OnModuleDestroy {
@@ -19,19 +38,29 @@ export class JobService implements OnModuleInit, OnModuleDestroy {
         global.clearInterval(this.cleanJobsTimer);
     }
 
-    createJob(options: {
-        name: string;
-        work: (reporter: JobReporter) => any | Promise<any>;
-        /** Limit this job to a single instance at a time */
-        singleInstance?: boolean;
-    }): Job {
+    createJob(options: CreateJobOptions): Job {
         if (options.singleInstance === true) {
             const runningInstance = this.manager.findRunningJob(options.name);
             if (runningInstance) {
                 return runningInstance;
             }
         }
-        return this.manager.createJob(options.name, options.work);
+        const reporter: PartialJobReporter = {
+            complete: (result: any) => {
+                /* empty */
+            },
+        };
+        const wrappedWork = () => {
+            return new Promise(async (resolve, reject) => {
+                reporter.complete = result => resolve(result);
+                try {
+                    const result = await options.work(reporter as JobReporter);
+                } catch (e) {
+                    reject(e);
+                }
+            });
+        };
+        return this.manager.createJob(options.name, wrappedWork, reporter);
     }
 
     getAll(input?: JobListInput): JobInfo[] {

+ 2 - 0
packages/core/src/worker/index.ts

@@ -0,0 +1,2 @@
+export * from './worker.service';
+export * from './types';

+ 38 - 0
packages/core/src/worker/types.ts

@@ -0,0 +1,38 @@
+/**
+ * @description
+ * A class which is used to define the contract between the Vendure server and the worker process. Used
+ * by the {@link WorkerService} `send` method.
+ *
+ * @example
+ * ```
+ * export class ReindexMessage extends WorkerMessage<{ ctx: RequestContext }, boolean> {
+ *     static readonly pattern = 'Reindex';
+ * }
+ *
+ * // in a Service running on the main process
+ * class MyService {
+ *
+ *      constructor(private workerService: WorkerService) {}
+ *
+ *      reindex(ctx: RequestContext): Observable<boolean>> {
+ *          return this.workerService.send(new ReindexMessage({ ctx }))
+ *      }
+ * }
+ *
+ * // in a microservice Controller on the worker process
+ * class MyController {
+ *
+ *      \@MessagePattern(ReindexMessage.pattern)
+ *      reindex({ ctx: rawContext }: ReindexMessage['data']): Observable<ReindexMessage['response']> {
+ *         // ... some long-running workload
+ *      }
+ * }
+ * ```
+ *
+ * @docsCategory worker
+ */
+export abstract class WorkerMessage<T, R> {
+    static readonly pattern: string;
+    constructor(public data: T) {}
+    response?: R;
+}

+ 27 - 0
packages/core/src/worker/worker-service.module.ts

@@ -0,0 +1,27 @@
+import { Module } from '@nestjs/common';
+import { ClientProxyFactory } from '@nestjs/microservices';
+
+import { ConfigModule } from '../config/config.module';
+import { ConfigService } from '../config/config.service';
+
+import { VENDURE_WORKER_CLIENT } from './constants';
+import { WorkerService } from './worker.service';
+
+@Module({
+    imports: [ConfigModule],
+    providers: [
+        WorkerService,
+        {
+            provide: VENDURE_WORKER_CLIENT,
+            useFactory: (configService: ConfigService) => {
+                return ClientProxyFactory.create({
+                    transport: configService.workerOptions.transport as any,
+                    options: configService.workerOptions.options as any,
+                });
+            },
+            inject: [ConfigService],
+        },
+    ],
+    exports: [WorkerService],
+})
+export class WorkerServiceModule {}

+ 2 - 1
packages/core/src/worker/worker.module.ts

@@ -8,9 +8,10 @@ import { ServiceModule } from '../service/service.module';
 
 import { MessageInterceptor } from './message-interceptor';
 import { WorkerMonitor } from './worker-monitor';
+import { WorkerServiceModule } from './worker-service.module';
 
 @Module({
-    imports: [ConfigModule, ServiceModule.forWorker(), PluginModule.forWorker()],
+    imports: [ConfigModule, ServiceModule.forWorker(), PluginModule.forWorker(), WorkerServiceModule],
     providers: [
         WorkerMonitor,
         {

+ 62 - 0
packages/core/src/worker/worker.service.ts

@@ -0,0 +1,62 @@
+import { Inject, Injectable, OnModuleDestroy } from '@nestjs/common';
+import { ClientProxy } from '@nestjs/microservices';
+import { BehaviorSubject, Observable } from 'rxjs';
+import { filter, mergeMap, take, tap } from 'rxjs/operators';
+
+import { VENDURE_WORKER_CLIENT } from './constants';
+import { WorkerMessage } from './types';
+
+/**
+ * @description
+ * This service is responsible for sending messages to the Worker process. See the {@link WorkerMessage}
+ * docs for an example of usage.
+ *
+ * @docsCategory worker
+ */
+@Injectable()
+export class WorkerService implements OnModuleDestroy {
+    private pendingConnection = false;
+    private initialConnection = new BehaviorSubject(false);
+    constructor(@Inject(VENDURE_WORKER_CLIENT) private readonly client: ClientProxy) {}
+
+    /**
+     * @description
+     * Sends a {@link WorkerMessage} to the worker process, where there should be a Controller with a method
+     * listening out for the message's pattern.
+     */
+    send<T, R>(message: WorkerMessage<T, R>): Observable<R> {
+        // The rather convoluted logic here is required in order to prevent more than
+        // one connection being opened in the event that the `send` method is called multiple
+        // times in the same event loop tick.
+        // On the first invokation, the first path is taken, which establishes the single
+        // connection (implicit in the first call to ClientProxt.send()). All subsequent
+        // invokations take the second code path.
+        if (!this.pendingConnection && this.initialConnection.value === false) {
+            this.pendingConnection = true;
+            return this.client
+                .send<R, T>((message.constructor as typeof WorkerMessage).pattern, message.data)
+                .pipe(
+                    tap(() => {
+                        this.initialConnection.next(true);
+                        this.pendingConnection = false;
+                    }),
+                );
+        } else {
+            return this.initialConnection.pipe(
+                filter(val => val),
+                take(1),
+                mergeMap(() => {
+                    return this.client.send<R, T>(
+                        (message.constructor as typeof WorkerMessage).pattern,
+                        message.data,
+                    );
+                }),
+            );
+        }
+    }
+
+    /** @internal */
+    onModuleDestroy() {
+        this.client.close();
+    }
+}