Просмотр исходного кода

feat(core): Make JobQueue jobs subscribable

Relates to #775
Michael Bromley 4 лет назад
Родитель
Сommit
baba268b69

+ 43 - 0
docs/content/docs/plugins/plugin-examples/using-job-queue-service.md

@@ -107,3 +107,46 @@ import { ProductVideoResolver } from './product-video.resolver'
 export class ProductVideoPlugin {}
 ```
 Finally, the `ProductVideoPlugin` brings it all together, extending the GraphQL API, defining the required CustomField to store the transcoded video URL, and registering our service and resolver. The [PluginCommonModule]({{< relref "plugin-common-module" >}}) is imported as it exports the JobQueueService.
+
+## Subscribing to job updates
+
+When creating a new job via `JobQueue.add()`, it is possible to subscribe to updates to that Job (progress and status changes). This allows you, for example, to create resolvers which are able to return the results of a given Job.
+
+In the video transcoding example above, we could modify the `transcodeForProduct()` call to look like this:
+
+```TypeScript
+import { of } from 'rxjs';
+import { map, catchError } from 'rxjs/operators';
+
+@Injectable()
+class ProductVideoService implements OnModuleInit { 
+  // ... omitted
+    
+  transcodeForProduct(productId: ID, videoUrl: string) { 
+    const job = await this.jobQueue.add({ productId, videoUrl }, { retries: 2 });
+    
+    return job.updates().pipe(
+      map(update => {
+        // The returned Observable will emit a value for every update to the job
+        // such as when the `progress` or `status` value changes.
+        Logger.info(`Job ${update.id}: progress: ${update.progress}`);
+        if (update.state === JobState.COMPLETED) {
+          Logger.info(`COMPLETED ${update.id}: ${update.result}`);
+        }
+        return update.result;
+      }),
+      catchError(err => of(err.message)),
+    );
+  }
+}
+```
+
+If you prefer to work with Promises rather than Rxjs Observables, you can also convert the updates to a promise:
+
+```TypeScript
+const job = await this.jobQueue.add({ productId, videoUrl }, { retries: 2 });
+    
+return job.updates().toPromise()
+  .then(/* ... */)
+  .catch(/* ... */);
+```

+ 54 - 45
packages/core/src/job-queue/job-queue.service.spec.ts

@@ -6,7 +6,7 @@ import { JobState } from '@vendure/common/lib/generated-types';
 import { Subject } from 'rxjs';
 import { take } from 'rxjs/operators';
 
-import { Injector } from '../common';
+import { assertFound, Injector } from '../common';
 import { ConfigService } from '../config/config.service';
 
 import { Job } from './job';
@@ -14,11 +14,17 @@ import { JobQueueService } from './job-queue.service';
 import { TestingJobQueueStrategy } from './testing-job-queue-strategy';
 
 const queuePollInterval = 10;
+const testJobQueueStrategy = new TestingJobQueueStrategy(1, queuePollInterval);
 
 describe('JobQueueService', () => {
     let jobQueueService: JobQueueService;
     let module: TestingModule;
 
+    function getJob(job: Job | string): Promise<Job> {
+        const id = typeof job === 'string' ? job : job.id!;
+        return assertFound(testJobQueueStrategy.findOne(id));
+    }
+
     beforeEach(async () => {
         module = await Test.createTestingModule({
             providers: [{ provide: ConfigService, useClass: MockConfigService }, JobQueueService],
@@ -61,15 +67,15 @@ describe('JobQueueService', () => {
         expect(testJob.state).toBe(JobState.PENDING);
 
         await tick(queuePollInterval);
-        expect(testJob.state).toBe(JobState.RUNNING);
+        expect((await getJob(testJob)).state).toBe(JobState.RUNNING);
 
         subject.next('yay');
         subject.complete();
 
         await tick();
 
-        expect(testJob.state).toBe(JobState.COMPLETED);
-        expect(testJob.result).toBe('yay');
+        expect((await getJob(testJob)).state).toBe(JobState.COMPLETED);
+        expect((await getJob(testJob)).result).toBe('yay');
     });
 
     it('job marked as failed when exception thrown', async () => {
@@ -86,14 +92,14 @@ describe('JobQueueService', () => {
         expect(testJob.state).toBe(JobState.PENDING);
 
         await tick(queuePollInterval);
-        expect(testJob.state).toBe(JobState.RUNNING);
+        expect((await getJob(testJob)).state).toBe(JobState.RUNNING);
 
         subject.next('uh oh');
         subject.complete();
         await tick();
 
-        expect(testJob.state).toBe(JobState.FAILED);
-        expect(testJob.error).toBe('uh oh');
+        expect((await getJob(testJob)).state).toBe(JobState.FAILED);
+        expect((await getJob(testJob)).error).toBe('uh oh');
     });
 
     it('job marked as failed when async error thrown', async () => {
@@ -109,8 +115,8 @@ describe('JobQueueService', () => {
         expect(testJob.state).toBe(JobState.PENDING);
 
         await tick(queuePollInterval);
-        expect(testJob.state).toBe(JobState.FAILED);
-        expect(testJob.error).toBe(err.message);
+        expect((await getJob(testJob)).state).toBe(JobState.FAILED);
+        expect((await getJob(testJob)).error).toBe(err.message);
     });
 
     it('jobs processed in FIFO queue', async () => {
@@ -126,29 +132,33 @@ describe('JobQueueService', () => {
         const testJob2 = await testQueue.add('2');
         const testJob3 = await testQueue.add('3');
 
-        const getStates = () => [testJob1.state, testJob2.state, testJob3.state];
+        const getStates = async () => [
+            (await getJob(testJob1)).state,
+            (await getJob(testJob2)).state,
+            (await getJob(testJob3)).state,
+        ];
 
         await tick(queuePollInterval);
 
-        expect(getStates()).toEqual([JobState.RUNNING, JobState.PENDING, JobState.PENDING]);
+        expect(await getStates()).toEqual([JobState.RUNNING, JobState.PENDING, JobState.PENDING]);
 
         subject.next();
         await tick();
-        expect(getStates()).toEqual([JobState.COMPLETED, JobState.PENDING, JobState.PENDING]);
+        expect(await getStates()).toEqual([JobState.COMPLETED, JobState.PENDING, JobState.PENDING]);
 
         await tick(queuePollInterval);
-        expect(getStates()).toEqual([JobState.COMPLETED, JobState.RUNNING, JobState.PENDING]);
+        expect(await getStates()).toEqual([JobState.COMPLETED, JobState.RUNNING, JobState.PENDING]);
 
         subject.next();
         await tick();
-        expect(getStates()).toEqual([JobState.COMPLETED, JobState.COMPLETED, JobState.PENDING]);
+        expect(await getStates()).toEqual([JobState.COMPLETED, JobState.COMPLETED, JobState.PENDING]);
 
         await tick(queuePollInterval);
-        expect(getStates()).toEqual([JobState.COMPLETED, JobState.COMPLETED, JobState.RUNNING]);
+        expect(await getStates()).toEqual([JobState.COMPLETED, JobState.COMPLETED, JobState.RUNNING]);
 
         subject.next();
         await tick();
-        expect(getStates()).toEqual([JobState.COMPLETED, JobState.COMPLETED, JobState.COMPLETED]);
+        expect(await getStates()).toEqual([JobState.COMPLETED, JobState.COMPLETED, JobState.COMPLETED]);
 
         subject.complete();
     });
@@ -171,31 +181,32 @@ describe('JobQueueService', () => {
         const testJob2 = await testQueue.add('2');
         const testJob3 = await testQueue.add('3');
 
-        const getStates = () => [testJob1.state, testJob2.state, testJob3.state];
+        const getStates = async () => [
+            (await getJob(testJob1)).state,
+            (await getJob(testJob2)).state,
+            (await getJob(testJob3)).state,
+        ];
 
         await tick(queuePollInterval);
 
-        expect(getStates()).toEqual([JobState.RUNNING, JobState.RUNNING, JobState.PENDING]);
+        expect(await getStates()).toEqual([JobState.RUNNING, JobState.RUNNING, JobState.PENDING]);
 
         subject.next();
         await tick();
-        expect(getStates()).toEqual([JobState.COMPLETED, JobState.COMPLETED, JobState.PENDING]);
+        expect(await getStates()).toEqual([JobState.COMPLETED, JobState.COMPLETED, JobState.PENDING]);
 
         await tick(queuePollInterval);
-        expect(getStates()).toEqual([JobState.COMPLETED, JobState.COMPLETED, JobState.RUNNING]);
+        expect(await getStates()).toEqual([JobState.COMPLETED, JobState.COMPLETED, JobState.RUNNING]);
 
         subject.next();
         await tick();
-        expect(getStates()).toEqual([JobState.COMPLETED, JobState.COMPLETED, JobState.COMPLETED]);
+        expect(await getStates()).toEqual([JobState.COMPLETED, JobState.COMPLETED, JobState.COMPLETED]);
 
         subject.complete();
     });
 
     it('processes existing jobs on start', async () => {
-        const testingJobQueueStrategy = module.get(ConfigService).jobQueueOptions
-            .jobQueueStrategy as TestingJobQueueStrategy;
-
-        await testingJobQueueStrategy.prePopulate([
+        await testJobQueueStrategy.prePopulate([
             new Job<any>({
                 queueName: 'test',
                 data: {},
@@ -215,15 +226,13 @@ describe('JobQueueService', () => {
             },
         });
 
-        await tick();
-
-        const job1 = await testingJobQueueStrategy.findOne('job-1');
-        const job2 = await testingJobQueueStrategy.findOne('job-2');
+        const job1 = await getJob('job-1');
+        const job2 = await getJob('job-2');
         expect(job1?.state).toBe(JobState.COMPLETED);
-        expect(job2?.state).toBe(JobState.PENDING);
+        expect(job2?.state).toBe(JobState.RUNNING);
 
         await tick(queuePollInterval);
-        expect(job2?.state).toBe(JobState.COMPLETED);
+        expect((await getJob('job-2')).state).toBe(JobState.COMPLETED);
     });
 
     it('retries', async () => {
@@ -245,25 +254,25 @@ describe('JobQueueService', () => {
         const testJob = await testQueue.add('hello', { retries: 2 });
 
         await tick(queuePollInterval);
-        expect(testJob.state).toBe(JobState.RUNNING);
-        expect(testJob.isSettled).toBe(false);
+        expect((await getJob(testJob)).state).toBe(JobState.RUNNING);
+        expect((await getJob(testJob)).isSettled).toBe(false);
 
         subject.next(false);
         await tick();
-        expect(testJob.state).toBe(JobState.RETRYING);
-        expect(testJob.isSettled).toBe(false);
+        expect((await getJob(testJob)).state).toBe(JobState.RETRYING);
+        expect((await getJob(testJob)).isSettled).toBe(false);
 
         await tick(queuePollInterval);
         subject.next(false);
         await tick();
-        expect(testJob.state).toBe(JobState.RETRYING);
-        expect(testJob.isSettled).toBe(false);
+        expect((await getJob(testJob)).state).toBe(JobState.RETRYING);
+        expect((await getJob(testJob)).isSettled).toBe(false);
 
         await tick(queuePollInterval);
         subject.next(false);
         await tick();
-        expect(testJob.state).toBe(JobState.FAILED);
-        expect(testJob.isSettled).toBe(true);
+        expect((await getJob(testJob)).state).toBe(JobState.FAILED);
+        expect((await getJob(testJob)).isSettled).toBe(true);
     });
 
     it('sets long-running jobs to pending on destroy', async () => {
@@ -304,14 +313,14 @@ describe('JobQueueService', () => {
         expect(testJob.state).toBe(JobState.PENDING);
 
         await tick(queuePollInterval);
-        expect(testJob.state).toBe(JobState.RUNNING);
+        expect((await getJob(testJob)).state).toBe(JobState.RUNNING);
 
         subject.next('yay');
         subject.complete();
         await tick();
 
-        expect(testJob.state).toBe(JobState.COMPLETED);
-        expect(testJob.result).toBe('yay');
+        expect((await getJob(testJob)).state).toBe(JobState.COMPLETED);
+        expect((await getJob(testJob)).result).toBe('yay');
     });
 
     it('should not start a queue if its name is in the active list', async () => {
@@ -329,12 +338,12 @@ describe('JobQueueService', () => {
         expect(testJob.state).toBe(JobState.PENDING);
 
         await tick(queuePollInterval);
-        expect(testJob.state).toBe(JobState.PENDING);
+        expect((await getJob(testJob)).state).toBe(JobState.PENDING);
 
         subject.next('yay');
         subject.complete();
 
-        expect(testJob.state).toBe(JobState.PENDING);
+        expect((await getJob(testJob)).state).toBe(JobState.PENDING);
     });
 });
 
@@ -353,7 +362,7 @@ class MockConfigService implements OnApplicationBootstrap, OnModuleDestroy {
     constructor(private moduleRef: ModuleRef) {}
 
     jobQueueOptions = {
-        jobQueueStrategy: new TestingJobQueueStrategy(1, queuePollInterval),
+        jobQueueStrategy: testJobQueueStrategy,
         activeQueues: [],
     };
 

+ 35 - 4
packages/core/src/job-queue/job-queue.ts

@@ -6,12 +6,13 @@ import { JobQueueStrategy } from '../config';
 import { Logger } from '../config/logger/vendure-logger';
 
 import { Job } from './job';
+import { SubscribableJob } from './subscribable-job';
 import { CreateQueueOptions, JobConfig, JobData } from './types';
 
 /**
  * @description
  * A JobQueue is used to process {@link Job}s. A job is added to the queue via the
- * `.add()` method, and the queue will then poll for new jobs and process each
+ * `.add()` method, and the configured {@link JobQueueStrategy} will check for new jobs and process each
  * according to the defined `process` function.
  *
  * *Note*: JobQueue instances should not be directly instantiated. Rather, the
@@ -53,14 +54,44 @@ export class JobQueue<Data extends JobData<Data> = {}> {
 
     /**
      * @description
-     * Adds a new {@link Job} to the queue.
+     * Adds a new {@link Job} to the queue. The resolved {@link SubscribableJob} allows the
+     * calling code to subscribe to updates to the Job:
+     *
+     * @example
+     * ```TypeScript
+     * const job = await this.myQueue.add({ intervalMs, shouldFail }, { retries: 2 });
+     * return job.updates().pipe(
+     *   map(update => {
+     *     // The returned Observable will emit a value for every update to the job
+     *     // such as when the `progress` or `status` value changes.
+     *     Logger.info(`Job ${update.id}: progress: ${update.progress}`);
+     *     if (update.state === JobState.COMPLETED) {
+     *       Logger.info(`COMPLETED ${update.id}: ${update.result}`);
+     *     }
+     *     return update.result;
+     *   }),
+     *   catchError(err => of(err.message)),
+     * );
+     * ```
+     *
+     * Alternatively, if you aren't interested in the intermediate
+     * `progress` changes, you can convert to a Promise like this:
+     *
+     * @example
+     * ```TypeScript
+     * const job = await this.myQueue.add({ intervalMs, shouldFail }, { retries: 2 });
+     * return job.updates().toPromise()
+     *   .then(update => update.result),
+     *   .catch(err => err.message);
+     * ```
      */
-    add(data: Data, options?: Pick<JobConfig<Data>, 'retries'>) {
+    async add(data: Data, options?: Pick<JobConfig<Data>, 'retries'>): Promise<SubscribableJob<Data>> {
         const job = new Job<any>({
             data,
             queueName: this.options.name,
             retries: options?.retries ?? 0,
         });
-        return this.jobQueueStrategy.add(job);
+        const addedJob = await this.jobQueueStrategy.add(job);
+        return new SubscribableJob(addedJob, this.jobQueueStrategy);
     }
 }

+ 101 - 0
packages/core/src/job-queue/subscribable-job.ts

@@ -0,0 +1,101 @@
+import { JobState } from '@vendure/common/lib/generated-types';
+import { pick } from '@vendure/common/lib/pick';
+import { notNullOrUndefined } from '@vendure/common/lib/shared-utils';
+import ms from 'ms';
+import { interval, Observable } from 'rxjs';
+import { distinctUntilChanged, filter, map, switchMap, takeWhile, tap } from 'rxjs/operators';
+
+import { InternalServerError } from '../common/error/errors';
+import { isInspectableJobQueueStrategy } from '../config/job-queue/inspectable-job-queue-strategy';
+import { JobQueueStrategy } from '../config/job-queue/job-queue-strategy';
+
+import { Job } from './job';
+import { JobConfig, JobData } from './types';
+
+/**
+ * @description
+ * Job update status as returned from the {@link SubscribableJob}'s `update()` method.
+ *
+ * @docsCategory JobQueue
+ * @docsPage types
+ */
+export type JobUpdate<T extends JobData<T>> = Pick<
+    Job<T>,
+    'id' | 'state' | 'progress' | 'result' | 'error' | 'data'
+>;
+
+/**
+ * @description
+ * This is a type of Job object that allows you to subscribe to updates to the Job. It is returned
+ * by the {@link JobQueue}'s `add()` method. Note that the subscription capability is only supported
+ * if the {@link JobQueueStrategy} implements the {@link InspectableJobQueueStrategy} interface (e.g.
+ * the {@link SqlJobQueueStrategy} does support this).
+ *
+ * @docsCategory JobQueue
+ */
+export class SubscribableJob<T extends JobData<T> = any> extends Job<T> {
+    private readonly jobQueueStrategy: JobQueueStrategy;
+
+    constructor(job: Job<T>, jobQueueStrategy: JobQueueStrategy) {
+        const config: JobConfig<T> = {
+            ...job,
+            state: job.state,
+            data: job.data,
+            id: job.id || undefined,
+        };
+        super(config);
+        this.jobQueueStrategy = jobQueueStrategy;
+    }
+
+    /**
+     * @description
+     * Returns an Observable stream of updates to the Job. Works by polling the current JobQueueStrategy's `findOne()` method
+     * to obtain updates. If this updates are not subscribed to, then no polling occurs.
+     *
+     * The polling interval defaults to 200ms, but can be configured by passing in an options argument. Polling will also timeout
+     * after 1 hour, but this timeout can also be configured by passing the `timeoutMs` option.
+     */
+    updates(options?: { pollInterval?: number; timeoutMs?: number }): Observable<JobUpdate<T>> {
+        const pollInterval = Math.max(50, options?.pollInterval ?? 200);
+        const timeoutMs = Math.max(pollInterval, options?.timeoutMs ?? ms('1h'));
+        const strategy = this.jobQueueStrategy;
+        if (!isInspectableJobQueueStrategy(strategy)) {
+            throw new InternalServerError(
+                `The configured JobQueueStrategy (${strategy.constructor.name}) is not inspectable, so Job updates cannot be subscribed to`,
+            );
+        } else {
+            // tslint:disable-next-line:no-non-null-assertion
+            return interval(pollInterval).pipe(
+                tap(i => {
+                    if (timeoutMs < i * pollInterval) {
+                        throw new Error(
+                            `Job ${this.id} update polling timed out after ${timeoutMs}ms. The job may still be running.`,
+                        );
+                    }
+                }),
+                switchMap(() => {
+                    const id = this.id;
+                    if (!id) {
+                        throw new Error(`Cannot subscribe to update: Job does not have an ID`);
+                    }
+                    return strategy.findOne(id);
+                }),
+                filter(notNullOrUndefined),
+                distinctUntilChanged((a, b) => a?.progress === b?.progress),
+                takeWhile(
+                    job =>
+                        job?.state !== JobState.FAILED &&
+                        job.state !== JobState.COMPLETED &&
+                        job.state !== JobState.CANCELLED,
+                    true,
+                ),
+                tap(job => {
+                    if (job.state === JobState.FAILED) {
+                        throw new Error(job.error);
+                    }
+                }),
+                map(job => pick(job, ['id', 'state', 'progress', 'result', 'error', 'data'])),
+            );
+        }
+    }
+}

+ 23 - 5
packages/dev-server/test-plugins/job-queue-test/job-queue-test-plugin.ts

@@ -1,7 +1,10 @@
 import { Injectable, OnModuleInit } from '@nestjs/common';
 import { Args, Mutation, Resolver } from '@nestjs/graphql';
+import { JobState } from '@vendure/common/lib/generated-types';
 import { JobQueue, JobQueueService, Logger, PluginCommonModule, VendurePlugin } from '@vendure/core';
 import { gql } from 'apollo-server-core';
+import { of } from 'rxjs';
+import { catchError, map, tap } from 'rxjs/operators';
 
 @Injectable()
 export class JobQueueTestService implements OnModuleInit {
@@ -26,13 +29,28 @@ export class JobQueueTestService implements OnModuleInit {
                     }
                 }
                 Logger.info(`Completed job ${job.id}`);
+                return 'Done!';
             },
         });
     }
 
-    async startTask(intervalMs: number, shouldFail: boolean) {
-        await this.myQueue.add({ intervalMs, shouldFail }, { retries: 3 });
-        return true;
+    async startTask(intervalMs: number, shouldFail: boolean, subscribeToResult: boolean) {
+        const job = await this.myQueue.add({ intervalMs, shouldFail }, { retries: 0 });
+        if (subscribeToResult) {
+            return job.updates({ timeoutMs: 1000 }).pipe(
+                map(update => {
+                    Logger.info(`Job ${update.id}: progress: ${update.progress}`);
+                    if (update.state === JobState.COMPLETED) {
+                        Logger.info(`COMPLETED: ${JSON.stringify(update.result, null, 2)}`);
+                        return update.result;
+                    }
+                    return update.progress;
+                }),
+                catchError(err => of(err.message)),
+            );
+        } else {
+            return 'running in background';
+        }
     }
 }
 
@@ -42,7 +60,7 @@ export class JobQueueTestResolver {
 
     @Mutation()
     startTask(@Args() args: any) {
-        return this.service.startTask(args.intervalMs, args.shouldFail);
+        return this.service.startTask(args.intervalMs, args.shouldFail, args.subscribeToResult);
     }
 }
 
@@ -56,7 +74,7 @@ export class JobQueueTestResolver {
         resolvers: [JobQueueTestResolver],
         schema: gql`
             extend type Mutation {
-                startTask(intervalMs: Int, shouldFail: Boolean!): Boolean!
+                startTask(intervalMs: Int, shouldFail: Boolean!, subscribeToResult: Boolean!): JSON!
             }
         `,
     },