Răsfoiți Sursa

Removed Worker

- Replaced with simple Nest.context from which the job queue can be started
- Replaced external servers with NestModule middlewares
- No need for services to be aware of server vs worker
- Removed custom lifecycle as its no longer needed
Fred Cox 4 ani în urmă
părinte
comite
5db92d15e9
70 a modificat fișierele cu 338 adăugiri și 1417 ștergeri
  1. 61 111
      packages/admin-ui-plugin/src/plugin.ts
  2. 9 41
      packages/asset-server-plugin/src/plugin.ts
  3. 1 9
      packages/asset-server-plugin/src/types.ts
  4. 69 0
      packages/core/e2e/__snapshots__/collection.e2e-spec.ts.snap
  5. 1 0
      packages/core/e2e/collection.e2e-spec.ts
  6. 1 0
      packages/core/e2e/default-search-plugin.e2e-spec.ts
  7. 3 3
      packages/core/e2e/fixtures/test-plugins/list-query-plugin.ts
  8. 2 59
      packages/core/e2e/fixtures/test-plugins/with-all-lifecycle-hooks.ts
  9. 0 35
      packages/core/e2e/fixtures/test-plugins/with-config-and-bootstrap.ts
  10. 22 0
      packages/core/e2e/fixtures/test-plugins/with-config.ts
  11. 2 2
      packages/core/e2e/fixtures/test-plugins/with-job-queue.ts
  12. 0 46
      packages/core/e2e/fixtures/test-plugins/with-worker-controller.ts
  13. 0 3
      packages/core/e2e/job-queue.e2e-spec.ts
  14. 1 1
      packages/core/e2e/order-item-price-calculation-strategy.e2e-spec.ts
  15. 6 76
      packages/core/e2e/plugin.e2e-spec.ts
  16. 1 1
      packages/core/e2e/shop-catalog.e2e-spec.ts
  17. 1 1
      packages/core/src/api/common/request-context.ts
  18. 1 9
      packages/core/src/app.module.ts
  19. 0 0
      packages/core/src/async/async-observable.ts
  20. 1 0
      packages/core/src/async/index.ts
  21. 8 115
      packages/core/src/bootstrap.ts
  22. 7 7
      packages/core/src/cli/populate.ts
  23. 3 8
      packages/core/src/common/types/injectable-strategy.ts
  24. 5 25
      packages/core/src/config/config.module.ts
  25. 0 1
      packages/core/src/config/config.service.mock.ts
  26. 0 9
      packages/core/src/config/config.service.ts
  27. 0 7
      packages/core/src/config/default-config.ts
  28. 1 1
      packages/core/src/config/job-queue/job-queue-strategy.ts
  29. 0 60
      packages/core/src/config/vendure-config.ts
  30. 1 2
      packages/core/src/index.ts
  31. 1 0
      packages/core/src/job-queue/constants.ts
  32. 15 19
      packages/core/src/job-queue/job-queue.service.spec.ts
  33. 24 27
      packages/core/src/job-queue/job-queue.service.ts
  34. 3 13
      packages/core/src/job-queue/job-queue.ts
  35. 2 2
      packages/core/src/job-queue/polling-job-queue-strategy.ts
  36. 1 1
      packages/core/src/plugin/default-search-plugin/indexer/indexer.controller.ts
  37. 2 2
      packages/core/src/plugin/default-search-plugin/indexer/search-index.service.ts
  38. 0 4
      packages/core/src/plugin/plugin-common.module.ts
  39. 1 13
      packages/core/src/plugin/plugin-metadata.ts
  40. 2 93
      packages/core/src/plugin/plugin.module.ts
  41. 0 85
      packages/core/src/plugin/vendure-plugin.ts
  42. 0 2
      packages/core/src/process-context/index.ts
  43. 0 22
      packages/core/src/process-context/process-context.module.ts
  44. 0 30
      packages/core/src/process-context/process-context.ts
  45. 0 32
      packages/core/src/service/service.module.ts
  46. 2 2
      packages/core/src/service/services/collection.service.ts
  47. 0 1
      packages/core/src/worker/constants.ts
  48. 0 3
      packages/core/src/worker/index.ts
  49. 0 25
      packages/core/src/worker/message-interceptor.ts
  50. 0 44
      packages/core/src/worker/types.ts
  51. 0 37
      packages/core/src/worker/worker-monitor.ts
  52. 0 27
      packages/core/src/worker/worker-service.module.ts
  53. 0 41
      packages/core/src/worker/worker.module.ts
  54. 0 62
      packages/core/src/worker/worker.service.ts
  55. 5 1
      packages/create/templates/index-worker.hbs
  56. 5 3
      packages/create/templates/vendure-config.hbs
  57. 2 2
      packages/dev-server/dev-config.ts
  58. 10 6
      packages/dev-server/index-worker.ts
  59. 0 3
      packages/dev-server/populate-dev-server.ts
  60. 7 25
      packages/dev-server/test-plugins/google-auth/google-auth-plugin.ts
  61. 6 39
      packages/dev-server/test-plugins/keycloak-auth/keycloak-auth-plugin.ts
  62. 2 2
      packages/elasticsearch-plugin/src/elasticsearch-index.service.ts
  63. 4 8
      packages/email-plugin/src/dev-mailbox.ts
  64. 0 2
      packages/email-plugin/src/plugin.spec.ts
  65. 15 41
      packages/email-plugin/src/plugin.ts
  66. 3 10
      packages/email-plugin/src/types.ts
  67. 4 1
      packages/pub-sub-plugin/src/pub-sub-job-queue-strategy.ts
  68. 0 7
      packages/testing/src/config/test-config.ts
  69. 7 7
      packages/testing/src/data-population/populate-for-testing.ts
  70. 8 41
      packages/testing/src/test-server.ts

+ 61 - 111
packages/admin-ui-plugin/src/plugin.ts

@@ -1,3 +1,4 @@
+import { MiddlewareConsumer, NestModule, OnApplicationBootstrap, OnModuleDestroy } from '@nestjs/common';
 import { DEFAULT_AUTH_TOKEN_HEADER_KEY } from '@vendure/common/lib/shared-constants';
 import {
     AdminUiAppConfig,
@@ -5,20 +6,9 @@ import {
     AdminUiConfig,
     Type,
 } from '@vendure/common/lib/shared-types';
-import {
-    ConfigService,
-    createProxyHandler,
-    LanguageCode,
-    Logger,
-    OnVendureBootstrap,
-    OnVendureClose,
-    PluginCommonModule,
-    RuntimeVendureConfig,
-    VendurePlugin,
-} from '@vendure/core';
+import { ConfigService, createProxyHandler, Logger, PluginCommonModule, VendurePlugin } from '@vendure/core';
 import express from 'express';
 import fs from 'fs-extra';
-import { Server } from 'http';
 import path from 'path';
 
 import { defaultAvailableLanguages, defaultLanguage, DEFAULT_APP_PATH, loggerCtx } from './constants';
@@ -30,6 +20,11 @@ import { defaultAvailableLanguages, defaultLanguage, DEFAULT_APP_PATH, loggerCtx
  * @docsCategory AdminUiPlugin
  */
 export interface AdminUiPluginOptions {
+    /**
+     * @description
+     * The route to the admin ui.
+     */
+    route: string;
     /**
      * @description
      * The port on which the server will listen. If not
@@ -49,26 +44,6 @@ export interface AdminUiPluginOptions {
      * version, e.g. one pre-compiled with one or more ui extensions.
      */
     app?: AdminUiAppConfig | AdminUiAppDevModeConfig;
-    /**
-     * @description
-     * The hostname of the Vendure server which the admin ui will be making API calls
-     * to. If set to "auto", the admin ui app will determine the hostname from the
-     * current location (i.e. `window.location.hostname`).
-     *
-     * @deprecated Use the adminUiConfig property instead
-     * @default 'auto'
-     */
-    apiHost?: string | 'auto';
-    /**
-     * @description
-     * The port of the Vendure server which the admin ui will be making API calls
-     * to. If set to "auto", the admin ui app will determine the port from the
-     * current location (i.e. `window.location.port`).
-     *
-     * @deprecated Use the adminUiConfig property instead
-     * @default 'auto'
-     */
-    apiPort?: number | 'auto';
     /**
      * @description
      * Allows the contents of the `vendure-ui-config.json` file to be set, e.g.
@@ -109,11 +84,9 @@ export interface AdminUiPluginOptions {
 @VendurePlugin({
     imports: [PluginCommonModule],
     providers: [],
-    configuration: config => AdminUiPlugin.configure(config),
 })
-export class AdminUiPlugin implements OnVendureBootstrap, OnVendureClose {
+export class AdminUiPlugin implements NestModule {
     private static options: AdminUiPluginOptions;
-    private server: Server;
 
     constructor(private configService: ConfigService) {}
 
@@ -126,56 +99,8 @@ export class AdminUiPlugin implements OnVendureBootstrap, OnVendureClose {
         return AdminUiPlugin;
     }
 
-    /** @internal */
-    static async configure(config: RuntimeVendureConfig): Promise<RuntimeVendureConfig> {
-        const route = 'admin';
-        const { app } = this.options;
-        const appWatchMode = this.isDevModeApp(app);
-        let port: number;
-        if (this.isDevModeApp(app)) {
-            port = app.port;
-        } else {
-            port = this.options.port;
-        }
-        config.apiOptions.middleware.push({
-            handler: createProxyHandler({
-                hostname: this.options.hostname,
-                port,
-                route: 'admin',
-                label: 'Admin UI',
-                basePath: appWatchMode ? 'admin' : undefined,
-            }),
-            route,
-        });
-        if (this.isDevModeApp(app)) {
-            config.apiOptions.middleware.push({
-                handler: createProxyHandler({
-                    hostname: this.options.hostname,
-                    port,
-                    route: 'sockjs-node',
-                    label: 'Admin UI live reload',
-                    basePath: 'sockjs-node',
-                }),
-                route: 'sockjs-node',
-            });
-        }
-        return config;
-    }
-
-    /** @internal */
-    async onVendureBootstrap() {
-        const { apiHost, apiPort, port, app, adminUiConfig } = AdminUiPlugin.options;
-        // TODO: Remove in next minor version (0.11.0)
-        if (apiHost || apiPort) {
-            Logger.warn(
-                `The "apiHost" and "apiPort" options are deprecated and will be removed in a future version.`,
-                loggerCtx,
-            );
-            Logger.warn(
-                `Use the "adminUiConfig.apiHost", "adminUiConfig.apiPort" properties instead.`,
-                loggerCtx,
-            );
-        }
+    async configure(consumer: MiddlewareConsumer) {
+        const { app, hostname, port, route, adminUiConfig } = AdminUiPlugin.options;
         const adminUiAppPath = AdminUiPlugin.isDevModeApp(app)
             ? path.join(app.sourcePath, 'src')
             : (app && app.path) || DEFAULT_APP_PATH;
@@ -186,14 +111,45 @@ export class AdminUiPlugin implements OnVendureBootstrap, OnVendureClose {
             return this.overwriteAdminUiConfig(adminUiConfigPath, uiConfig);
         };
 
-        if (!AdminUiPlugin.isDevModeApp(app)) {
-            // If not in dev mode, start a static server for the compiled app
-            const adminUiServer = express();
-            adminUiServer.use(express.static(adminUiAppPath));
-            adminUiServer.use((req, res) => {
-                res.sendFile(path.join(adminUiAppPath, 'index.html'));
-            });
-            this.server = adminUiServer.listen(AdminUiPlugin.options.port);
+        if (AdminUiPlugin.isDevModeApp(app)) {
+            Logger.info('Creating admin ui middleware (dev mode)', loggerCtx);
+            consumer
+                .apply(
+                    createProxyHandler({
+                        hostname,
+                        port,
+                        route,
+                        label: 'Admin UI',
+                        basePath: route,
+                    }),
+                )
+                .forRoutes(route);
+            consumer
+                .apply(
+                    createProxyHandler({
+                        hostname,
+                        port,
+                        route: 'sockjs-node',
+                        label: 'Admin UI live reload',
+                        basePath: 'sockjs-node',
+                    }),
+                )
+                .forRoutes('sockjs-node');
+
+            Logger.info(`Compiling Admin UI app in development mode`, loggerCtx);
+            app.compile().then(
+                () => {
+                    Logger.info(`Admin UI compiling and watching for changes...`, loggerCtx);
+                },
+                (err: any) => {
+                    Logger.error(`Failed to compile: ${err}`, loggerCtx, err.stack);
+                },
+            );
+            await overwriteConfig();
+        } else {
+            Logger.info('Creating admin ui middleware (prod mode)', loggerCtx);
+            consumer.apply(await this.createStaticServer(app)).forRoutes(route);
+
             if (app && typeof app.compile === 'function') {
                 Logger.info(`Compiling Admin UI app in production mode...`, loggerCtx);
                 app.compile()
@@ -209,25 +165,19 @@ export class AdminUiPlugin implements OnVendureBootstrap, OnVendureClose {
             } else {
                 await overwriteConfig();
             }
-        } else {
-            Logger.info(`Compiling Admin UI app in development mode`, loggerCtx);
-            app.compile().then(
-                () => {
-                    Logger.info(`Admin UI compiling and watching for changes...`, loggerCtx);
-                },
-                (err: any) => {
-                    Logger.error(`Failed to compile: ${err}`, loggerCtx, err.stack);
-                },
-            );
-            await overwriteConfig();
         }
     }
 
-    /** @internal */
-    async onVendureClose(): Promise<void> {
-        if (this.server) {
-            await new Promise(resolve => this.server.close(() => resolve()));
-        }
+    private async createStaticServer(app?: AdminUiAppConfig) {
+        const adminUiAppPath = (app && app.path) || DEFAULT_APP_PATH;
+
+        const adminUiServer = express.Router();
+        adminUiServer.use(express.static(adminUiAppPath));
+        adminUiServer.use((req, res) => {
+            res.sendFile(path.join(adminUiAppPath, 'index.html'));
+        });
+
+        return adminUiServer;
     }
 
     /**
@@ -245,8 +195,8 @@ export class AdminUiPlugin implements OnVendureBootstrap, OnVendureClose {
         };
         return {
             adminApiPath: propOrDefault('adminApiPath', this.configService.apiOptions.adminApiPath),
-            apiHost: propOrDefault('apiHost', AdminUiPlugin.options.apiHost || 'auto'),
-            apiPort: propOrDefault('apiPort', AdminUiPlugin.options.apiPort || 'auto'),
+            apiHost: propOrDefault('apiHost', 'auto'),
+            apiPort: propOrDefault('apiPort', 'auto'),
             tokenMethod: propOrDefault('tokenMethod', authOptions.tokenMethod || 'cookie'),
             authTokenHeaderKey: propOrDefault(
                 'authTokenHeaderKey',

+ 9 - 41
packages/asset-server-plugin/src/plugin.ts

@@ -1,12 +1,8 @@
-import { DNSHealthIndicator, TerminusModule } from '@nestjs/terminus';
+import { MiddlewareConsumer, NestModule, OnApplicationBootstrap } from '@nestjs/common';
 import { Type } from '@vendure/common/lib/shared-types';
 import {
     AssetStorageStrategy,
-    createProxyHandler,
-    HealthCheckRegistryService,
     Logger,
-    OnVendureBootstrap,
-    OnVendureClose,
     PluginCommonModule,
     RuntimeVendureConfig,
     VendurePlugin,
@@ -15,7 +11,6 @@ import { createHash } from 'crypto';
 import express, { NextFunction, Request, Response } from 'express';
 import { fromBuffer } from 'file-type';
 import fs from 'fs-extra';
-import { Server } from 'http';
 import path from 'path';
 
 import { loggerCtx } from './constants';
@@ -124,11 +119,10 @@ import { AssetServerOptions, ImageTransformPreset } from './types';
  * @docsCategory AssetServerPlugin
  */
 @VendurePlugin({
-    imports: [PluginCommonModule, TerminusModule],
+    imports: [PluginCommonModule],
     configuration: config => AssetServerPlugin.configure(config),
 })
-export class AssetServerPlugin implements OnVendureBootstrap, OnVendureClose {
-    private server: Server;
+export class AssetServerPlugin implements NestModule, OnApplicationBootstrap {
     private static assetStorage: AssetStorageStrategy;
     private readonly cacheDir = 'cache';
     private presets: ImageTransformPreset[] = [
@@ -140,11 +134,6 @@ export class AssetServerPlugin implements OnVendureBootstrap, OnVendureClose {
     ];
     private static options: AssetServerOptions;
 
-    constructor(
-        private healthCheckRegistryService: HealthCheckRegistryService,
-        private dns: DNSHealthIndicator,
-    ) {}
-
     /**
      * @description
      * Set the plugin options.
@@ -166,15 +155,11 @@ export class AssetServerPlugin implements OnVendureBootstrap, OnVendureClose {
         config.assetOptions.assetStorageStrategy = this.assetStorage;
         config.assetOptions.assetNamingStrategy =
             this.options.namingStrategy || new HashedAssetNamingStrategy();
-        config.apiOptions.middleware.push({
-            handler: createProxyHandler({ ...this.options, label: 'Asset Server' }),
-            route: this.options.route,
-        });
         return config;
     }
 
     /** @internal */
-    onVendureBootstrap(): void | Promise<void> {
+    onApplicationBootstrap(): void | Promise<void> {
         if (AssetServerPlugin.options.presets) {
             for (const preset of AssetServerPlugin.options.presets) {
                 const existingIndex = this.presets.findIndex(p => p.name === preset.name);
@@ -188,37 +173,20 @@ export class AssetServerPlugin implements OnVendureBootstrap, OnVendureClose {
 
         const cachePath = path.join(AssetServerPlugin.options.assetUploadDir, this.cacheDir);
         fs.ensureDirSync(cachePath);
-        this.createAssetServer();
-        const { hostname, port } = AssetServerPlugin.options;
     }
 
-    /** @internal */
-    onVendureClose(): Promise<void> {
-        return new Promise(resolve => {
-            this.server.close(() => resolve());
-        });
+    configure(consumer: MiddlewareConsumer) {
+        Logger.info('Creating asset server middleware', 'AssetServerPlugin');
+        consumer.apply(this.createAssetServer()).forRoutes(AssetServerPlugin.options.route);
     }
 
     /**
      * Creates the image server instance
      */
     private createAssetServer() {
-        const assetServer = express();
-        assetServer.get('/health', (req, res) => {
-            res.send('ok');
-        });
+        const assetServer = express.Router();
         assetServer.use(this.sendAsset(), this.generateTransformedImage());
-
-        this.server = assetServer.listen(AssetServerPlugin.options.port, () => {
-            const addressInfo = this.server.address();
-            if (addressInfo && typeof addressInfo !== 'string') {
-                const { address, port } = addressInfo;
-                Logger.info(`Asset server listening on "http://localhost:${port}"`, loggerCtx);
-                this.healthCheckRegistryService.registerIndicatorFunction(() =>
-                    this.dns.pingCheck('asset-server', `http://localhost:${port}/health`),
-                );
-            }
-        });
+        return assetServer;
     }
 
     /**

+ 1 - 9
packages/asset-server-plugin/src/types.ts

@@ -44,17 +44,9 @@ export interface ImageTransformPreset {
  * @docsCategory AssetServerPlugin
  */
 export interface AssetServerOptions {
-    hostname?: string;
     /**
      * @description
-     * The local port that the server will run on. Note that the AssetServerPlugin
-     * includes a proxy server which allows the asset server to be accessed on the same
-     * port as the main Vendure server.
-     */
-    port: number;
-    /**
-     * @description
-     * The proxy route to the asset server.
+     * The route to the asset server.
      */
     route: string;
     /**

+ 69 - 0
packages/core/e2e/__snapshots__/collection.e2e-spec.ts.snap

@@ -137,3 +137,72 @@ Object {
   ],
 }
 `;
+
+exports[`Collection resolver updateCollection updating existing assets 1`] = `
+Object {
+  "assets": Array [
+    Object {
+      "fileSize": 1680,
+      "id": "T_1",
+      "mimeType": "image/jpeg",
+      "name": "derick-david-409858-unsplash.jpg",
+      "preview": "test-url/test-assets/derick-david-409858-unsplash__preview.jpg",
+      "source": "test-url/test-assets/derick-david-409858-unsplash.jpg",
+      "type": "IMAGE",
+    },
+    Object {
+      "fileSize": 1680,
+      "id": "T_3",
+      "mimeType": "image/jpeg",
+      "name": "florian-olivo-1166419-unsplash.jpg",
+      "preview": "test-url/test-assets/florian-olivo-1166419-unsplash__preview.jpg",
+      "source": "test-url/test-assets/florian-olivo-1166419-unsplash.jpg",
+      "type": "IMAGE",
+    },
+  ],
+  "children": Array [],
+  "description": "Apple stuff ",
+  "featuredAsset": Object {
+    "fileSize": 1680,
+    "id": "T_1",
+    "mimeType": "image/jpeg",
+    "name": "derick-david-409858-unsplash.jpg",
+    "preview": "test-url/test-assets/derick-david-409858-unsplash__preview.jpg",
+    "source": "test-url/test-assets/derick-david-409858-unsplash.jpg",
+    "type": "IMAGE",
+  },
+  "filters": Array [
+    Object {
+      "args": Array [
+        Object {
+          "name": "facetValueIds",
+          "value": "[\\"T_3\\"]",
+        },
+        Object {
+          "name": "containsAny",
+          "value": "false",
+        },
+      ],
+      "code": "facet-value-filter",
+    },
+  ],
+  "id": "T_5",
+  "isPrivate": false,
+  "languageCode": "en",
+  "name": "Pear",
+  "parent": Object {
+    "id": "T_4",
+    "name": "Computers",
+  },
+  "slug": "apple-stuff",
+  "translations": Array [
+    Object {
+      "description": "Apple stuff ",
+      "id": "T_5",
+      "languageCode": "en",
+      "name": "Pear",
+      "slug": "apple-stuff",
+    },
+  ],
+}
+`;

+ 1 - 0
packages/core/e2e/collection.e2e-spec.ts

@@ -3,6 +3,7 @@ import { ROOT_COLLECTION_NAME } from '@vendure/common/lib/shared-constants';
 import {
     DefaultJobQueuePlugin,
     facetValueCollectionFilter,
+    JobQueueService,
     variantNameCollectionFilter,
 } from '@vendure/core';
 import { createTestEnvironment } from '@vendure/testing';

+ 1 - 0
packages/core/e2e/default-search-plugin.e2e-spec.ts

@@ -4,6 +4,7 @@ import {
     DefaultJobQueuePlugin,
     DefaultSearchPlugin,
     facetValueCollectionFilter,
+    JobQueueService,
     mergeConfig,
 } from '@vendure/core';
 import { createTestEnvironment, E2E_DEFAULT_CHANNEL_TOKEN, SimpleGraphQLClient } from '@vendure/testing';

+ 3 - 3
packages/core/e2e/fixtures/test-plugins/list-query-plugin.ts

@@ -1,3 +1,4 @@
+import { OnApplicationBootstrap } from '@nestjs/common';
 import { Args, Query, Resolver } from '@nestjs/graphql';
 import { LanguageCode } from '@vendure/common/lib/generated-types';
 import { DeepPartial, ID } from '@vendure/common/lib/shared-types';
@@ -5,7 +6,6 @@ import {
     Ctx,
     ListQueryBuilder,
     LocaleString,
-    OnVendureBootstrap,
     PluginCommonModule,
     RequestContext,
     TransactionalConnection,
@@ -153,10 +153,10 @@ const adminApiExtensions = gql`
         resolvers: [ListQueryResolver],
     },
 })
-export class ListQueryPlugin implements OnVendureBootstrap {
+export class ListQueryPlugin implements OnApplicationBootstrap {
     constructor(private connection: TransactionalConnection) {}
 
-    async onVendureBootstrap() {
+    async onApplicationBootstrap() {
         const count = await this.connection.getRepository(TestEntity).count();
         if (count === 0) {
             const testEntities = await this.connection.getRepository(TestEntity).save([

+ 2 - 59
packages/core/e2e/fixtures/test-plugins/with-all-lifecycle-hooks.ts

@@ -1,37 +1,10 @@
 import { INestApplication, INestMicroservice } from '@nestjs/common';
-import {
-    OnVendureBootstrap,
-    OnVendureClose,
-    OnVendureWorkerBootstrap,
-    OnVendureWorkerClose,
-} from '@vendure/core';
 
-export class TestPluginWithAllLifecycleHooks
-    implements OnVendureBootstrap, OnVendureWorkerBootstrap, OnVendureClose, OnVendureWorkerClose {
+export class TestPluginWithAllLifecycleHooks {
     private static onConstructorFn: any;
-    private static onBeforeBootstrapFn: any;
-    private static onBeforeWorkerBootstrapFn: any;
-    private static onBootstrapFn: any;
-    private static onWorkerBootstrapFn: any;
-    private static onCloseFn: any;
-    private static onWorkerCloseFn: any;
 
-    static init(
-        constructorFn: any,
-        beforeBootstrapFn: any,
-        beforeWorkerBootstrapFn: any,
-        bootstrapFn: any,
-        workerBootstrapFn: any,
-        closeFn: any,
-        workerCloseFn: any,
-    ) {
+    static init(constructorFn: any) {
         this.onConstructorFn = constructorFn;
-        this.onBeforeBootstrapFn = beforeBootstrapFn;
-        this.onBeforeWorkerBootstrapFn = beforeWorkerBootstrapFn;
-        this.onBootstrapFn = bootstrapFn;
-        this.onWorkerBootstrapFn = workerBootstrapFn;
-        this.onCloseFn = closeFn;
-        this.onWorkerCloseFn = workerCloseFn;
         return this;
     }
 
@@ -39,32 +12,6 @@ export class TestPluginWithAllLifecycleHooks
         TestPluginWithAllLifecycleHooks.onConstructorFn();
     }
 
-    static beforeVendureBootstrap(app: INestApplication): void | Promise<void> {
-        TestPluginWithAllLifecycleHooks.onBeforeBootstrapFn(app);
-    }
-
-    static beforeVendureWorkerBootstrap(app: INestMicroservice): void | Promise<void> {
-        TestPluginWithAllLifecycleHooks.onBeforeWorkerBootstrapFn(app);
-    }
-
-    onVendureBootstrap(): void | Promise<void> {
-        TestPluginWithAllLifecycleHooks.onBootstrapFn();
-    }
-
-    onVendureWorkerBootstrap(): void | Promise<void> {
-        TestPluginWithAllLifecycleHooks.onWorkerBootstrapFn();
-    }
-
-    onVendureClose(): void | Promise<void> {
-        TestPluginWithAllLifecycleHooks.onCloseFn();
-        this.resetSpies();
-    }
-
-    onVendureWorkerClose(): void | Promise<void> {
-        TestPluginWithAllLifecycleHooks.onWorkerCloseFn();
-        this.resetSpies();
-    }
-
     /**
      * This is required because on the first run, the Vendure server will be bootstrapped twice -
      * once to populate the database and the second time for the actual tests. Thus the call counts
@@ -73,9 +20,5 @@ export class TestPluginWithAllLifecycleHooks
      */
     private resetSpies() {
         TestPluginWithAllLifecycleHooks.onConstructorFn.mockClear();
-        TestPluginWithAllLifecycleHooks.onBeforeBootstrapFn.mockClear();
-        TestPluginWithAllLifecycleHooks.onBeforeWorkerBootstrapFn.mockClear();
-        TestPluginWithAllLifecycleHooks.onBootstrapFn.mockClear();
-        TestPluginWithAllLifecycleHooks.onWorkerBootstrapFn.mockClear();
     }
 }

+ 0 - 35
packages/core/e2e/fixtures/test-plugins/with-config-and-bootstrap.ts

@@ -1,35 +0,0 @@
-import { LanguageCode } from '@vendure/common/lib/generated-types';
-import {
-    ConfigModule,
-    ConfigService,
-    OnVendureBootstrap,
-    OnVendureClose,
-    VendurePlugin,
-} from '@vendure/core';
-
-@VendurePlugin({
-    imports: [ConfigModule],
-    configuration: (config) => {
-        // tslint:disable-next-line:no-non-null-assertion
-        config.defaultLanguageCode = LanguageCode.zh;
-        return config;
-    },
-})
-export class TestPluginWithConfigAndBootstrap implements OnVendureBootstrap, OnVendureClose {
-    private static boostrapWasCalled: any;
-
-    static setup(boostrapWasCalled: (arg: any) => void) {
-        TestPluginWithConfigAndBootstrap.boostrapWasCalled = boostrapWasCalled;
-        return TestPluginWithConfigAndBootstrap;
-    }
-
-    constructor(private configService: ConfigService) {}
-
-    onVendureBootstrap() {
-        TestPluginWithConfigAndBootstrap.boostrapWasCalled(this.configService);
-    }
-
-    onVendureClose() {
-        TestPluginWithConfigAndBootstrap.boostrapWasCalled.mockClear();
-    }
-}

+ 22 - 0
packages/core/e2e/fixtures/test-plugins/with-config.ts

@@ -0,0 +1,22 @@
+import { LanguageCode } from '@vendure/common/lib/generated-types';
+import {
+    ConfigModule,
+    ConfigService,
+    OnVendureBootstrap,
+    OnVendureClose,
+    VendurePlugin,
+} from '@vendure/core';
+
+@VendurePlugin({
+    imports: [ConfigModule],
+    configuration: config => {
+        // tslint:disable-next-line:no-non-null-assertion
+        config.defaultLanguageCode = LanguageCode.zh;
+        return config;
+    },
+})
+export class TestPluginWithConfig {
+    static setup() {
+        return TestPluginWithConfig;
+    }
+}

+ 2 - 2
packages/core/e2e/fixtures/test-plugins/with-job-queue.ts

@@ -9,8 +9,8 @@ class TestController implements OnModuleInit {
 
     constructor(private jobQueueService: JobQueueService) {}
 
-    onModuleInit(): any {
-        this.queue = this.jobQueueService.createQueue({
+    async onModuleInit(): Promise<void> {
+        this.queue = await this.jobQueueService.createQueue({
             name: 'test',
             process: job => {
                 return PluginWithJobQueue.jobSubject

+ 0 - 46
packages/core/e2e/fixtures/test-plugins/with-worker-controller.ts

@@ -1,46 +0,0 @@
-import { Controller, Get } from '@nestjs/common';
-import { MessagePattern } from '@nestjs/microservices';
-import {
-    PluginCommonModule,
-    ProcessContext,
-    VendurePlugin,
-    WorkerMessage,
-    WorkerService,
-} from '@vendure/core';
-import { of } from 'rxjs';
-
-class TestWorkerMessage extends WorkerMessage<string, boolean> {
-    static readonly pattern = 'TestWorkerMessage';
-}
-
-@Controller('process-context')
-export class TestProcessContextController {
-    constructor(private processContext: ProcessContext, private workerService: WorkerService) {}
-
-    @Get('server')
-    isServer() {
-        return this.processContext.isServer;
-    }
-
-    @Get('worker')
-    isWorker() {
-        return this.workerService.send(new TestWorkerMessage('hello'));
-    }
-}
-
-@Controller()
-export class TestProcessContextWorkerController {
-    constructor(private processContext: ProcessContext) {}
-
-    @MessagePattern(TestWorkerMessage.pattern)
-    isWorker() {
-        return of(this.processContext.isWorker);
-    }
-}
-
-@VendurePlugin({
-    imports: [PluginCommonModule],
-    controllers: [TestProcessContextController],
-    workers: [TestProcessContextWorkerController],
-})
-export class TestProcessContextPlugin {}

+ 0 - 3
packages/core/e2e/job-queue.e2e-spec.ts

@@ -24,9 +24,6 @@ describe('JobQueue', () => {
     const { server, adminClient } = createTestEnvironment(
         mergeConfig(testConfig, {
             plugins: [DefaultJobQueuePlugin, PluginWithJobQueue],
-            workerOptions: {
-                runInMainProcess: true,
-            },
         }),
     );
 

+ 1 - 1
packages/core/e2e/order-item-price-calculation-strategy.e2e-spec.ts

@@ -1,4 +1,4 @@
-import { DefaultSearchPlugin, mergeConfig } from '@vendure/core';
+import { DefaultSearchPlugin, JobQueueService, mergeConfig } from '@vendure/core';
 import { createTestEnvironment } from '@vendure/testing';
 import gql from 'graphql-tag';
 import path from 'path';

+ 6 - 76
packages/core/e2e/plugin.e2e-spec.ts

@@ -5,44 +5,26 @@ import gql from 'graphql-tag';
 import path from 'path';
 
 import { initialData } from '../../../e2e-common/e2e-initial-data';
-import { TEST_SETUP_TIMEOUT_MS, testConfig } from '../../../e2e-common/test-config';
+import { testConfig, TEST_SETUP_TIMEOUT_MS } from '../../../e2e-common/test-config';
 
 import { TestPluginWithAllLifecycleHooks } from './fixtures/test-plugins/with-all-lifecycle-hooks';
 import { TestAPIExtensionPlugin } from './fixtures/test-plugins/with-api-extensions';
-import { TestPluginWithConfigAndBootstrap } from './fixtures/test-plugins/with-config-and-bootstrap';
+import { TestPluginWithConfig } from './fixtures/test-plugins/with-config';
 import { TestLazyExtensionPlugin } from './fixtures/test-plugins/with-lazy-api-extensions';
 import { TestPluginWithProvider } from './fixtures/test-plugins/with-provider';
 import { TestRestPlugin } from './fixtures/test-plugins/with-rest-controller';
-import { TestProcessContextPlugin } from './fixtures/test-plugins/with-worker-controller';
 
 describe('Plugins', () => {
-    const bootstrapMockFn = jest.fn();
     const onConstructorFn = jest.fn();
-    const beforeBootstrapFn = jest.fn();
-    const beforeWorkerBootstrapFn = jest.fn();
-    const onBootstrapFn = jest.fn();
-    const onWorkerBootstrapFn = jest.fn();
-    const onCloseFn = jest.fn();
-    const onWorkerCloseFn = jest.fn();
-
     const { server, adminClient, shopClient } = createTestEnvironment({
         ...testConfig,
         plugins: [
-            TestPluginWithAllLifecycleHooks.init(
-                onConstructorFn,
-                beforeBootstrapFn,
-                beforeWorkerBootstrapFn,
-                onBootstrapFn,
-                onWorkerBootstrapFn,
-                onCloseFn,
-                onWorkerCloseFn,
-            ),
-            TestPluginWithConfigAndBootstrap.setup(bootstrapMockFn),
+            TestPluginWithAllLifecycleHooks.init(onConstructorFn),
+            TestPluginWithConfig.setup(),
             TestAPIExtensionPlugin,
             TestPluginWithProvider,
             TestLazyExtensionPlugin,
             TestRestPlugin,
-            TestProcessContextPlugin,
         ],
     });
 
@@ -60,30 +42,11 @@ describe('Plugins', () => {
     });
 
     it('constructs one instance for each process', () => {
-        expect(onConstructorFn).toHaveBeenCalledTimes(2);
-    });
-
-    it('calls beforeVendureBootstrap', () => {
-        expect(beforeBootstrapFn).toHaveBeenCalledTimes(1);
-        expect(beforeBootstrapFn).toHaveBeenCalledWith(server.app);
-    });
-
-    it('calls beforeVendureWorkerBootstrap', () => {
-        expect(beforeWorkerBootstrapFn).toHaveBeenCalledTimes(1);
-        expect(beforeWorkerBootstrapFn).toHaveBeenCalledWith(server.worker);
-    });
-
-    it('calls onVendureBootstrap', () => {
-        expect(onBootstrapFn).toHaveBeenCalledTimes(1);
-    });
-
-    it('calls onWorkerVendureBootstrap', () => {
-        expect(onWorkerBootstrapFn).toHaveBeenCalledTimes(1);
+        expect(onConstructorFn).toHaveBeenCalledTimes(1);
     });
 
     it('can modify the config in configure()', () => {
-        expect(bootstrapMockFn).toHaveBeenCalledTimes(1);
-        const configService: ConfigService = bootstrapMockFn.mock.calls[0][0];
+        const configService = server.app.get(ConfigService);
         expect(configService instanceof ConfigService).toBe(true);
         expect(configService.defaultLanguageCode).toBe(LanguageCode.zh);
     });
@@ -174,37 +137,4 @@ describe('Plugins', () => {
             expect(result.message).toContain('uh oh!');
         });
     });
-
-    describe('processContext', () => {
-        it('server context', async () => {
-            const response = await shopClient.fetch(
-                `http://localhost:${testConfig.apiOptions.port}/process-context/server`,
-            );
-            const body = await response.text();
-
-            expect(body).toBe('true');
-        });
-        it('worker context', async () => {
-            const response = await shopClient.fetch(
-                `http://localhost:${testConfig.apiOptions.port}/process-context/worker`,
-            );
-            const body = await response.text();
-
-            expect(body).toBe('true');
-        });
-    });
-
-    describe('on app close', () => {
-        beforeAll(async () => {
-            await server.destroy();
-        });
-
-        it('calls onVendureClose', () => {
-            expect(onCloseFn).toHaveBeenCalled();
-        });
-
-        it('calls onWorkerVendureClose', () => {
-            expect(onWorkerCloseFn).toHaveBeenCalled();
-        });
-    });
 });

+ 1 - 1
packages/core/e2e/shop-catalog.e2e-spec.ts

@@ -1,5 +1,5 @@
 /* tslint:disable:no-non-null-assertion */
-import { facetValueCollectionFilter } from '@vendure/core';
+import { facetValueCollectionFilter, JobQueueService } from '@vendure/core';
 import { createTestEnvironment } from '@vendure/testing';
 import gql from 'graphql-tag';
 import path from 'path';

+ 1 - 1
packages/core/src/api/common/request-context.ts

@@ -113,7 +113,7 @@ export class RequestContext {
      * @description
      * Serializes the RequestContext object into a JSON-compatible simple object.
      * This is useful when you need to send a RequestContext object to another
-     * process, e.g. to pass it to the Worker process via the {@link WorkerService}.
+     * process, e.g. to pass it to the Job Queue via the {@link JobQueueService}.
      */
     serialize(): SerializedRequestContext {
         const serializableThis: any = Object.assign({}, this);

+ 1 - 9
packages/core/src/app.module.ts

@@ -9,17 +9,9 @@ import { HealthCheckModule } from './health-check/health-check.module';
 import { I18nModule } from './i18n/i18n.module';
 import { I18nService } from './i18n/i18n.service';
 import { PluginModule } from './plugin/plugin.module';
-import { ProcessContextModule } from './process-context/process-context.module';
 
 @Module({
-    imports: [
-        ConfigModule,
-        I18nModule,
-        ApiModule,
-        PluginModule.forRoot(),
-        ProcessContextModule.forRoot(),
-        HealthCheckModule,
-    ],
+    imports: [ConfigModule, I18nModule, ApiModule, PluginModule.forRoot(), HealthCheckModule],
 })
 export class AppModule implements NestModule, OnApplicationShutdown {
     constructor(private configService: ConfigService, private i18nService: I18nService) {}

+ 0 - 0
packages/core/src/worker/async-observable.ts → packages/core/src/async/async-observable.ts


+ 1 - 0
packages/core/src/async/index.ts

@@ -0,0 +1 @@
+export * from './async-observable';

+ 8 - 115
packages/core/src/bootstrap.ts

@@ -1,6 +1,5 @@
-import { INestApplication, INestMicroservice } from '@nestjs/common';
+import { INestApplication, INestApplicationContext } from '@nestjs/common';
 import { NestFactory } from '@nestjs/core';
-import { TcpClientOptions, Transport } from '@nestjs/microservices';
 import { getConnectionToken } from '@nestjs/typeorm';
 import { Type } from '@vendure/common/lib/shared-types';
 import cookieSession = require('cookie-session');
@@ -18,7 +17,6 @@ import { setEntityIdStrategy } from './entity/set-entity-id-strategy';
 import { validateCustomFieldsConfig } from './entity/validate-custom-fields-config';
 import { getConfigurationFunction, getEntitiesFromPlugins } from './plugin/plugin-metadata';
 import { getProxyMiddlewareCliGreetings } from './plugin/plugin-utils';
-import { BeforeVendureBootstrap, BeforeVendureWorkerBootstrap } from './plugin/vendure-plugin';
 
 export type VendureBootstrapFunction = (config: VendureConfig) => Promise<INestApplication>;
 
@@ -54,7 +52,6 @@ export async function bootstrap(userConfig: Partial<VendureConfig>): Promise<INe
     });
     DefaultLogger.restoreOriginalLogLevel();
     app.useLogger(new Logger());
-    await runBeforeBootstrapHooks(config, app);
     if (config.authOptions.tokenMethod === 'cookie') {
         const { sessionSecret, cookieOptions } = config.authOptions;
         app.use(
@@ -67,24 +64,13 @@ export async function bootstrap(userConfig: Partial<VendureConfig>): Promise<INe
     }
     await app.listen(port, hostname || '');
     app.enableShutdownHooks();
-    if (config.workerOptions.runInMainProcess) {
-        try {
-            const worker = await bootstrapWorkerInternal(config);
-            Logger.warn(`Worker is running in main process. This is not recommended for production.`);
-            Logger.warn(`[VendureConfig.workerOptions.runInMainProcess = true]`);
-            closeWorkerOnAppClose(app, worker);
-        } catch (e) {
-            Logger.error(`Could not start the worker process: ${e.message || e}`, 'Vendure Worker');
-        }
-    }
     logWelcomeMessage(config);
     return app;
 }
 
 /**
  * @description
- * Bootstraps the Vendure worker. Read more about the [Vendure Worker]({{< relref "vendure-worker" >}}) or see the worker-specific options
- * defined in {@link WorkerOptions}.
+ * Bootstraps the Vendure . Read more about the [Vendure Worker]({{< relref "vendure-worker" >}})
  *
  * @example
  * ```TypeScript
@@ -97,59 +83,22 @@ export async function bootstrap(userConfig: Partial<VendureConfig>): Promise<INe
  * ```
  * @docsCategory worker
  * */
-export async function bootstrapWorker(userConfig: Partial<VendureConfig>): Promise<INestMicroservice> {
-    if (userConfig.workerOptions && userConfig.workerOptions.runInMainProcess === true) {
-        Logger.useLogger(userConfig.logger || new DefaultLogger());
-        const errorMessage = `Cannot bootstrap worker when "runInMainProcess" is set to true`;
-        Logger.error(errorMessage, 'Vendure Worker');
-        throw new Error(errorMessage);
-    } else {
-        try {
-            const vendureConfig = await preBootstrapConfig(userConfig);
-            return await bootstrapWorkerInternal(vendureConfig);
-        } catch (e) {
-            Logger.error(`Could not start the worker process: ${e.message}`, 'Vendure Worker');
-            throw e;
-        }
-    }
-}
-
-async function bootstrapWorkerInternal(
-    vendureConfig: Readonly<RuntimeVendureConfig>,
-): Promise<INestMicroservice> {
+export async function bootstrapWorker(userConfig: Partial<VendureConfig>): Promise<INestApplicationContext> {
+    const vendureConfig = await preBootstrapConfig(userConfig);
     const config = disableSynchronize(vendureConfig);
-    if (!config.workerOptions.runInMainProcess && (config.logger as any).setDefaultContext) {
-        (config.logger as any).setDefaultContext('Vendure Worker');
-    }
+    (config.logger as any).setDefaultContext('Vendure Worker');
     Logger.useLogger(config.logger);
     Logger.info(`Bootstrapping Vendure Worker (pid: ${process.pid})...`);
 
-    const workerModule = await import('./worker/worker.module');
+    const appModule = await import('./app.module');
     DefaultLogger.hideNestBoostrapLogs();
-    const workerApp = await NestFactory.createMicroservice(workerModule.WorkerModule, {
-        transport: config.workerOptions.transport,
+    const workerApp = await NestFactory.createApplicationContext(appModule.AppModule, {
         logger: new Logger(),
-        options: config.workerOptions.options,
     });
     DefaultLogger.restoreOriginalLogLevel();
     workerApp.useLogger(new Logger());
     workerApp.enableShutdownHooks();
     await validateDbTablesForWorker(workerApp);
-    await runBeforeWorkerBootstrapHooks(config, workerApp);
-    // A work-around to correctly handle errors when attempting to start the
-    // microservice server listening.
-    // See https://github.com/nestjs/nest/issues/2777
-    // TODO: Remove if & when the above issue is resolved.
-    await new Promise((resolve, reject) => {
-        const tcpServer = (workerApp as any).server.server;
-        if (tcpServer) {
-            tcpServer.on('error', (e: any) => {
-                reject(e);
-            });
-        }
-        workerApp.listenAsync().then(resolve);
-    });
-    workerWelcomeMessage(config);
     return workerApp;
 }
 
@@ -246,62 +195,6 @@ function setExposedHeaders(config: Readonly<RuntimeVendureConfig>) {
     }
 }
 
-export async function runBeforeBootstrapHooks(config: Readonly<RuntimeVendureConfig>, app: INestApplication) {
-    function hasBeforeBootstrapHook(
-        plugin: any,
-    ): plugin is { beforeVendureBootstrap: BeforeVendureBootstrap } {
-        return typeof plugin.beforeVendureBootstrap === 'function';
-    }
-    for (const plugin of config.plugins) {
-        if (hasBeforeBootstrapHook(plugin)) {
-            await plugin.beforeVendureBootstrap(app);
-        }
-    }
-}
-
-export async function runBeforeWorkerBootstrapHooks(
-    config: Readonly<RuntimeVendureConfig>,
-    worker: INestMicroservice,
-) {
-    function hasBeforeBootstrapHook(
-        plugin: any,
-    ): plugin is { beforeVendureWorkerBootstrap: BeforeVendureWorkerBootstrap } {
-        return typeof plugin.beforeVendureWorkerBootstrap === 'function';
-    }
-    for (const plugin of config.plugins) {
-        if (hasBeforeBootstrapHook(plugin)) {
-            await plugin.beforeVendureWorkerBootstrap(worker);
-        }
-    }
-}
-
-/**
- * Monkey-patches the app's .close() method to also close the worker microservice
- * instance too.
- */
-function closeWorkerOnAppClose(app: INestApplication, worker: INestMicroservice) {
-    // A Nest app is a nested Proxy. By getting the prototype we are
-    // able to access and override the actual close() method.
-    const appPrototype = Object.getPrototypeOf(app);
-    const appClose = appPrototype.close.bind(app);
-    appPrototype.close = async () => {
-        return Promise.all([appClose(), worker.close()]);
-    };
-}
-
-function workerWelcomeMessage(config: VendureConfig) {
-    let transportString = '';
-    let connectionString = '';
-    const transport = (config.workerOptions && config.workerOptions.transport) || Transport.TCP;
-    transportString = ` with ${Transport[transport]} transport`;
-    const options = (config.workerOptions as TcpClientOptions).options;
-    if (options) {
-        const { host, port } = options;
-        connectionString = ` at ${host || 'localhost'}:${port}`;
-    }
-    Logger.info(`Vendure Worker started${transportString}${connectionString}`);
-}
-
 function logWelcomeMessage(config: RuntimeVendureConfig) {
     let version: string;
     try {
@@ -351,7 +244,7 @@ function disableSynchronize(userConfig: Readonly<RuntimeVendureConfig>): Readonl
  * before allowing the rest of the worker bootstrap to continue.
  * @param worker
  */
-async function validateDbTablesForWorker(worker: INestMicroservice) {
+async function validateDbTablesForWorker(worker: INestApplicationContext) {
     const connection: Connection = worker.get(getConnectionToken());
     await new Promise(async (resolve, reject) => {
         const checkForTables = async (): Promise<boolean> => {

+ 7 - 7
packages/core/src/cli/populate.ts

@@ -1,4 +1,4 @@
-import { INestApplication } from '@nestjs/common';
+import { INestApplicationContext } from '@nestjs/common';
 import fs from 'fs-extra';
 import path from 'path';
 
@@ -12,11 +12,11 @@ import { logColored } from './cli-utils';
  *
  * @docsCategory import-export
  */
-export async function populate(
-    bootstrapFn: () => Promise<INestApplication | undefined>,
+export async function populate<T extends INestApplicationContext>(
+    bootstrapFn: () => Promise<T | undefined>,
     initialDataPathOrObject: string | object,
     productsCsvPath?: string,
-): Promise<INestApplication> {
+): Promise<T> {
     const app = await bootstrapFn();
     if (!app) {
         throw new Error('Could not bootstrap the Vendure app');
@@ -48,7 +48,7 @@ export async function populate(
 }
 
 export async function populateInitialData(
-    app: INestApplication,
+    app: INestApplicationContext,
     initialData: import('@vendure/core').InitialData,
     loggingFn?: (message: string) => void,
 ) {
@@ -65,7 +65,7 @@ export async function populateInitialData(
 }
 
 export async function populateCollections(
-    app: INestApplication,
+    app: INestApplicationContext,
     initialData: import('@vendure/core').InitialData,
     loggingFn?: (message: string) => void,
 ) {
@@ -84,7 +84,7 @@ export async function populateCollections(
 }
 
 export async function importProductsFromCsv(
-    app: INestApplication,
+    app: INestApplicationContext,
     productsCsvPath: string,
     languageCode: import('@vendure/core').LanguageCode,
 ): Promise<import('@vendure/core').ImportProgress> {

+ 3 - 8
packages/core/src/common/types/injectable-strategy.ts

@@ -13,18 +13,13 @@ export interface InjectableStrategy {
      * Defines setup logic to be run during application bootstrap. Receives
      * the {@link Injector} as an argument, which allows application providers
      * to be used as part of the setup. This hook will be called on both the
-     * main server and the worker processes. If you have code which should only
-     * run on either the server on the worker, then inject the {@link ProcessContext}
-     * to check the current context.
+     * main server and the worker processes.
      *
      * @example
      * ```TypeScript
      * async init(injector: Injector) {
-     *   const processContext = injector.get(ProcessContext);
-     *   if (processContext.isServer) {
-     *     const myService = injector.get(MyService);
-     *     await myService.doSomething();
-     *   }
+     *   const myService = injector.get(MyService);
+     *   await myService.doSomething();
      * }
      * ```
      */

+ 5 - 25
packages/core/src/config/config.module.ts

@@ -4,7 +4,6 @@ import { ModuleRef } from '@nestjs/core';
 import { ConfigurableOperationDef } from '../common/configurable-operation';
 import { Injector } from '../common/injector';
 import { InjectableStrategy } from '../common/types/injectable-strategy';
-import { ProcessContext } from '../process-context/process-context';
 
 import { ConfigService } from './config.service';
 
@@ -13,35 +12,16 @@ import { ConfigService } from './config.service';
     exports: [ConfigService],
 })
 export class ConfigModule implements OnApplicationBootstrap, OnApplicationShutdown {
-    constructor(
-        private configService: ConfigService,
-        private moduleRef: ModuleRef,
-        private processContext: ProcessContext,
-    ) {}
+    constructor(private configService: ConfigService, private moduleRef: ModuleRef) {}
 
     async onApplicationBootstrap() {
-        if (this.runInjectableStrategyLifecycleHooks()) {
-            await this.initInjectableStrategies();
-            await this.initConfigurableOperations();
-        }
+        await this.initInjectableStrategies();
+        await this.initConfigurableOperations();
     }
 
     async onApplicationShutdown(signal?: string) {
-        if (this.runInjectableStrategyLifecycleHooks()) {
-            await this.destroyInjectableStrategies();
-            await this.destroyConfigurableOperations();
-        }
-    }
-
-    /**
-     * The lifecycle hooks of the configured strategies should be run if we are on the main
-     * server process _or_ if we are on the worker running independently of the main process.
-     */
-    private runInjectableStrategyLifecycleHooks(): boolean {
-        return (
-            this.processContext.isServer ||
-            (this.processContext.isWorker && !this.configService.workerOptions.runInMainProcess)
-        );
+        await this.destroyInjectableStrategies();
+        await this.destroyConfigurableOperations();
     }
 
     private async initInjectableStrategies() {

+ 0 - 1
packages/core/src/config/config.service.mock.ts

@@ -41,7 +41,6 @@ export class MockConfigService implements MockClass<ConfigService> {
     emailOptions: {};
     importExportOptions: {};
     orderOptions = {};
-    workerOptions = {};
     customFields = {};
 
     plugins = [];

+ 0 - 9
packages/core/src/config/config.service.ts

@@ -1,8 +1,5 @@
 import { DynamicModule, Injectable, Type } from '@nestjs/common';
-import { CorsOptions } from '@nestjs/common/interfaces/external/cors-options.interface';
 import { LanguageCode } from '@vendure/common/lib/generated-types';
-import { PluginDefinition } from 'apollo-server-core';
-import { RequestHandler } from 'express';
 import { ConnectionOptions } from 'typeorm';
 
 import { getConfig } from './config-helpers';
@@ -21,10 +18,8 @@ import {
     PromotionOptions,
     RuntimeVendureConfig,
     ShippingOptions,
-    SuperadminCredentials,
     TaxOptions,
     VendureConfig,
-    WorkerOptions,
 } from './vendure-config';
 
 @Injectable()
@@ -107,10 +102,6 @@ export class ConfigService implements VendureConfig {
         return this.activeConfig.logger;
     }
 
-    get workerOptions(): WorkerOptions {
-        return this.activeConfig.workerOptions;
-    }
-
     get jobQueueOptions(): Required<JobQueueOptions> {
         return this.activeConfig.jobQueueOptions;
     }

+ 0 - 7
packages/core/src/config/default-config.ts

@@ -133,13 +133,6 @@ export const defaultConfig: RuntimeVendureConfig = {
     importExportOptions: {
         importAssetsDir: __dirname,
     },
-    workerOptions: {
-        runInMainProcess: false,
-        transport: Transport.TCP,
-        options: {
-            port: 3020,
-        },
-    },
     jobQueueOptions: {
         jobQueueStrategy: new InMemoryJobQueueStrategy(),
         activeQueues: [],

+ 1 - 1
packages/core/src/config/job-queue/job-queue-strategy.ts

@@ -27,7 +27,7 @@ export interface JobQueueStrategy extends InjectableStrategy {
     start<Data extends JobData<Data> = {}>(
         queueName: string,
         process: (job: Job<Data>) => Promise<any>,
-    ): void;
+    ): Promise<void>;
 
     /**
      * @description

+ 0 - 60
packages/core/src/config/vendure-config.ts

@@ -685,60 +685,6 @@ export interface ImportExportOptions {
     importAssetsDir?: string;
 }
 
-/**
- * @description
- * Options related to the Vendure Worker.
- *
- * @example
- * ```TypeScript
- * import { Transport } from '\@nestjs/microservices';
- *
- * const config: VendureConfig = {
- *     // ...
- *     workerOptions: {
- *         transport: Transport.TCP,
- *         options: {
- *             host: 'localhost',
- *             port: 3001,
- *         },
- *     },
- * }
- * ```
- *
- * @docsCategory worker
- */
-export interface WorkerOptions {
-    /**
-     * @description
-     * If set to `true`, the Worker will run be bootstrapped as part of the main Vendure server (when invoking the
-     * `bootstrap()` function) and will run in the same process. This mode is intended only for development and
-     * testing purposes, not for production, since running the Worker in the main process negates the benefits
-     * of having long-running or expensive tasks run in the background.
-     *
-     * @default false
-     */
-    runInMainProcess?: boolean;
-    /**
-     * @description
-     * Sets the transport protocol used to communicate with the Worker. Options include TCP, Redis, gPRC and more. See the
-     * [NestJS microservices documentation](https://docs.nestjs.com/microservices/basics) for a full list.
-     *
-     * @default Transport.TCP
-     */
-    transport?: Transport;
-    /**
-     * @description
-     * Additional options related to the chosen transport method. See See the
-     * [NestJS microservices documentation](https://docs.nestjs.com/microservices/basics) for details on the options relating to each of the
-     * transport methods.
-     *
-     * By default, the options for the TCP transport will run with the following settings:
-     * * host: 'localhost'
-     * * port: 3020
-     */
-    options?: ClientOptions['options'];
-}
-
 /**
  * @description
  * Options related to the built-in job queue.
@@ -876,11 +822,6 @@ export interface VendureConfig {
      * Configures how taxes are calculated on products.
      */
     taxOptions?: TaxOptions;
-    /**
-     * @description
-     * Configures the Vendure Worker, which is used for long-running background tasks.
-     */
-    workerOptions?: WorkerOptions;
     /**
      * @description
      * Configures how the job queue is persisted and processed.
@@ -905,7 +846,6 @@ export interface RuntimeVendureConfig extends Required<VendureConfig> {
     orderOptions: Required<OrderOptions>;
     promotionOptions: Required<PromotionOptions>;
     shippingOptions: Required<ShippingOptions>;
-    workerOptions: Required<WorkerOptions>;
     taxOptions: Required<TaxOptions>;
 }
 

+ 1 - 2
packages/core/src/index.ts

@@ -7,11 +7,10 @@ export * from './event-bus/index';
 export * from './health-check/index';
 export * from './job-queue/index';
 export * from './plugin/index';
-export * from './process-context/index';
 export * from './entity/index';
 export * from './data-import/index';
 export * from './service/index';
-export * from './worker/index';
+export * from './async/index';
 export * from '@vendure/common/lib/shared-types';
 export {
     Permission,

+ 1 - 0
packages/core/src/job-queue/constants.ts

@@ -0,0 +1 @@
+export const loggerCtx = 'JobQueue';

+ 15 - 19
packages/core/src/job-queue/job-queue.service.spec.ts

@@ -8,7 +8,6 @@ import { take } from 'rxjs/operators';
 
 import { Injector } from '../common';
 import { ConfigService } from '../config/config.service';
-import { ProcessContext, WorkerProcessContext } from '../process-context/process-context';
 
 import { Job } from './job';
 import { JobQueueService } from './job-queue.service';
@@ -22,15 +21,12 @@ describe('JobQueueService', () => {
 
     beforeEach(async () => {
         module = await Test.createTestingModule({
-            providers: [
-                { provide: ConfigService, useClass: MockConfigService },
-                { provide: ProcessContext, useClass: WorkerProcessContext },
-                JobQueueService,
-            ],
+            providers: [{ provide: ConfigService, useClass: MockConfigService }, JobQueueService],
         }).compile();
+        await module.init();
 
         jobQueueService = module.get(JobQueueService);
-        await module.init();
+        await jobQueueService.start();
     });
 
     afterEach(async () => {
@@ -40,7 +36,7 @@ describe('JobQueueService', () => {
     it('data is passed into job', async () => {
         const subject = new Subject<string>();
         const subNext = subject.pipe(take(1)).toPromise();
-        const testQueue = jobQueueService.createQueue<string>({
+        const testQueue = await jobQueueService.createQueue<string>({
             name: 'test',
             process: async job => {
                 subject.next(job.data);
@@ -54,7 +50,7 @@ describe('JobQueueService', () => {
 
     it('job marked as complete', async () => {
         const subject = new Subject<string>();
-        const testQueue = jobQueueService.createQueue<string>({
+        const testQueue = await jobQueueService.createQueue<string>({
             name: 'test',
             process: job => {
                 return subject.toPromise();
@@ -78,7 +74,7 @@ describe('JobQueueService', () => {
 
     it('job marked as failed when exception thrown', async () => {
         const subject = new Subject();
-        const testQueue = jobQueueService.createQueue<string>({
+        const testQueue = await jobQueueService.createQueue<string>({
             name: 'test',
             process: async job => {
                 const result = await subject.toPromise();
@@ -102,7 +98,7 @@ describe('JobQueueService', () => {
 
     it('job marked as failed when async error thrown', async () => {
         const err = new Error('something bad happened');
-        const testQueue = jobQueueService.createQueue<string>({
+        const testQueue = await jobQueueService.createQueue<string>({
             name: 'test',
             process: async job => {
                 throw err;
@@ -119,7 +115,7 @@ describe('JobQueueService', () => {
 
     it('jobs processed in FIFO queue', async () => {
         const subject = new Subject();
-        const testQueue = jobQueueService.createQueue<string>({
+        const testQueue = await jobQueueService.createQueue<string>({
             name: 'test',
             process: job => {
                 return subject.pipe(take(1)).toPromise();
@@ -164,7 +160,7 @@ describe('JobQueueService', () => {
         testingJobQueueStrategy.concurrency = 2;
 
         const subject = new Subject();
-        const testQueue = jobQueueService.createQueue<string>({
+        const testQueue = await jobQueueService.createQueue<string>({
             name: 'test',
             process: job => {
                 return subject.pipe(take(1)).toPromise();
@@ -212,7 +208,7 @@ describe('JobQueueService', () => {
             }),
         ]);
 
-        const testQueue = jobQueueService.createQueue<string>({
+        const testQueue = await jobQueueService.createQueue<string>({
             name: 'test',
             process: async job => {
                 return;
@@ -232,7 +228,7 @@ describe('JobQueueService', () => {
 
     it('retries', async () => {
         const subject = new Subject<boolean>();
-        const testQueue = jobQueueService.createQueue<string>({
+        const testQueue = await jobQueueService.createQueue<string>({
             name: 'test',
             process: job => {
                 return subject
@@ -275,7 +271,7 @@ describe('JobQueueService', () => {
             .jobQueueStrategy as TestingJobQueueStrategy;
 
         const subject = new Subject<boolean>();
-        const testQueue = jobQueueService.createQueue<string>({
+        const testQueue = await jobQueueService.createQueue<string>({
             name: 'test',
             process: job => {
                 return subject.pipe(take(1)).toPromise();
@@ -288,7 +284,7 @@ describe('JobQueueService', () => {
 
         expect((await testingJobQueueStrategy.findOne(testJob.id!))?.state).toBe(JobState.RUNNING);
 
-        await testQueue.destroy();
+        await testQueue.stop();
 
         expect((await testingJobQueueStrategy.findOne(testJob.id!))?.state).toBe(JobState.PENDING);
     }, 10000);
@@ -297,7 +293,7 @@ describe('JobQueueService', () => {
         module.get(ConfigService).jobQueueOptions.activeQueues = ['test'];
 
         const subject = new Subject();
-        const testQueue = jobQueueService.createQueue<string>({
+        const testQueue = await jobQueueService.createQueue<string>({
             name: 'test',
             process: job => {
                 return subject.toPromise();
@@ -322,7 +318,7 @@ describe('JobQueueService', () => {
         module.get(ConfigService).jobQueueOptions.activeQueues = ['another'];
 
         const subject = new Subject();
-        const testQueue = jobQueueService.createQueue<string>({
+        const testQueue = await jobQueueService.createQueue<string>({
             name: 'test',
             process: job => {
                 return subject.toPromise();

+ 24 - 27
packages/core/src/job-queue/job-queue.service.ts

@@ -1,9 +1,9 @@
-import { Injectable, OnApplicationBootstrap, OnModuleDestroy } from '@nestjs/common';
+import { Injectable, OnModuleDestroy } from '@nestjs/common';
 import { JobQueue as GraphQlJobQueue } from '@vendure/common/lib/generated-types';
 
-import { ConfigService, JobQueueStrategy } from '../config';
-import { ProcessContext } from '../process-context';
+import { ConfigService, JobQueueStrategy, Logger } from '../config';
 
+import { loggerCtx } from './constants';
 import { JobQueue } from './job-queue';
 import { CreateQueueOptions, JobData } from './types';
 
@@ -43,46 +43,47 @@ import { CreateQueueOptions, JobData } from './types';
  * @docsCategory JobQueue
  */
 @Injectable()
-export class JobQueueService implements OnApplicationBootstrap, OnModuleDestroy {
+export class JobQueueService implements OnModuleDestroy {
     private queues: Array<JobQueue<any>> = [];
-    private hasInitialized = false;
+    private hasStarted = false;
 
     private get jobQueueStrategy(): JobQueueStrategy {
         return this.configService.jobQueueOptions.jobQueueStrategy;
     }
 
-    constructor(private configService: ConfigService, private processContext: ProcessContext) {}
-
-    /** @internal */
-    async onApplicationBootstrap() {
-        this.hasInitialized = true;
-        for (const queue of this.queues) {
-            if (!queue.started && this.shouldStartQueue(queue.name)) {
-                queue.start();
-            }
-        }
-    }
+    constructor(private configService: ConfigService) {}
 
     /** @internal */
     onModuleDestroy() {
-        this.hasInitialized = false;
-        return Promise.all(this.queues.map(q => q.destroy()));
+        this.hasStarted = false;
+        return Promise.all(this.queues.map(q => q.stop()));
     }
 
     /**
      * @description
      * Configures and creates a new {@link JobQueue} instance.
      */
-    createQueue<Data extends JobData<Data>>(options: CreateQueueOptions<Data>): JobQueue<Data> {
-        const { jobQueueStrategy } = this.configService.jobQueueOptions;
-        const queue = new JobQueue(options, jobQueueStrategy);
-        if (this.hasInitialized && this.shouldStartQueue(queue.name)) {
-            queue.start();
+    async createQueue<Data extends JobData<Data>>(
+        options: CreateQueueOptions<Data>,
+    ): Promise<JobQueue<Data>> {
+        const queue = new JobQueue(options, this.jobQueueStrategy);
+        if (this.hasStarted && this.shouldStartQueue(queue.name)) {
+            await queue.start();
         }
         this.queues.push(queue);
         return queue;
     }
 
+    async start(): Promise<void> {
+        this.hasStarted = true;
+        for (const queue of this.queues) {
+            if (!queue.started && this.shouldStartQueue(queue.name)) {
+                Logger.info(`Starting queue: ${queue.name}`, loggerCtx);
+                await queue.start();
+            }
+        }
+    }
+
     /**
      * @description
      * Returns an array of `{ name: string; running: boolean; }` for each
@@ -96,10 +97,6 @@ export class JobQueueService implements OnApplicationBootstrap, OnModuleDestroy
     }
 
     private shouldStartQueue(queueName: string): boolean {
-        if (this.processContext.isServer) {
-            return false;
-        }
-
         if (this.configService.jobQueueOptions.activeQueues.length > 0) {
             if (!this.configService.jobQueueOptions.activeQueues.includes(queueName)) {
                 return false;

+ 3 - 13
packages/core/src/job-queue/job-queue.ts

@@ -34,26 +34,16 @@ export class JobQueue<Data extends JobData<Data> = {}> {
     constructor(private options: CreateQueueOptions<Data>, private jobQueueStrategy: JobQueueStrategy) {}
 
     /** @internal */
-    start() {
+    async start() {
         if (this.running) {
             return;
         }
         this.running = true;
-        this.jobQueueStrategy.start<Data>(this.options.name, this.options.process);
+        await this.jobQueueStrategy.start<Data>(this.options.name, this.options.process);
     }
 
     /** @internal */
-    pause() {
-        Logger.debug(`Pausing JobQueue "${this.options.name}"`);
-        if (!this.running) {
-            return;
-        }
-        this.running = false;
-        this.jobQueueStrategy.stop(this.options.name, this.options.process);
-    }
-
-    /** @internal */
-    async destroy(): Promise<void> {
+    async stop(): Promise<void> {
         if (!this.running) {
             return;
         }

+ 2 - 2
packages/core/src/job-queue/polling-job-queue-strategy.ts

@@ -130,10 +130,10 @@ export abstract class PollingJobQueueStrategy extends InjectableJobQueueStrategy
         super();
     }
 
-    start<Data extends JobData<Data> = {}>(
+    async start<Data extends JobData<Data> = {}>(
         queueName: string,
         process: (job: Job<Data>) => Promise<any>,
-    ): void {
+    ) {
         if (!this.hasInitialized) {
             this.started.set(queueName, process);
             return;

+ 1 - 1
packages/core/src/plugin/default-search-plugin/indexer/indexer.controller.ts

@@ -6,6 +6,7 @@ import { Observable } from 'rxjs';
 import { FindOptionsUtils } from 'typeorm/find-options/FindOptionsUtils';
 
 import { RequestContext } from '../../../api/common/request-context';
+import { asyncObservable } from '../../../async';
 import { AsyncQueue } from '../../../common/async-queue';
 import { Translatable, Translation } from '../../../common/types/locale-types';
 import { idsAreEqual } from '../../../common/utils';
@@ -16,7 +17,6 @@ import { ProductVariant } from '../../../entity/product-variant/product-variant.
 import { Product } from '../../../entity/product/product.entity';
 import { ProductVariantService } from '../../../service/services/product-variant.service';
 import { TransactionalConnection } from '../../../service/transaction/transactional-connection';
-import { asyncObservable } from '../../../worker/async-observable';
 import { SearchIndexItem } from '../search-index-item.entity';
 import {
     ProductChannelMessageData,

+ 2 - 2
packages/core/src/plugin/default-search-plugin/indexer/search-index.service.ts

@@ -24,8 +24,8 @@ export class SearchIndexService implements OnApplicationBootstrap {
 
     constructor(private jobService: JobQueueService, private indexerController: IndexerController) {}
 
-    onApplicationBootstrap() {
-        this.updateIndexQueue = this.jobService.createQueue({
+    async onApplicationBootstrap() {
+        this.updateIndexQueue = await this.jobService.createQueue({
             name: 'update-search-index',
             process: job => {
                 const data = job.data;

+ 0 - 4
packages/core/src/plugin/plugin-common.module.ts

@@ -6,7 +6,6 @@ import { EventBusModule } from '../event-bus/event-bus.module';
 import { HealthCheckModule } from '../health-check/health-check.module';
 import { JobQueueModule } from '../job-queue/job-queue.module';
 import { ServiceModule } from '../service/service.module';
-import { WorkerServiceModule } from '../worker/worker-service.module';
 
 /**
  * @description
@@ -19,7 +18,6 @@ import { WorkerServiceModule } from '../worker/worker-service.module';
  * * `EventBusModule`, allowing the injection of the {@link EventBus} service.
  * * `ServiceModule` allowing the injection of any of the various entity services such as ProductService, OrderService etc.
  * * `ConfigModule`, allowing the injection of the ConfigService.
- * * `WorkerServiceModule`, allowing the injection of the {@link WorkerService}.
  * * `JobQueueModule`, allowing the injection of the {@link JobQueueService}.
  * * `HealthCheckModule`, allowing the injection of the {@link HealthCheckRegistryService}.
  *
@@ -30,7 +28,6 @@ import { WorkerServiceModule } from '../worker/worker-service.module';
         EventBusModule,
         ConfigModule,
         ServiceModule.forPlugin(),
-        WorkerServiceModule,
         JobQueueModule,
         HealthCheckModule,
         CacheModule,
@@ -39,7 +36,6 @@ import { WorkerServiceModule } from '../worker/worker-service.module';
         EventBusModule,
         ConfigModule,
         ServiceModule.forPlugin(),
-        WorkerServiceModule,
         JobQueueModule,
         HealthCheckModule,
         CacheModule,

+ 1 - 13
packages/core/src/plugin/plugin-metadata.ts

@@ -4,13 +4,12 @@ import { Type } from '@vendure/common/lib/shared-types';
 
 import { notNullOrUndefined } from '../../../common/lib/shared-utils';
 
-import { APIExtensionDefinition, PluginConfigurationFn, PluginLifecycleMethods } from './vendure-plugin';
+import { APIExtensionDefinition, PluginConfigurationFn } from './vendure-plugin';
 
 export const PLUGIN_METADATA = {
     CONFIGURATION: 'configuration',
     SHOP_API_EXTENSIONS: 'shopApiExtensions',
     ADMIN_API_EXTENSIONS: 'adminApiExtensions',
-    WORKERS: 'workers',
     ENTITIES: 'entities',
 };
 
@@ -48,17 +47,6 @@ export function getPluginModules(plugins: Array<Type<any> | DynamicModule>): Arr
     return plugins.map(p => (isDynamicModule(p) ? p.module : p));
 }
 
-export function hasLifecycleMethod<M extends keyof PluginLifecycleMethods>(
-    plugin: any,
-    lifecycleMethod: M,
-): plugin is { [key in M]: PluginLifecycleMethods[M] } {
-    return typeof (plugin as any)[lifecycleMethod] === 'function';
-}
-
-export function getWorkerControllers(plugin: Type<any> | DynamicModule) {
-    return reflectMetadata(plugin, PLUGIN_METADATA.WORKERS);
-}
-
 export function getConfigurationFunction(
     plugin: Type<any> | DynamicModule,
 ): PluginConfigurationFn | undefined {

+ 2 - 93
packages/core/src/plugin/plugin.module.ts

@@ -1,25 +1,7 @@
-import { DynamicModule, Inject, Module, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
-import { ModuleRef } from '@nestjs/core';
+import { DynamicModule, Module } from '@nestjs/common';
 
 import { getConfig } from '../config/config-helpers';
 import { ConfigModule } from '../config/config.module';
-import { ConfigService } from '../config/config.service';
-import { Logger } from '../config/logger/vendure-logger';
-
-import {
-    getPluginModules,
-    getWorkerControllers,
-    hasLifecycleMethod,
-    isDynamicModule,
-} from './plugin-metadata';
-import { PluginLifecycleMethods } from './vendure-plugin';
-
-export enum PluginProcessContext {
-    Main,
-    Worker,
-}
-
-const PLUGIN_PROCESS_CONTEXT = 'PLUGIN_PROCESS_CONTEXT';
 
 /**
  * This module collects and re-exports all providers defined in plugins so that they can be used in other
@@ -28,84 +10,11 @@ const PLUGIN_PROCESS_CONTEXT = 'PLUGIN_PROCESS_CONTEXT';
 @Module({
     imports: [ConfigModule],
 })
-export class PluginModule implements OnModuleInit, OnModuleDestroy {
+export class PluginModule {
     static forRoot(): DynamicModule {
         return {
             module: PluginModule,
-            providers: [{ provide: PLUGIN_PROCESS_CONTEXT, useValue: PluginProcessContext.Main }],
             imports: [...getConfig().plugins],
         };
     }
-    static forWorker(): DynamicModule {
-        return {
-            module: PluginModule,
-            providers: [{ provide: PLUGIN_PROCESS_CONTEXT, useValue: PluginProcessContext.Worker }],
-            imports: [...pluginsWithWorkerControllers()],
-        };
-    }
-    constructor(
-        @Inject(PLUGIN_PROCESS_CONTEXT) private processContext: PluginProcessContext,
-        private moduleRef: ModuleRef,
-        private configService: ConfigService,
-    ) {}
-
-    async onModuleInit() {
-        if (this.processContext === PluginProcessContext.Main) {
-            await this.runPluginLifecycleMethods('onVendureBootstrap', instance => {
-                const pluginName = instance.constructor.name || '(anonymous plugin)';
-                Logger.verbose(`Bootstrapped plugin ${pluginName}`);
-            });
-        }
-        if (this.processContext === PluginProcessContext.Worker) {
-            await this.runPluginLifecycleMethods('onVendureWorkerBootstrap');
-        }
-    }
-
-    async onModuleDestroy() {
-        if (this.processContext === PluginProcessContext.Main) {
-            await this.runPluginLifecycleMethods('onVendureClose');
-        }
-        if (this.processContext === PluginProcessContext.Worker) {
-            await this.runPluginLifecycleMethods('onVendureWorkerClose');
-        }
-    }
-
-    private async runPluginLifecycleMethods(
-        lifecycleMethod: keyof PluginLifecycleMethods,
-        afterRun?: (instance: any) => void,
-    ) {
-        for (const plugin of getPluginModules(this.configService.plugins)) {
-            let instance: any;
-            try {
-                instance = this.moduleRef.get(plugin, { strict: false });
-            } catch (e) {
-                Logger.error(`Could not find ${plugin.name}`, undefined, e.stack);
-            }
-            if (instance) {
-                if (hasLifecycleMethod(instance, lifecycleMethod)) {
-                    await instance[lifecycleMethod]();
-                }
-                if (typeof afterRun === 'function') {
-                    afterRun(instance);
-                }
-            }
-        }
-    }
-}
-
-function pluginsWithWorkerControllers(): DynamicModule[] {
-    return getConfig().plugins.map(plugin => {
-        const controllers = getWorkerControllers(plugin);
-        if (isDynamicModule(plugin)) {
-            return {
-                ...plugin,
-                controllers,
-            };
-        } else {
-            return {
-                module: plugin,
-                controllers,
-            };
-        }
-    });
 }

+ 0 - 85
packages/core/src/plugin/vendure-plugin.ts

@@ -37,12 +37,6 @@ export interface VendurePluginMetadata extends ModuleMetadata {
      * schema definitions and any required resolvers.
      */
     adminApiExtensions?: APIExtensionDefinition;
-    /**
-     * @description
-     * The plugin may define [Nestjs microservice controllers](https://docs.nestjs.com/microservices/basics#request-response)
-     * which are run in the Worker context.
-     */
-    workers?: Array<Type<any>>;
     /**
      * @description
      * The plugin may define custom [TypeORM database entities](https://typeorm.io/#/entities).
@@ -138,82 +132,3 @@ export function VendurePlugin(pluginMetadata: VendurePluginMetadata): ClassDecor
         Module(nestModuleMetadata)(target);
     };
 }
-
-/**
- * @description
- * A plugin which implements a static `beforeVendureBootstrap` method with this type can define logic to run
- * before the Vendure server (and the underlying Nestjs application) is bootstrapped. This is called
- * _after_ the Nestjs application has been created, but _before_ the `app.listen()` method is invoked.
- *
- * @docsCategory plugin
- * @docsPage Plugin Lifecycle Methods
- */
-export type BeforeVendureBootstrap = (app: INestApplication) => void | Promise<void>;
-
-/**
- * @description
- * A plugin which implements a static `beforeVendureWorkerBootstrap` method with this type can define logic to run
- * before the Vendure worker (and the underlying Nestjs microservice) is bootstrapped. This is called
- * _after_ the Nestjs microservice has been created, but _before_ the `microservice.listen()` method is invoked.
- *
- * @docsCategory plugin
- * @docsPage Plugin Lifecycle Methods
- */
-export type BeforeVendureWorkerBootstrap = (app: INestMicroservice) => void | Promise<void>;
-
-/**
- * @description
- * A plugin which implements this interface can define logic to run when the Vendure server is initialized.
- *
- * For example, this could be used to call out to an external API or to set up {@link EventBus} listeners.
- *
- * @docsCategory plugin
- * @docsPage Plugin Lifecycle Methods
- */
-export interface OnVendureBootstrap {
-    onVendureBootstrap(): void | Promise<void>;
-}
-
-/**
- * @description
- * A plugin which implements this interface can define logic to run when the Vendure worker is initialized.
- *
- * For example, this could be used to start or connect to a server or databased used by the worker.
- *
- * @docsCategory plugin
- * @docsPage Plugin Lifecycle Methods
- */
-export interface OnVendureWorkerBootstrap {
-    onVendureWorkerBootstrap(): void | Promise<void>;
-}
-
-/**
- * @description
- * A plugin which implements this interface can define logic to run before Vendure server is closed.
- *
- * For example, this could be used to clean up any processes started by the {@link OnVendureBootstrap} method.
- *
- * @docsCategory plugin
- * @docsPage Plugin Lifecycle Methods
- */
-export interface OnVendureClose {
-    onVendureClose(): void | Promise<void>;
-}
-
-/**
- * @description
- * A plugin which implements this interface can define logic to run before Vendure worker is closed.
- *
- * For example, this could be used to close any open connections to external services.
- *
- * @docsCategory plugin
- * @docsPage Plugin Lifecycle Methods
- */
-export interface OnVendureWorkerClose {
-    onVendureWorkerClose(): void | Promise<void>;
-}
-
-export type PluginLifecycleMethods = OnVendureBootstrap &
-    OnVendureWorkerBootstrap &
-    OnVendureClose &
-    OnVendureWorkerClose;

+ 0 - 2
packages/core/src/process-context/index.ts

@@ -1,2 +0,0 @@
-export * from './process-context';
-export * from './process-context.module';

+ 0 - 22
packages/core/src/process-context/process-context.module.ts

@@ -1,22 +0,0 @@
-import { DynamicModule, Global, Module } from '@nestjs/common';
-
-import { ProcessContext, ServerProcessContext, WorkerProcessContext } from './process-context';
-
-@Global()
-@Module({})
-export class ProcessContextModule {
-    static forRoot(): DynamicModule {
-        return {
-            module: ProcessContextModule,
-            providers: [{ provide: ProcessContext, useClass: ServerProcessContext }],
-            exports: [ProcessContext],
-        };
-    }
-    static forWorker(): DynamicModule {
-        return {
-            module: ProcessContextModule,
-            providers: [{ provide: ProcessContext, useClass: WorkerProcessContext }],
-            exports: [ProcessContext],
-        };
-    }
-}

+ 0 - 30
packages/core/src/process-context/process-context.ts

@@ -1,30 +0,0 @@
-import { Injectable } from '@nestjs/common';
-
-/**
- * @description
- * The ProcessContext can be injected into your providers in order to know whether that provider
- * is being executed in the context of the main Vendure server or the worker.
- *
- * @docsCategory common
- */
-@Injectable()
-export class ProcessContext {
-    protected _isServer: boolean;
-
-    get isServer(): boolean {
-        return this._isServer;
-    }
-    get isWorker(): boolean {
-        return !this._isServer;
-    }
-}
-
-@Injectable()
-export class ServerProcessContext extends ProcessContext {
-    protected _isServer = true;
-}
-
-@Injectable()
-export class WorkerProcessContext extends ProcessContext {
-    protected _isServer = false;
-}

+ 0 - 32
packages/core/src/service/service.module.ts

@@ -163,38 +163,6 @@ export class ServiceModule {
         };
     }
 
-    static forWorker(): DynamicModule {
-        if (!workerTypeOrmModule) {
-            workerTypeOrmModule = TypeOrmModule.forRootAsync({
-                imports: [ConfigModule],
-                useFactory: (configService: ConfigService) => {
-                    const { dbConnectionOptions, workerOptions } = configService;
-                    const logger = ServiceModule.getTypeOrmLogger(dbConnectionOptions);
-                    if (workerOptions.runInMainProcess) {
-                        // When running in the main process, we can re-use the existing
-                        // default connection.
-                        return {
-                            ...dbConnectionOptions,
-                            logger,
-                            name: 'default',
-                            keepConnectionAlive: true,
-                        };
-                    } else {
-                        return {
-                            ...dbConnectionOptions,
-                            logger,
-                        };
-                    }
-                },
-                inject: [ConfigService],
-            });
-        }
-        return {
-            module: ServiceModule,
-            imports: [workerTypeOrmModule, ConfigModule],
-        };
-    }
-
     static forPlugin(): DynamicModule {
         return {
             module: ServiceModule,

+ 2 - 2
packages/core/src/service/services/collection.service.ts

@@ -65,7 +65,7 @@ export class CollectionService implements OnModuleInit {
         private customFieldRelationService: CustomFieldRelationService,
     ) {}
 
-    onModuleInit() {
+    async onModuleInit() {
         const productEvents$ = this.eventBus.ofType(ProductEvent);
         const variantEvents$ = this.eventBus.ofType(ProductVariantEvent);
 
@@ -79,7 +79,7 @@ export class CollectionService implements OnModuleInit {
                 });
             });
 
-        this.applyFiltersQueue = this.jobQueueService.createQueue({
+        this.applyFiltersQueue = await this.jobQueueService.createQueue({
             name: 'apply-collection-filters',
             process: async job => {
                 const ctx = RequestContext.deserialize(job.data.ctx);

+ 0 - 1
packages/core/src/worker/constants.ts

@@ -1 +0,0 @@
-export const VENDURE_WORKER_CLIENT = Symbol('VENDURE_WORKER_CLIENT');

+ 0 - 3
packages/core/src/worker/index.ts

@@ -1,3 +0,0 @@
-export * from './async-observable';
-export * from './worker.service';
-export * from './types';

+ 0 - 25
packages/core/src/worker/message-interceptor.ts

@@ -1,25 +0,0 @@
-import { CallHandler, ExecutionContext, Injectable, NestInterceptor } from '@nestjs/common';
-import { Observable } from 'rxjs';
-import { finalize, tap } from 'rxjs/operators';
-
-import { WorkerMonitor } from './worker-monitor';
-
-/**
- * This interceptor is used to keep track of open worker tasks, so that the WorkerModule
- * is not allowed to be destroyed while tasks are in progress.
- */
-@Injectable()
-export class MessageInterceptor implements NestInterceptor {
-    constructor(private monitor: WorkerMonitor) {}
-
-    intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
-        this.monitor.increment();
-        return next
-            .handle()
-            .pipe(
-                finalize(() => {
-                    this.monitor.decrement();
-                }),
-            );
-    }
-}

+ 0 - 44
packages/core/src/worker/types.ts

@@ -1,44 +0,0 @@
-/**
- * @description
- * A class which is used to define the contract between the Vendure server and the worker process. Used
- * by the {@link WorkerService} `send` method.
- *
- * @example
- * ```TypeScript
- * export class ReindexMessage extends WorkerMessage<{ ctx: SerializedRequestContext }, boolean> {
- *   static readonly pattern = 'Reindex';
- * }
- *
- * // in a Service running on the main process
- * class MyService {
- *
- *   constructor(private workerService: WorkerService) {}
- *
- *   reindex(ctx: RequestContext): Observable<boolean>> {
- *     // If you need to send the RequestContext object to the worker,
- *     // be sure to send a serialized version to avoid errors.
- *     const serializedCtx = ctx.serialize();
- *     return this.workerService.send(new ReindexMessage({ ctx: serializedCtx }))
- *   }
- * }
- *
- * // in a microservice Controller on the worker process
- * class MyController {
- *
- *  \@MessagePattern(ReindexMessage.pattern)
- *  reindex({ ctx: serializedCtx }: ReindexMessage['data']): Observable<ReindexMessage['response']> {
- *    // Convert the SerializedRequestContext back into a regular
- *    // RequestContext object
- *    const ctx = RequestContext.deserialize(serializedCtx);
- *    // ... some long-running workload
- *  }
- * }
- * ```
- *
- * @docsCategory worker
- */
-export abstract class WorkerMessage<T, R> {
-    static readonly pattern: string;
-    constructor(public data: T) {}
-    response: R;
-}

+ 0 - 37
packages/core/src/worker/worker-monitor.ts

@@ -1,37 +0,0 @@
-import { Injectable } from '@nestjs/common';
-import { BehaviorSubject } from 'rxjs';
-import { debounceTime, takeWhile, tap } from 'rxjs/operators';
-
-import { Logger } from '../config/logger/vendure-logger';
-
-/**
- * This service is responsible for keeping track of incomplete worker tasks
- * to ensure that the WorkerModule is not destroyed before active tasks complete.
- */
-@Injectable()
-export class WorkerMonitor {
-    openTasks = new BehaviorSubject<number>(0);
-    get openTaskCount(): number {
-        return this.openTasks.value;
-    }
-    increment() {
-        this.openTasks.next(this.openTasks.value + 1);
-    }
-    decrement() {
-        this.openTasks.next(this.openTasks.value - 1);
-    }
-    waitForOpenTasksToComplete(): Promise<number> {
-        if (0 < this.openTaskCount) {
-            Logger.info('Waiting for open worker tasks to complete...');
-        }
-        return this.openTasks.asObservable().pipe(
-            tap(count => {
-                if (0 < count) {
-                    Logger.info(`${count} tasks open`);
-                }
-            }),
-            debounceTime(100),
-            takeWhile(value => value > 0),
-        ).toPromise();
-    }
-}

+ 0 - 27
packages/core/src/worker/worker-service.module.ts

@@ -1,27 +0,0 @@
-import { Module } from '@nestjs/common';
-import { ClientProxyFactory } from '@nestjs/microservices';
-
-import { ConfigModule } from '../config/config.module';
-import { ConfigService } from '../config/config.service';
-
-import { VENDURE_WORKER_CLIENT } from './constants';
-import { WorkerService } from './worker.service';
-
-@Module({
-    imports: [ConfigModule],
-    providers: [
-        WorkerService,
-        {
-            provide: VENDURE_WORKER_CLIENT,
-            useFactory: (configService: ConfigService) => {
-                return ClientProxyFactory.create({
-                    transport: configService.workerOptions.transport as any,
-                    options: configService.workerOptions.options as any,
-                });
-            },
-            inject: [ConfigService],
-        },
-    ],
-    exports: [WorkerService],
-})
-export class WorkerServiceModule {}

+ 0 - 41
packages/core/src/worker/worker.module.ts

@@ -1,41 +0,0 @@
-import { Module, OnApplicationShutdown, OnModuleDestroy } from '@nestjs/common';
-import { APP_INTERCEPTOR } from '@nestjs/core';
-
-import { ConfigModule } from '../config/config.module';
-import { Logger } from '../config/logger/vendure-logger';
-import { PluginModule } from '../plugin/plugin.module';
-import { ProcessContextModule } from '../process-context/process-context.module';
-import { ServiceModule } from '../service/service.module';
-
-import { MessageInterceptor } from './message-interceptor';
-import { WorkerMonitor } from './worker-monitor';
-import { WorkerServiceModule } from './worker-service.module';
-
-@Module({
-    imports: [
-        ConfigModule,
-        ServiceModule.forWorker(),
-        PluginModule.forWorker(),
-        WorkerServiceModule,
-        ProcessContextModule.forWorker(),
-    ],
-    providers: [
-        WorkerMonitor,
-        {
-            provide: APP_INTERCEPTOR,
-            useClass: MessageInterceptor,
-        },
-    ],
-})
-export class WorkerModule implements OnModuleDestroy, OnApplicationShutdown {
-    constructor(private monitor: WorkerMonitor) {}
-    onModuleDestroy() {
-        return this.monitor.waitForOpenTasksToComplete();
-    }
-
-    onApplicationShutdown(signal?: string) {
-        if (signal) {
-            Logger.info('Worker Received shutdown signal:' + signal);
-        }
-    }
-}

+ 0 - 62
packages/core/src/worker/worker.service.ts

@@ -1,62 +0,0 @@
-import { Inject, Injectable, OnModuleDestroy } from '@nestjs/common';
-import { ClientProxy } from '@nestjs/microservices';
-import { BehaviorSubject, Observable } from 'rxjs';
-import { filter, finalize, mergeMap, take } from 'rxjs/operators';
-
-import { VENDURE_WORKER_CLIENT } from './constants';
-import { WorkerMessage } from './types';
-
-/**
- * @description
- * This service is responsible for sending messages to the Worker process. See the {@link WorkerMessage}
- * docs for an example of usage.
- *
- * @docsCategory worker
- */
-@Injectable()
-export class WorkerService implements OnModuleDestroy {
-    private pendingConnection = false;
-    private initialConnection = new BehaviorSubject(false);
-    constructor(@Inject(VENDURE_WORKER_CLIENT) private readonly client: ClientProxy) {}
-
-    /**
-     * @description
-     * Sends a {@link WorkerMessage} to the worker process, where there should be a Controller with a method
-     * listening out for the message's pattern.
-     */
-    send<T, R>(message: WorkerMessage<T, R>): Observable<R> {
-        // The rather convoluted logic here is required in order to prevent more than
-        // one connection being opened in the event that the `send` method is called multiple
-        // times in the same event loop tick.
-        // On the first invocation, the first path is taken, which establishes the single
-        // connection (implicit in the first call to ClientProxy.send()). All subsequent
-        // invocations take the second code path.
-        if (!this.pendingConnection && this.initialConnection.value === false) {
-            this.pendingConnection = true;
-            return this.client
-                .send<R, T>((message.constructor as typeof WorkerMessage).pattern, message.data)
-                .pipe(
-                    finalize(() => {
-                        this.initialConnection.next(true);
-                        this.pendingConnection = false;
-                    }),
-                );
-        } else {
-            return this.initialConnection.pipe(
-                filter((val) => val),
-                take(1),
-                mergeMap(() => {
-                    return this.client.send<R, T>(
-                        (message.constructor as typeof WorkerMessage).pattern,
-                        message.data,
-                    );
-                }),
-            );
-        }
-    }
-
-    /** @internal */
-    onModuleDestroy() {
-        this.client.close();
-    }
-}

+ 5 - 1
packages/create/templates/index-worker.hbs

@@ -1,7 +1,11 @@
 {{#if isTs }}import { bootstrapWorker } from '@vendure/core';{{else}}const { bootstrapWorker } = require('@vendure/core');{{/if}}
 {{#if isTs }}import { config } from './vendure-config';{{else}}const { config } = require('./vendure-config');{{/if}}
 
-bootstrapWorker(config).catch(err => {
+bootstrapWorker(config)
+.then(await app => {
+  await app.get(JobQueueService).start();
+})
+.catch(err => {
     // tslint:disable-next-line:no-console
     console.log(err);
 });

+ 5 - 3
packages/create/templates/vendure-config.hbs

@@ -78,14 +78,13 @@ const path = require('path');
         AssetServerPlugin.init({
             route: 'assets',
             assetUploadDir: path.join(__dirname, '../static/assets'),
-            port: 3001,
         }),
         DefaultJobQueuePlugin,
         DefaultSearchPlugin,
         EmailPlugin.init({
             devMode: true,
             outputPath: path.join(__dirname, '../static/email/test-emails'),
-            mailboxPort: 3003,
+            route: 'mailbox',
             handlers: defaultEmailHandlers,
             templatePath: path.join(__dirname, '../static/email/templates'),
             globalTemplateVars: {
@@ -96,7 +95,10 @@ const path = require('path');
                 changeEmailAddressUrl: 'http://localhost:8080/verify-email-address-change'
             },
         }),
-        AdminUiPlugin.init({ port: 3002 }),
+        AdminUiPlugin.init({
+            route: 'admin',
+            port: 3002,
+        }),
     ],
 };
 {{#if isTs}}

+ 2 - 2
packages/dev-server/dev-config.ts

@@ -78,7 +78,6 @@ export const devConfig: VendureConfig = {
         AssetServerPlugin.init({
             route: 'assets',
             assetUploadDir: path.join(__dirname, 'assets'),
-            port: 5002,
         }),
         DefaultSearchPlugin,
         DefaultJobQueuePlugin,
@@ -88,10 +87,10 @@ export const devConfig: VendureConfig = {
         // }),
         EmailPlugin.init({
             devMode: true,
+            route: 'mailbox',
             handlers: defaultEmailHandlers,
             templatePath: path.join(__dirname, '../email-plugin/templates'),
             outputPath: path.join(__dirname, 'test-emails'),
-            mailboxPort: 5003,
             globalTemplateVars: {
                 verifyEmailAddressUrl: 'http://localhost:4201/verify',
                 passwordResetUrl: 'http://localhost:4201/reset-password',
@@ -99,6 +98,7 @@ export const devConfig: VendureConfig = {
             },
         }),
         AdminUiPlugin.init({
+            route: 'admin',
             port: 5001,
         }),
     ],

+ 10 - 6
packages/dev-server/index-worker.ts

@@ -1,4 +1,4 @@
-import { bootstrapWorker } from '@vendure/core';
+import { bootstrapWorker, JobQueueService } from '@vendure/core';
 
 import { devConfig } from './dev-config';
 
@@ -6,8 +6,12 @@ import { devConfig } from './dev-config';
 // fix race condition when modifying DB
 devConfig.dbConnectionOptions = { ...devConfig.dbConnectionOptions, synchronize: false };
 
-bootstrapWorker(devConfig).catch(err => {
-    // tslint:disable-next-line
-    console.log(err);
-    process.exit(1);
-});
+bootstrapWorker(devConfig)
+    .then(app => {
+        app.get(JobQueueService).start();
+    })
+    .catch(err => {
+        // tslint:disable-next-line
+        console.log(err);
+        process.exit(1);
+    });

+ 0 - 3
packages/dev-server/populate-dev-server.ts

@@ -26,9 +26,6 @@ if (require.main === module) {
             importExportOptions: {
                 importAssetsDir: path.join(__dirname, '../core/mock-data/assets'),
             },
-            workerOptions: {
-                runInMainProcess: true,
-            },
             customFields: {},
         }),
     );

+ 7 - 25
packages/dev-server/test-plugins/google-auth/google-auth-plugin.ts

@@ -1,7 +1,6 @@
-import { INestApplication } from '@nestjs/common';
-import { OnVendureBootstrap, OnVendureClose, PluginCommonModule, VendurePlugin } from '@vendure/core';
+import { MiddlewareConsumer, NestModule, OnApplicationBootstrap, OnModuleDestroy } from '@nestjs/common';
+import { PluginCommonModule, VendurePlugin } from '@vendure/core';
 import express from 'express';
-import { Server } from 'http';
 import path from 'path';
 
 import { GoogleAuthenticationStrategy } from './google-authentication-strategy';
@@ -17,14 +16,14 @@ export type GoogleAuthPluginOptions = {
  *
  * Then add this plugin to the dev config.
  *
- * The "storefront" is a simple html file which is served on http://localhost:80,
+ * The "storefront" is a simple html file which is served on http://localhost:3000/google-login,
  * but to get it to work with the Google login button you'll need to resolve it to some
  * public-looking url such as `http://google-login-test.com` by modifying your OS
  * hosts file.
  */
 @VendurePlugin({
     imports: [PluginCommonModule],
-    configuration: (config) => {
+    configuration: config => {
         config.authOptions.shopAuthenticationStrategy = [
             ...config.authOptions.shopAuthenticationStrategy,
             new GoogleAuthenticationStrategy(GoogleAuthPlugin.options.clientId),
@@ -32,32 +31,15 @@ export type GoogleAuthPluginOptions = {
         return config;
     },
 })
-export class GoogleAuthPlugin implements OnVendureBootstrap, OnVendureClose {
+export class GoogleAuthPlugin implements NestModule {
     static options: GoogleAuthPluginOptions;
-    private staticServer: Server;
 
     static init(options: GoogleAuthPluginOptions) {
         this.options = options;
         return GoogleAuthPlugin;
     }
 
-    onVendureBootstrap() {
-        // Set up a static express server to serve the demo login page
-        // from public/index.html.
-        const app = express();
-        app.use(express.static(path.join(__dirname, 'public')));
-        this.staticServer = app.listen(80);
-    }
-
-    onVendureClose(): void | Promise<void> {
-        return new Promise((resolve, reject) => {
-            this.staticServer.close((err) => {
-                if (err) {
-                    reject(err);
-                } else {
-                    resolve();
-                }
-            });
-        });
+    configure(consumer: MiddlewareConsumer) {
+        consumer.apply(express.static(path.join(__dirname, 'public'))).forRoutes('google-login');
     }
 }

+ 6 - 39
packages/dev-server/test-plugins/keycloak-auth/keycloak-auth-plugin.ts

@@ -1,12 +1,6 @@
-import {
-    createProxyHandler,
-    OnVendureBootstrap,
-    OnVendureClose,
-    PluginCommonModule,
-    VendurePlugin,
-} from '@vendure/core';
+import { MiddlewareConsumer, NestModule } from '@nestjs/common';
+import { PluginCommonModule, VendurePlugin } from '@vendure/core';
 import express from 'express';
-import { Server } from 'http';
 import path from 'path';
 
 import { KeycloakAuthenticationStrategy } from './keycloak-authentication-strategy';
@@ -24,43 +18,16 @@ import { KeycloakAuthenticationStrategy } from './keycloak-authentication-strate
  */
 @VendurePlugin({
     imports: [PluginCommonModule],
-    configuration: (config) => {
+    configuration: config => {
         config.authOptions.adminAuthenticationStrategy = [
             ...config.authOptions.adminAuthenticationStrategy,
             new KeycloakAuthenticationStrategy(),
         ];
-        config.apiOptions.middleware.push({
-            handler: createProxyHandler({
-                port: 3042,
-                route: 'keycloak-login',
-                label: 'Keycloak Login',
-                basePath: '',
-            }),
-            route: 'keycloak-login',
-        });
         return config;
     },
 })
-export class KeycloakAuthPlugin implements OnVendureBootstrap, OnVendureClose {
-    private staticServer: Server;
-
-    onVendureBootstrap() {
-        // Set up a static express server to serve the demo login page
-        // from public/index.html.
-        const app = express();
-        app.use(express.static(path.join(__dirname, 'public')));
-        this.staticServer = app.listen(3042);
-    }
-
-    onVendureClose(): void | Promise<void> {
-        return new Promise((resolve, reject) => {
-            this.staticServer.close((err) => {
-                if (err) {
-                    reject(err);
-                } else {
-                    resolve();
-                }
-            });
-        });
+export class KeycloakAuthPlugin implements NestModule {
+    configure(consumer: MiddlewareConsumer) {
+        consumer.apply(express.static(path.join(__dirname, 'public'))).forRoutes('keycloak-login');
     }
 }

+ 2 - 2
packages/elasticsearch-plugin/src/elasticsearch-index.service.ts

@@ -25,8 +25,8 @@ export class ElasticsearchIndexService implements OnApplicationBootstrap {
         private indexerController: ElasticsearchIndexerController,
     ) {}
 
-    onApplicationBootstrap() {
-        this.updateIndexQueue = this.jobService.createQueue({
+    async onApplicationBootstrap() {
+        this.updateIndexQueue = await this.jobService.createQueue({
             name: 'update-search-index',
             process: job => {
                 const data = job.data;

+ 4 - 8
packages/email-plugin/src/dev-mailbox.ts

@@ -12,15 +12,14 @@ import { EmailPluginDevModeOptions, EventWithContext } from './types';
  * An email inbox application that serves the contents of the dev mode `outputPath` directory.
  */
 export class DevMailbox {
-    server: http.Server;
     private handleMockEventFn: (
         handler: EmailEventHandler<string, any>,
         event: EventWithContext,
     ) => void | undefined;
 
     serve(options: EmailPluginDevModeOptions) {
-        const { outputPath, handlers, mailboxPort } = options;
-        const server = express();
+        const { outputPath, handlers } = options;
+        const server = express.Router();
         server.get('/', (req, res) => {
             res.sendFile(path.join(__dirname, '../../dev-mailbox.html'));
         });
@@ -61,17 +60,14 @@ export class DevMailbox {
             const content = await this.getEmail(outputPath, fileName);
             res.send(content);
         });
-        this.server = server.listen(mailboxPort);
+
+        return server;
     }
 
     handleMockEvent(handler: (handler: EmailEventHandler<string, any>, event: EventWithContext) => void) {
         this.handleMockEventFn = handler;
     }
 
-    destroy() {
-        this.server.close();
-    }
-
     private async getEmailList(outputPath: string) {
         const list = await fs.readdir(outputPath);
         const contents: any[] = [];

+ 0 - 2
packages/email-plugin/src/plugin.spec.ts

@@ -9,7 +9,6 @@ import {
     Order,
     OrderStateTransitionEvent,
     PluginCommonModule,
-    ProcessContextModule,
     RequestContext,
     VendureEvent,
 } from '@vendure/core';
@@ -40,7 +39,6 @@ describe('EmailPlugin', () => {
                     type: 'sqljs',
                     retryAttempts: 0,
                 }),
-                ProcessContextModule.forRoot(),
                 PluginCommonModule,
                 EmailPlugin.init({
                     templatePath: path.join(__dirname, '../test-templates'),

+ 15 - 41
packages/email-plugin/src/plugin.ts

@@ -1,29 +1,24 @@
-import { OnApplicationBootstrap } from '@nestjs/common';
+import { MiddlewareConsumer, NestModule, OnApplicationBootstrap } from '@nestjs/common';
 import { ModuleRef } from '@nestjs/core';
 import {
-    createProxyHandler,
     EventBus,
     Injector,
     JobQueue,
     JobQueueService,
     Logger,
-    OnVendureBootstrap,
-    OnVendureClose,
     PluginCommonModule,
-    RuntimeVendureConfig,
     Type,
     VendurePlugin,
 } from '@vendure/core';
 
 import { isDevModeOptions } from './common';
-import { EMAIL_PLUGIN_OPTIONS } from './constants';
+import { EMAIL_PLUGIN_OPTIONS, loggerCtx } from './constants';
 import { DevMailbox } from './dev-mailbox';
 import { EmailProcessor } from './email-processor';
 import { EmailEventHandler, EmailEventHandlerWithAsyncData } from './event-handler';
 import {
     EmailPluginDevModeOptions,
     EmailPluginOptions,
-    EmailWorkerMessage,
     EventWithContext,
     IntermediateEmailDetails,
 } from './types';
@@ -125,13 +120,12 @@ import {
  *   handlers: defaultEmailHandlers,
  *   templatePath: path.join(__dirname, 'vendure/email/templates'),
  *   outputPath: path.join(__dirname, 'test-emails'),
- *   mailboxPort: 5003,
  * })
  * ```
  *
  * ### Dev mailbox
  *
- * In dev mode, specifying the optional `mailboxPort` will start a webmail-like interface available at the `/mailbox` path, e.g.
+ * In dev mode, a webmail-like interface available at the `/mailbox` path, e.g.
  * http://localhost:3000/mailbox. This is a simple way to view the output of all emails generated by the EmailPlugin while in dev mode.
  *
  * ## Troubleshooting SMTP Connections
@@ -168,9 +162,8 @@ import {
 @VendurePlugin({
     imports: [PluginCommonModule],
     providers: [{ provide: EMAIL_PLUGIN_OPTIONS, useFactory: () => EmailPlugin.options }, EmailProcessor],
-    configuration: config => EmailPlugin.configure(config),
 })
-export class EmailPlugin implements OnApplicationBootstrap, OnVendureBootstrap, OnVendureClose {
+export class EmailPlugin implements OnApplicationBootstrap, NestModule {
     private static options: EmailPluginOptions | EmailPluginDevModeOptions;
     private devMailbox: DevMailbox | undefined;
     private jobQueue: JobQueue<IntermediateEmailDetails> | undefined;
@@ -192,29 +185,6 @@ export class EmailPlugin implements OnApplicationBootstrap, OnVendureBootstrap,
         return EmailPlugin;
     }
 
-    /** @internal */
-    static configure(config: RuntimeVendureConfig): RuntimeVendureConfig {
-        if (isDevModeOptions(this.options) && this.options.mailboxPort !== undefined) {
-            const route = 'mailbox';
-            config.apiOptions.middleware.push({
-                handler: createProxyHandler({ port: this.options.mailboxPort, route, label: 'Dev Mailbox' }),
-                route,
-            });
-        }
-        return config;
-    }
-
-    /** @internal */
-    async onVendureBootstrap() {
-        const options = EmailPlugin.options;
-
-        if (isDevModeOptions(options) && options.mailboxPort !== undefined) {
-            this.devMailbox = new DevMailbox();
-            this.devMailbox.serve(options);
-            this.devMailbox.handleMockEvent((handler, event) => this.handleEvent(handler, event));
-        }
-    }
-
     /** @internal */
     async onApplicationBootstrap(): Promise<void> {
         const options = EmailPlugin.options;
@@ -226,7 +196,7 @@ export class EmailPlugin implements OnApplicationBootstrap, OnVendureBootstrap,
             this.testingProcessor = new EmailProcessor(options);
             await this.testingProcessor.init();
         } else {
-            this.jobQueue = this.jobQueueService.createQueue({
+            this.jobQueue = await this.jobQueueService.createQueue({
                 name: 'send-email',
                 process: job => {
                     return this.emailProcessor.process(job.data);
@@ -235,10 +205,14 @@ export class EmailPlugin implements OnApplicationBootstrap, OnVendureBootstrap,
         }
     }
 
-    /** @internal */
-    async onVendureClose() {
-        if (this.devMailbox) {
-            this.devMailbox.destroy();
+    configure(consumer: MiddlewareConsumer) {
+        const options = EmailPlugin.options;
+
+        if (isDevModeOptions(options)) {
+            Logger.info('Creating dev mailbox middleware', loggerCtx);
+            this.devMailbox = new DevMailbox();
+            consumer.apply(this.devMailbox.serve(options)).forRoutes(options.route);
+            this.devMailbox.handleMockEvent((handler, event) => this.handleEvent(handler, event));
         }
     }
 
@@ -254,7 +228,7 @@ export class EmailPlugin implements OnApplicationBootstrap, OnVendureBootstrap,
         handler: EmailEventHandler | EmailEventHandlerWithAsyncData<any>,
         event: EventWithContext,
     ) {
-        Logger.debug(`Handling event "${handler.type}"`, 'EmailPlugin');
+        Logger.debug(`Handling event "${handler.type}"`, loggerCtx);
         const { type } = handler;
         try {
             const injector = new Injector(this.moduleRef);
@@ -272,7 +246,7 @@ export class EmailPlugin implements OnApplicationBootstrap, OnVendureBootstrap,
                 await this.testingProcessor.process(result);
             }
         } catch (e) {
-            Logger.error(e.message, 'EmailPlugin', e.stack);
+            Logger.error(e.message, loggerCtx, e.stack);
         }
     }
 }

+ 3 - 10
packages/email-plugin/src/types.ts

@@ -1,7 +1,6 @@
 import { LanguageCode } from '@vendure/common/lib/generated-types';
 import { Omit } from '@vendure/common/lib/omit';
-import { JsonCompatible } from '@vendure/common/lib/shared-types';
-import { Injector, RequestContext, VendureEvent, WorkerMessage } from '@vendure/core';
+import { Injector, RequestContext, VendureEvent } from '@vendure/core';
 import { Attachment } from 'nodemailer/lib/mailer';
 
 import { EmailEventHandler } from './event-handler';
@@ -93,11 +92,9 @@ export interface EmailPluginDevModeOptions extends Omit<EmailPluginOptions, 'tra
     outputPath: string;
     /**
      * @description
-     * If set, a "mailbox" server will be started which will serve the contents of the
-     * `outputPath` similar to a web-based email client, available at the route `/mailbox`,
-     * e.g. http://localhost:3000/mailbox.
+     * The route to the dev mailbox server.
      */
-    mailboxPort?: number;
+    route: string;
 }
 
 /**
@@ -391,10 +388,6 @@ export type IntermediateEmailDetails = {
     attachments: SerializedAttachment[];
 };
 
-export class EmailWorkerMessage extends WorkerMessage<IntermediateEmailDetails, boolean> {
-    static readonly pattern = 'send-email';
-}
-
 /**
  * @description
  * Configures the {@link EmailEventHandler} to handle a particular channel & languageCode

+ 4 - 1
packages/pub-sub-plugin/src/pub-sub-job-queue-strategy.ts

@@ -57,7 +57,10 @@ export class PubSubJobQueueStrategy extends InjectableJobQueueStrategy implement
         });
     }
 
-    start<Data extends JobData<Data> = {}>(queueName: string, process: (job: Job<Data>) => Promise<any>) {
+    async start<Data extends JobData<Data> = {}>(
+        queueName: string,
+        process: (job: Job<Data>) => Promise<any>,
+    ) {
         if (!this.hasInitialized) {
             this.started.set(queueName, process);
             return;

+ 0 - 7
packages/testing/src/config/test-config.ts

@@ -60,11 +60,4 @@ export const testConfig: Required<VendureConfig> = mergeConfig(defaultConfig, {
         assetStorageStrategy: new TestingAssetStorageStrategy(),
         assetPreviewStrategy: new TestingAssetPreviewStrategy(),
     },
-    workerOptions: {
-        runInMainProcess: true,
-        transport: Transport.TCP,
-        options: {
-            port: 3051,
-        },
-    },
 });

+ 7 - 7
packages/testing/src/data-population/populate-for-testing.ts

@@ -1,5 +1,5 @@
 /* tslint:disable:no-console */
-import { INestApplication, INestMicroservice } from '@nestjs/common';
+import { INestApplicationContext } from '@nestjs/common';
 import { LanguageCode } from '@vendure/common/lib/generated-types';
 import { VendureConfig } from '@vendure/core';
 import { importProductsFromCsv, populateCollections, populateInitialData } from '@vendure/core/cli';
@@ -12,17 +12,17 @@ import { populateCustomers } from './populate-customers';
 /**
  * Clears all tables from the database and populates with (deterministic) random data.
  */
-export async function populateForTesting(
+export async function populateForTesting<T extends INestApplicationContext>(
     config: Required<VendureConfig>,
-    bootstrapFn: (config: VendureConfig) => Promise<[INestApplication, INestMicroservice | undefined]>,
+    bootstrapFn: (config: VendureConfig) => Promise<T>,
     options: TestServerOptions,
-): Promise<[INestApplication, INestMicroservice | undefined]> {
+): Promise<T> {
     (config.dbConnectionOptions as any).logging = false;
     const logging = options.logging === undefined ? true : options.logging;
     const originalRequireVerification = config.authOptions.requireVerification;
     config.authOptions.requireVerification = false;
 
-    const [app, worker] = await bootstrapFn(config);
+    const app = await bootstrapFn(config);
 
     const logFn = (message: string) => (logging ? console.log(message) : null);
 
@@ -32,10 +32,10 @@ export async function populateForTesting(
     await populateCustomers(options.customerCount ?? 10, config, logging);
 
     config.authOptions.requireVerification = originalRequireVerification;
-    return [app, worker];
+    return app;
 }
 
-async function populateProducts(app: INestApplication, productsCsvPath: string, logging: boolean) {
+async function populateProducts(app: INestApplicationContext, productsCsvPath: string, logging: boolean) {
     const importResult = await importProductsFromCsv(app, productsCsvPath, LanguageCode.en);
     if (importResult.errors && importResult.errors.length) {
         console.log(`${importResult.errors.length} errors encountered when importing product data:`);

+ 8 - 41
packages/testing/src/test-server.ts

@@ -1,11 +1,7 @@
-import { INestApplication, INestMicroservice } from '@nestjs/common';
+import { INestApplication } from '@nestjs/common';
 import { NestFactory } from '@nestjs/core';
-import { DefaultLogger, Logger, VendureConfig } from '@vendure/core';
-import {
-    preBootstrapConfig,
-    runBeforeBootstrapHooks,
-    runBeforeWorkerBootstrapHooks,
-} from '@vendure/core/dist/bootstrap';
+import { DefaultLogger, JobQueueService, Logger, VendureConfig } from '@vendure/core';
+import { preBootstrapConfig } from '@vendure/core/dist/bootstrap';
 
 import { populateForTesting } from './data-population/populate-for-testing';
 import { getInitializerFor } from './initializers/initializers';
@@ -20,7 +16,6 @@ import { TestServerOptions } from './types';
  */
 export class TestServer {
     public app: INestApplication;
-    public worker?: INestMicroservice;
 
     constructor(private vendureConfig: Required<VendureConfig>) {}
 
@@ -55,16 +50,7 @@ export class TestServer {
      * start and stop a Vendure instance multiple times without re-populating data.
      */
     async bootstrap() {
-        const [app, worker] = await this.bootstrapForTesting(this.vendureConfig);
-        if (app) {
-            this.app = app;
-        } else {
-            console.error(`Could not bootstrap app`);
-            process.exit(1);
-        }
-        if (worker) {
-            this.worker = worker;
-        }
+        this.app = await this.bootstrapForTesting(this.vendureConfig);
     }
 
     /**
@@ -75,9 +61,6 @@ export class TestServer {
     async destroy() {
         // allow a grace period of any outstanding async tasks to complete
         await new Promise(resolve => global.setTimeout(resolve, 500));
-        if (this.worker) {
-            await this.worker.close();
-        }
         await this.app.close();
     }
 
@@ -111,22 +94,17 @@ export class TestServer {
         testingConfig: Required<VendureConfig>,
         options: TestServerOptions,
     ): Promise<void> {
-        const [app, worker] = await populateForTesting(testingConfig, this.bootstrapForTesting, {
+        const app = await populateForTesting(testingConfig, this.bootstrapForTesting, {
             logging: false,
             ...options,
         });
-        if (worker) {
-            await worker.close();
-        }
         await app.close();
     }
 
     /**
      * Bootstraps an instance of the Vendure server for testing against.
      */
-    private async bootstrapForTesting(
-        userConfig: Partial<VendureConfig>,
-    ): Promise<[INestApplication, INestMicroservice | undefined]> {
+    private async bootstrapForTesting(userConfig: Partial<VendureConfig>): Promise<INestApplication> {
         const config = await preBootstrapConfig(userConfig);
         Logger.useLogger(config.logger);
         const appModule = await import('@vendure/core/dist/app.module');
@@ -136,21 +114,10 @@ export class TestServer {
                 cors: config.apiOptions.cors,
                 logger: new Logger(),
             });
-            let worker: INestMicroservice | undefined;
-            await runBeforeBootstrapHooks(config, app);
             await app.listen(config.apiOptions.port);
-            if (config.workerOptions.runInMainProcess) {
-                const workerModule = await import('@vendure/core/dist/worker/worker.module');
-                worker = await NestFactory.createMicroservice(workerModule.WorkerModule, {
-                    transport: config.workerOptions.transport,
-                    logger: new Logger(),
-                    options: config.workerOptions.options,
-                });
-                await runBeforeWorkerBootstrapHooks(config, worker);
-                await worker.listenAsync();
-            }
+            await app.get(JobQueueService).start();
             DefaultLogger.restoreOriginalLogLevel();
-            return [app, worker];
+            return app;
         } catch (e) {
             console.log(e);
             throw e;