Explorar el Código

feat(core): Implement internal health check for worker

Relates to #994
Michael Bromley hace 4 años
padre
commit
812b2cbbd0

+ 10 - 2
packages/core/src/health-check/health-check.module.ts

@@ -3,14 +3,17 @@ import { TerminusModule, TypeOrmHealthIndicator } from '@nestjs/terminus';
 
 import { ConfigModule } from '../config/config.module';
 import { ConfigService } from '../config/config.service';
+import { isInspectableJobQueueStrategy } from '../config/job-queue/inspectable-job-queue-strategy';
+import { JobQueueModule } from '../job-queue/job-queue.module';
 
 import { HealthCheckRegistryService } from './health-check-registry.service';
 import { HealthController } from './health-check.controller';
+import { WorkerHealthIndicator } from './worker-health-indicator';
 
 @Module({
-    imports: [TerminusModule, ConfigModule],
+    imports: [TerminusModule, ConfigModule, JobQueueModule],
     controllers: [HealthController],
-    providers: [HealthCheckRegistryService],
+    providers: [HealthCheckRegistryService, WorkerHealthIndicator],
     exports: [HealthCheckRegistryService],
 })
 export class HealthCheckModule {
@@ -18,8 +21,13 @@ export class HealthCheckModule {
         private configService: ConfigService,
         private healthCheckRegistryService: HealthCheckRegistryService,
         private typeOrm: TypeOrmHealthIndicator,
+        private worker: WorkerHealthIndicator,
     ) {
         // Register the default health checks for database and worker
         this.healthCheckRegistryService.registerIndicatorFunction([() => this.typeOrm.pingCheck('database')]);
+        const { jobQueueStrategy } = this.configService.jobQueueOptions;
+        if (isInspectableJobQueueStrategy(jobQueueStrategy)) {
+            this.healthCheckRegistryService.registerIndicatorFunction([() => this.worker.isHealthy()]);
+        }
     }
 }

+ 56 - 0
packages/core/src/health-check/worker-health-indicator.ts

@@ -0,0 +1,56 @@
+import { Injectable, OnModuleInit } from '@nestjs/common';
+import { HealthCheckError, HealthIndicator, HealthIndicatorResult } from '@nestjs/terminus';
+
+import { ConfigService } from '../config/config.service';
+import { isInspectableJobQueueStrategy } from '../config/job-queue/inspectable-job-queue-strategy';
+import { JobQueue } from '../job-queue/job-queue';
+import { JobQueueService } from '../job-queue/job-queue.service';
+
+export const WORKER_HEALTH_QUEUE_NAME = 'check-worker-health';
+
+@Injectable()
+export class WorkerHealthIndicator extends HealthIndicator implements OnModuleInit {
+    private queue: JobQueue | undefined;
+    constructor(private jobQueueService: JobQueueService, private configService: ConfigService) {
+        super();
+    }
+
+    async onModuleInit() {
+        const { jobQueueStrategy } = this.configService.jobQueueOptions;
+        if (isInspectableJobQueueStrategy(jobQueueStrategy)) {
+            this.queue = await this.jobQueueService.createQueue({
+                name: WORKER_HEALTH_QUEUE_NAME,
+                process: async job => {
+                    return { workerPid: process.pid };
+                },
+            });
+        }
+    }
+
+    /**
+     * This health check works by adding a job to the queue and checking whether it got picked up
+     * by a worker.
+     */
+    async isHealthy(): Promise<HealthIndicatorResult> {
+        if (this.queue) {
+            const job = await this.queue.add({});
+            let isHealthy: boolean;
+            try {
+                isHealthy = !!(await job.updates({ timeoutMs: 10000 }).toPromise());
+            } catch (e) {
+                isHealthy = false;
+            }
+            const result = this.getStatus('worker', isHealthy);
+
+            if (isHealthy) {
+                return result;
+            }
+            throw new HealthCheckError('Worker health check failed', result);
+        } else {
+            throw new HealthCheckError(
+                'Current JobQueueStrategy does not support internal health checks',
+                this.getStatus('worker', false),
+            );
+        }
+    }
+}