Переглянути джерело

feat(core): Resume interrupted jobs in queue on restart

Michael Bromley 5 років тому
батько
коміт
9b66d33e7c

+ 41 - 0
packages/core/e2e/fixtures/test-plugins/with-job-queue.ts

@@ -0,0 +1,41 @@
+import { Controller, Get, OnModuleInit } from '@nestjs/common';
+import { JobQueue, JobQueueService, PluginCommonModule, VendurePlugin } from '@vendure/core';
+import { Subject } from 'rxjs';
+
+/**
+ * Test-only REST controller. On module init it registers a job queue named
+ * "test", and it exposes `GET /run-job` to add a single job to that queue.
+ *
+ * A job picked up by the queue is not settled by the process function itself:
+ * it is completed when `PluginWithJobQueue.jobSubject` completes and failed
+ * when that subject errors. This lets a test keep a job running for as long
+ * as it needs.
+ */
+@Controller('run-job')
+class TestController implements OnModuleInit {
+    private queue: JobQueue;
+
+    constructor(private jobQueueService: JobQueueService) {}
+
+    /**
+     * Creates the "test" queue, processing one job at a time.
+     */
+    onModuleInit(): void {
+        this.queue = this.jobQueueService.createQueue({
+            name: 'test',
+            concurrency: 1,
+            process: (job) => {
+                // Leave the job unsettled until the test drives the shared subject.
+                PluginWithJobQueue.jobSubject.subscribe({
+                    complete: () => {
+                        PluginWithJobQueue.jobHasDoneWork = true;
+                        job.complete();
+                    },
+                    error: (err) => job.fail(err),
+                });
+            },
+        });
+    }
+
+    /**
+     * Adds an empty job to the "test" queue and responds with `true`.
+     *
+     * The `add()` promise is awaited so that the response is only sent once the
+     * queue has accepted the job, and so that a failure to add the job surfaces
+     * as an error response rather than an unhandled promise rejection.
+     */
+    @Get()
+    async runJob(): Promise<boolean> {
+        await this.queue.add({});
+        return true;
+    }
+}
+
+/**
+ * Test plugin which registers `TestController`, and holds the static state
+ * through which a test observes and controls the job run by the "test" queue.
+ */
+@VendurePlugin({
+    imports: [PluginCommonModule],
+    controllers: [TestController],
+})
+export class PluginWithJobQueue {
+    // Set to true by the queue's process function once `jobSubject` completes.
+    static jobHasDoneWork = false;
+    // Jobs in the "test" queue stay unsettled until this subject completes
+    // (the job is completed) or errors (the job is failed).
+    static jobSubject = new Subject();
+}

+ 105 - 0
packages/core/e2e/job-queue.e2e-spec.ts

@@ -0,0 +1,105 @@
+import { DefaultJobQueuePlugin, mergeConfig } from '@vendure/core';
+import { createTestEnvironment } from '@vendure/testing';
+import path from 'path';
+
+import { initialData } from '../../../e2e-common/e2e-initial-data';
+import { TEST_SETUP_TIMEOUT_MS, testConfig } from '../../../e2e-common/test-config';
+
+import { PluginWithJobQueue } from './fixtures/test-plugins/with-job-queue';
+import { GetRunningJobs, JobState } from './graphql/generated-e2e-admin-types';
+import { GET_RUNNING_JOBS } from './graphql/shared-definitions';
+
+// End-to-end check that a job which is still running when the server is
+// destroyed is found again, under the same id, once the server has been
+// bootstrapped anew, and that it can then be completed.
+// The three tests share state (`testJobId`, the static members of
+// PluginWithJobQueue) and must run in the order in which they are declared.
+describe('JobQueue', () => {
+    if (testConfig.dbConnectionOptions.type === 'sqljs') {
+        // `it.only` makes this no-op the only test which is run in this file,
+        // so all of the tests declared below are skipped.
+        it.only('skip JobQueue tests for sqljs', () => {
+            // The tests in this suite will fail when running on sqljs because
+            // the DB state is not persisted after shutdown. In this case it is
+            // an acceptable tradeoff to just skip them, since the other DB engines
+            // _will_ run in CI, and sqljs is less of a production use-case anyway.
+            return;
+        });
+    }
+
+    // Test server configured with the default job queue plugin plus the
+    // test plugin which provides the "test" queue and the /run-job endpoint.
+    const { server, adminClient } = createTestEnvironment(
+        mergeConfig(testConfig, {
+            plugins: [DefaultJobQueuePlugin, PluginWithJobQueue],
+        }),
+    );
+
+    beforeAll(async () => {
+        await server.init({
+            initialData,
+            productsCsvPath: path.join(__dirname, 'fixtures/e2e-products-full.csv'),
+            customerCount: 1,
+        });
+        await adminClient.asSuperAdmin();
+        // NOTE(review): fixed 1s pause after init; the reason is not evident
+        // from this file - presumably it lets start-up jobs settle. Confirm.
+        await sleep(1000);
+    }, TEST_SETUP_TIMEOUT_MS);
+
+    afterAll(async () => {
+        await server.destroy();
+    });
+
+    // Queries the Admin API for the job list, filtered to the queue named "test".
+    function getJobsInTestQueue() {
+        return adminClient
+            .query<GetRunningJobs.Query, GetRunningJobs.Variables>(GET_RUNNING_JOBS, {
+                options: {
+                    filter: {
+                        queueName: {
+                            eq: 'test',
+                        },
+                    },
+                },
+            })
+            .then((data) => data.jobs);
+    }
+
+    // Id of the job created by the first test; the later tests assert that the
+    // very same job is still the only one in the queue.
+    let testJobId: string;
+
+    it('creates and starts running a job', async () => {
+        // Hitting the test controller adds one job to the "test" queue.
+        const restControllerUrl = `http://localhost:${testConfig.port}/run-job`;
+        await adminClient.fetch(restControllerUrl);
+
+        // Fixed delay before inspecting the queue; this is a time-based wait,
+        // not a synchronization point.
+        await sleep(300);
+        const jobs = await getJobsInTestQueue();
+
+        expect(jobs.items.length).toBe(1);
+        expect(jobs.items[0].state).toBe(JobState.RUNNING);
+        expect(PluginWithJobQueue.jobHasDoneWork).toBe(false);
+        testJobId = jobs.items[0].id;
+    });
+
+    it(
+        'shutdown server before completing job',
+        async () => {
+            // Restart the server while the job has not yet been settled.
+            await server.destroy();
+            await server.bootstrap();
+            await adminClient.asSuperAdmin();
+
+            await sleep(300);
+            const jobs = await getJobsInTestQueue();
+
+            // The same job must be present and running again after the restart,
+            // without its work having been done.
+            expect(jobs.items.length).toBe(1);
+            expect(jobs.items[0].state).toBe(JobState.RUNNING);
+            expect(jobs.items[0].id).toBe(testJobId);
+            expect(PluginWithJobQueue.jobHasDoneWork).toBe(false);
+        },
+        TEST_SETUP_TIMEOUT_MS,
+    );
+
+    it('complete job after restart', async () => {
+        // Completing the subject triggers the process function's `complete`
+        // handler, which sets `jobHasDoneWork` and calls `job.complete()`.
+        PluginWithJobQueue.jobSubject.complete();
+
+        await sleep(300);
+        const jobs = await getJobsInTestQueue();
+
+        expect(jobs.items.length).toBe(1);
+        expect(jobs.items[0].state).toBe(JobState.COMPLETED);
+        expect(jobs.items[0].id).toBe(testJobId);
+        expect(PluginWithJobQueue.jobHasDoneWork).toBe(true);
+    });
+});
+
+/**
+ * Returns a promise which resolves once `ms` milliseconds have elapsed.
+ */
+function sleep(ms: number): Promise<void> {
+    return new Promise<void>((done) => {
+        setTimeout(done, ms);
+    });
+}

+ 22 - 1
packages/core/src/job-queue/job-queue.service.spec.ts

@@ -1,3 +1,4 @@
+/* tslint:disable:no-non-null-assertion */
 import { Test, TestingModule } from '@nestjs/testing';
 import { JobState } from '@vendure/common/lib/generated-types';
 import { Subject } from 'rxjs';
@@ -6,7 +7,6 @@ import { ConfigService } from '../config/config.service';
 import { ProcessContext, ServerProcessContext } from '../process-context/process-context';
 
 import { Job } from './job';
-import { JobQueue } from './job-queue';
 import { JobQueueService } from './job-queue.service';
 import { TestingJobQueueStrategy } from './testing-job-queue-strategy';
 
@@ -277,6 +277,27 @@ describe('JobQueueService', () => {
         expect(testJob.state).toBe(JobState.FAILED);
         expect(testJob.isSettled).toBe(true);
     });
+
+    // Verifies that a job which is still RUNNING when its queue is destroyed
+    // is stored as PENDING afterwards.
+    it('sets long-running jobs to pending on destroy', async () => {
+        // Nothing is ever emitted on this subject in this test, so the job
+        // below is never completed or failed by the process function.
+        const subject = new Subject<boolean>();
+        const testQueue = jobQueueService.createQueue<string>({
+            name: 'test',
+            concurrency: 1,
+            process: (job) => {
+                subject.subscribe((success) => (success ? job.complete() : job.fail()));
+            },
+        });
+
+        const testJob = await testQueue.add('hello');
+
+        // Presumably one poll interval is enough for the queue to pick the job up.
+        await tick(queuePollInterval);
+
+        expect((await jobQueueService.getJob(testJob.id!))?.state).toBe(JobState.RUNNING);
+
+        await testQueue.destroy();
+
+        expect((await jobQueueService.getJob(testJob.id!))?.state).toBe(JobState.PENDING);
+        // Extended timeout (ms) - presumably because destroy() waits for active
+        // jobs before giving up on them; verify against JobQueue.destroy().
+    }, 10000);
 });
 
 function tick(ms: number): Promise<void> {

+ 2 - 2
packages/core/src/job-queue/job-queue.service.ts

@@ -1,6 +1,6 @@
 import { Injectable, OnApplicationBootstrap, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
 import { JobListOptions, JobQueue as GraphQlJobQueue } from '@vendure/common/lib/generated-types';
-import { PaginatedList } from '@vendure/common/lib/shared-types';
+import { ID, PaginatedList } from '@vendure/common/lib/shared-types';
 
 import { ConfigService } from '../config/config.service';
 import { JobQueueStrategy } from '../config/job-queue/job-queue-strategy';
@@ -72,7 +72,7 @@ export class JobQueueService implements OnApplicationBootstrap, OnModuleDestroy
      * Gets a job by id. The implementation is handled by the configured
      * {@link JobQueueStrategy}.
      */
-    getJob(id: string): Promise<Job | undefined> {
+    getJob(id: ID): Promise<Job | undefined> {
         return this.jobQueueStrategy.findOne(id);
     }
 

+ 10 - 2
packages/core/src/job-queue/job-queue.ts

@@ -80,11 +80,19 @@ export class JobQueue<Data extends JobData<Data> = {}> {
         this.running = false;
         clearTimeout(this.timer);
         const start = +new Date();
-        const maxTimeout = 5000;
+        // Wait for 2 seconds to allow running jobs to complete
+        const maxTimeout = 2000;
         return new Promise((resolve) => {
-            const pollActiveJobs = () => {
+            const pollActiveJobs = async () => {
                 const timedOut = +new Date() - start > maxTimeout;
                 if (this.activeJobs.length === 0 || timedOut) {
+                    // if there are any incomplete jobs after the 2 second
+                    // wait period, set them back to "pending" so they can
+                    // be re-run on next bootstrap.
+                    for (const job of this.activeJobs) {
+                        job.defer();
+                        await this.jobQueueStrategy.update(job);
+                    }
                     resolve();
                 } else {
                     setTimeout(pollActiveJobs, 50);

+ 12 - 0
packages/core/src/job-queue/job.ts

@@ -161,6 +161,18 @@ export class Job<T extends JobData<T> = any> {
         this.fireEvent('fail');
     }
 
+    /**
+     * @description
+     * Puts a job which is currently RUNNING back into the PENDING state and resets
+     * its attempt count to zero. Jobs in any other state are left untouched.
+     * Intended for use when the JobQueue is destroyed before the job has completed.
+     */
+    defer() {
+        if (this._state !== JobState.RUNNING) {
+            return;
+        }
+        this._state = JobState.PENDING;
+        this._attempts = 0;
+    }
+
     /**
      * @description
      * Used to register event handlers for job events