Просмотр исходного кода

fix(core): Wait for worker tasks to complete on app shutdown

Michael Bromley 6 лет назад
Родитель
Сommit
2a9fb0b952

+ 11 - 4
packages/core/src/app.module.ts

@@ -1,11 +1,11 @@
-import { MiddlewareConsumer, Module, NestModule, OnModuleDestroy } from '@nestjs/common';
+import { MiddlewareConsumer, Module, NestModule, OnApplicationShutdown, OnModuleDestroy } from '@nestjs/common';
 import cookieSession = require('cookie-session');
 import { RequestHandler } from 'express';
-import { GraphQLDateTime } from 'graphql-iso-date';
 
 import { ApiModule } from './api/api.module';
 import { ConfigModule } from './config/config.module';
 import { ConfigService } from './config/config.service';
+import { Logger } from './config/logger/vendure-logger';
 import { validateCustomFieldsConfig } from './entity/custom-entity-fields';
 import { I18nModule } from './i18n/i18n.module';
 import { I18nService } from './i18n/i18n.service';
@@ -13,8 +13,9 @@ import { I18nService } from './i18n/i18n.service';
 @Module({
     imports: [ConfigModule, I18nModule, ApiModule],
 })
-export class AppModule implements NestModule, OnModuleDestroy {
-    constructor(private configService: ConfigService, private i18nService: I18nService) {}
+export class AppModule implements NestModule, OnModuleDestroy, OnApplicationShutdown {
+    constructor(private configService: ConfigService,
+                private i18nService: I18nService) {}
 
     configure(consumer: MiddlewareConsumer) {
         const { adminApiPath, shopApiPath } = this.configService;
@@ -51,6 +52,12 @@ export class AppModule implements NestModule, OnModuleDestroy {
         }
     }
 
+    onApplicationShutdown(signal?: string) {
+        if (signal) {
+            Logger.info('Received shutdown signal:' + signal);
+        }
+    }
+
     /**
      * Groups middleware handlers together in an object with the route as the key.
      */

+ 2 - 0
packages/core/src/bootstrap.ts

@@ -36,6 +36,7 @@ export async function bootstrap(userConfig: Partial<VendureConfig>): Promise<INe
     app.useLogger(new Logger());
     await runPluginOnBootstrapMethods(config, app);
     await app.listen(config.port, config.hostname);
+    app.enableShutdownHooks();
     if (config.workerOptions.runInMainProcess) {
         const worker = await bootstrapWorkerInternal(config);
         Logger.warn(`Worker is running in main process. This is not recommended for production.`);
@@ -77,6 +78,7 @@ async function bootstrapWorkerInternal(userConfig: Partial<VendureConfig>): Prom
     });
     DefaultLogger.restoreOriginalLogLevel();
     workerApp.useLogger(new Logger());
+    workerApp.enableShutdownHooks();
     await workerApp.listenAsync();
     workerWelcomeMessage(config);
     return workerApp;

+ 1 - 3
packages/core/src/plugin/plugin.module.ts

@@ -1,5 +1,5 @@
 import { DynamicModule, Module } from '@nestjs/common';
-import { ClientProxyFactory, ClientsModule, Transport } from '@nestjs/microservices';
+import { ClientProxyFactory } from '@nestjs/microservices';
 import { Type } from '@vendure/common/lib/shared-types';
 import { notNullOrUndefined } from '@vendure/common/lib/shared-utils';
 
@@ -55,11 +55,9 @@ export class PluginModule {
     }
 
     static forWorker(): DynamicModule {
-        const controllers = getWorkerControllers();
         return {
             module: PluginModule,
             imports: [ServiceModule.forWorker()],
-            controllers,
         };
     }
 }

+ 25 - 0
packages/core/src/worker/message-interceptor.ts

@@ -0,0 +1,25 @@
+import { CallHandler, ExecutionContext, Injectable, NestInterceptor } from '@nestjs/common';
+import { Observable } from 'rxjs';
+import { finalize, tap } from 'rxjs/operators';
+
+import { WorkerMonitor } from './worker-monitor';
+
+/**
+ * This interceptor is used to keep track of open worker tasks, so that the WorkerModule
+ * is not allowed to be destroyed while tasks are in progress.
+ */
+@Injectable()
+export class MessageInterceptor implements NestInterceptor {
+    constructor(private monitor: WorkerMonitor) {}
+
+    intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
+        this.monitor.increment();
+        return next
+            .handle()
+            .pipe(
+                finalize(() => {
+                    this.monitor.decrement();
+                }),
+            );
+    }
+}

+ 37 - 0
packages/core/src/worker/worker-monitor.ts

@@ -0,0 +1,37 @@
+import { Injectable } from '@nestjs/common';
+import { BehaviorSubject } from 'rxjs';
+import { debounceTime, takeWhile, tap } from 'rxjs/operators';
+
+import { Logger } from '../config/logger/vendure-logger';
+
+/**
+ * This service is responsible for keeping track of incomplete worker tasks
+ * to ensure that the WorkerModule is not destroyed before active tasks complete.
+ */
+@Injectable()
+export class WorkerMonitor {
+    openTasks = new BehaviorSubject<number>(0);
+    get openTaskCount(): number {
+        return this.openTasks.value;
+    }
+    increment() {
+        this.openTasks.next(this.openTasks.value + 1);
+    }
+    decrement() {
+        this.openTasks.next(this.openTasks.value - 1);
+    }
+    waitForOpenTasksToComplete(): Promise<number> {
+        if (0 < this.openTaskCount) {
+            Logger.info('Waiting for open worker tasks to complete...');
+        }
+        return this.openTasks.asObservable().pipe(
+            tap(count => {
+                if (0 < count) {
+                    Logger.info(`${count} tasks open`);
+                }
+            }),
+            debounceTime(100),
+            takeWhile(value => value > 0),
+        ).toPromise();
+    }
+}

+ 36 - 2
packages/core/src/worker/worker.module.ts

@@ -1,14 +1,48 @@
-import { Module } from '@nestjs/common';
+import { Module, OnApplicationShutdown, OnModuleDestroy } from '@nestjs/common';
+import { APP_INTERCEPTOR } from '@nestjs/core';
+import { notNullOrUndefined } from '@vendure/common/lib/shared-utils';
 
+import { getConfig } from '../config/config-helpers';
 import { ConfigModule } from '../config/config.module';
+import { Logger } from '../config/logger/vendure-logger';
 import { PluginModule } from '../plugin/plugin.module';
 import { ServiceModule } from '../service/service.module';
 
+import { MessageInterceptor } from './message-interceptor';
+import { WorkerMonitor } from './worker-monitor';
+
 @Module({
     imports: [
         ConfigModule,
         ServiceModule.forWorker(),
         PluginModule.forWorker(),
     ],
+    providers: [
+        WorkerMonitor,
+        {
+            provide: APP_INTERCEPTOR,
+            useClass: MessageInterceptor,
+        },
+    ],
+    controllers: getWorkerControllers(),
 })
-export class WorkerModule {}
+export class WorkerModule implements OnModuleDestroy, OnApplicationShutdown {
+    constructor(private monitor: WorkerMonitor) {}
+    onModuleDestroy() {
+        return this.monitor.waitForOpenTasksToComplete();
+    }
+
+    onApplicationShutdown(signal?: string) {
+        if (signal) {
+            Logger.info('Worker Received shutdown signal:' + signal);
+        }
+    }
+}
+
+function getWorkerControllers() {
+    const plugins = getConfig().plugins;
+    return plugins
+        .map(p => (p.defineWorkers ? p.defineWorkers() : undefined))
+        .filter(notNullOrUndefined)
+        .reduce((flattened, controllers) => flattened.concat(controllers), []);
+}