Просмотр исходного кода

feat(core): Set up worker architecture based on Nest microservices

Relates to #115
Michael Bromley 6 лет назад
Родитель
Сommit
508bafd9dd
26 измененных файлов с 704 добавлено и 662 удалено
  1. 1 0
      packages/core/package.json
  2. 3 3
      packages/core/src/api/api-internal-modules.ts
  3. 1 1
      packages/core/src/api/api.module.ts
  4. 113 0
      packages/core/src/api/common/request-context.spec.ts
  5. 29 0
      packages/core/src/api/common/request-context.ts
  6. 30 5
      packages/core/src/bootstrap.ts
  7. 1 0
      packages/core/src/config/config.service.mock.ts
  8. 5 0
      packages/core/src/config/config.service.ts
  9. 6 1
      packages/core/src/config/default-config.ts
  10. 6 1
      packages/core/src/config/logger/default-logger.ts
  11. 40 0
      packages/core/src/config/vendure-config.ts
  12. 6 0
      packages/core/src/config/vendure-plugin/vendure-plugin.ts
  13. 1 1
      packages/core/src/data-import/data-import.module.ts
  14. 2 1
      packages/core/src/index.ts
  15. 6 1
      packages/core/src/plugin/default-search-plugin/constants.ts
  16. 7 35
      packages/core/src/plugin/default-search-plugin/default-search-plugin.ts
  17. 0 165
      packages/core/src/plugin/default-search-plugin/indexer/index-builder.ts
  18. 244 0
      packages/core/src/plugin/default-search-plugin/indexer/indexer.controller.ts
  19. 0 154
      packages/core/src/plugin/default-search-plugin/indexer/ipc.ts
  20. 0 16
      packages/core/src/plugin/default-search-plugin/indexer/search-index-worker.ts
  21. 80 268
      packages/core/src/plugin/default-search-plugin/indexer/search-index.service.ts
  22. 55 8
      packages/core/src/plugin/plugin.module.ts
  23. 35 2
      packages/core/src/service/service.module.ts
  24. 1 0
      packages/core/src/worker/constants.ts
  25. 18 0
      packages/core/src/worker/worker.module.ts
  26. 14 0
      yarn.lock

+ 1 - 0
packages/core/package.json

@@ -37,6 +37,7 @@
     "@nestjs/common": "^6.3.1",
     "@nestjs/core": "^6.3.1",
     "@nestjs/graphql": "^6.2.4",
+    "@nestjs/microservices": "^6.3.1",
     "@nestjs/platform-express": "^6.3.1",
     "@nestjs/testing": "^6.3.1",
     "@nestjs/typeorm": "^6.0.0",

+ 3 - 3
packages/core/src/api/api-internal-modules.ts

@@ -105,9 +105,9 @@ export class ApiSharedModule {}
  * The internal module containing the Admin GraphQL API resolvers
  */
 @Module({
-    imports: [ApiSharedModule, PluginModule, ServiceModule, DataImportModule],
+    imports: [ApiSharedModule, PluginModule.forRoot(), ServiceModule.forRoot(), DataImportModule],
     providers: [...adminResolvers, ...entityResolvers, ...adminEntityResolvers, ...PluginModule.adminApiResolvers()],
-    exports: adminResolvers,
+    exports: [...adminResolvers],
 })
 export class AdminApiModule {}
 
@@ -115,7 +115,7 @@ export class AdminApiModule {}
  * The internal module containing the Shop GraphQL API resolvers
  */
 @Module({
-    imports: [ApiSharedModule, PluginModule, ServiceModule],
+    imports: [ApiSharedModule, PluginModule.forRoot(), ServiceModule.forRoot()],
     providers: [...shopResolvers, ...entityResolvers, ...PluginModule.shopApiResolvers()],
     exports: shopResolvers,
 })

+ 1 - 1
packages/core/src/api/api.module.ts

@@ -19,7 +19,7 @@ import { IdInterceptor } from './middleware/id-interceptor';
  */
 @Module({
     imports: [
-        ServiceModule,
+        ServiceModule.forRoot(),
         DataImportModule,
         ApiSharedModule,
         AdminApiModule,

+ 113 - 0
packages/core/src/api/common/request-context.spec.ts

@@ -0,0 +1,113 @@
+import { CurrencyCode, LanguageCode } from '@vendure/common/lib/generated-types';
+
+import { Channel } from '../../entity/channel/channel.entity';
+import { Order } from '../../entity/order/order.entity';
+import { AuthenticatedSession } from '../../entity/session/authenticated-session.entity';
+import { Session } from '../../entity/session/session.entity';
+import { User } from '../../entity/user/user.entity';
+import { Zone } from '../../entity/zone/zone.entity';
+
+import { RequestContext } from './request-context';
+
+describe('RequestContext', () => {
+
+    describe('fromObject()', () => {
+
+        let original: RequestContext;
+        let ctxObject: object;
+        let session: Session;
+        let user: User;
+        let channel: Channel;
+        let activeOrder: Order;
+        let zone: Zone;
+
+        beforeAll(() => {
+            activeOrder = new Order({
+                id: '55555',
+                active: true,
+                code: 'ADAWDJAWD',
+            });
+            user = new User({
+                id: '8833774',
+                verified: true,
+            });
+            session = new AuthenticatedSession({
+                id: '1234',
+                token: '2d37187e9e8fc47807fe4f58ca',
+                activeOrder,
+                user,
+            });
+            zone = new Zone({
+                id: '62626',
+                name: 'Europe',
+            });
+            channel = new Channel({
+                token: 'oiajwodij09au3r',
+                id: '995859',
+                code: '__default_channel__',
+                currencyCode: CurrencyCode.EUR,
+                pricesIncludeTax: true,
+                defaultLanguageCode: LanguageCode.en,
+                defaultShippingZone: zone,
+                defaultTaxZone: zone,
+            });
+            original = new RequestContext({
+                apiType: 'admin',
+                languageCode: LanguageCode.en,
+                channel,
+                session,
+                isAuthorized: true,
+                authorizedAsOwnerOnly: false,
+            });
+
+            ctxObject = JSON.parse(JSON.stringify(original));
+        });
+
+        it('apiType', () => {
+            const result = RequestContext.fromObject(ctxObject);
+            expect(result.apiType).toBe(original.apiType);
+        });
+
+        it('channelId', () => {
+            const result = RequestContext.fromObject(ctxObject);
+            expect(result.channelId).toBe(original.channelId);
+        });
+
+        it('languageCode', () => {
+            const result = RequestContext.fromObject(ctxObject);
+            expect(result.languageCode).toBe(original.languageCode);
+        });
+
+        it('activeUserId', () => {
+            const result = RequestContext.fromObject(ctxObject);
+            expect(result.activeUserId).toBe(original.activeUserId);
+        });
+
+        it('isAuthorized', () => {
+            const result = RequestContext.fromObject(ctxObject);
+            expect(result.isAuthorized).toBe(original.isAuthorized);
+        });
+
+        it('authorizedAsOwnerOnly', () => {
+            const result = RequestContext.fromObject(ctxObject);
+            expect(result.authorizedAsOwnerOnly).toBe(original.authorizedAsOwnerOnly);
+        });
+
+        it('channel', () => {
+            const result = RequestContext.fromObject(ctxObject);
+            expect(result.channel).toEqual(original.channel);
+        });
+
+        it('session', () => {
+            const result = RequestContext.fromObject(ctxObject);
+            expect(result.session).toEqual(original.session);
+        });
+
+        it('activeUser', () => {
+            const result = RequestContext.fromObject(ctxObject);
+            expect(result.activeUser).toEqual(original.activeUser);
+        });
+
+    });
+
+});

+ 29 - 0
packages/core/src/api/common/request-context.ts

@@ -4,6 +4,7 @@ import i18next from 'i18next';
 
 import { DEFAULT_LANGUAGE_CODE } from '../../common/constants';
 import { Channel } from '../../entity/channel/channel.entity';
+import { AnonymousSession } from '../../entity/session/anonymous-session.entity';
 import { AuthenticatedSession } from '../../entity/session/authenticated-session.entity';
 import { Session } from '../../entity/session/session.entity';
 import { User } from '../../entity/user/user.entity';
@@ -49,6 +50,34 @@ export class RequestContext {
         this._translationFn = translationFn || (((key: string) => key) as any);
     }
 
+    /**
+     * @description
+     * Creates a new RequestContext object from a plain object which is the result of
+     * a JSON serialization - deserialization operation.
+     */
+    static fromObject(ctxObject: any): RequestContext {
+        let session: Session | undefined;
+        if (ctxObject._session) {
+            if (ctxObject._session.user) {
+                const user = new User(ctxObject._session.user);
+                session = new AuthenticatedSession({
+                    ...ctxObject._session,
+                    user,
+                });
+            } else {
+                session = new AnonymousSession(ctxObject._session);
+            }
+        }
+        return new RequestContext({
+            apiType: ctxObject._apiType,
+            channel: new Channel(ctxObject._channel),
+            session,
+            languageCode: ctxObject._languageCode,
+            isAuthorized: ctxObject._isAuthorized,
+            authorizedAsOwnerOnly: ctxObject._authorizedAsOwnerOnly,
+        });
+    }
+
     get apiType(): ApiType {
         return this._apiType;
     }

+ 30 - 5
packages/core/src/bootstrap.ts

@@ -1,6 +1,8 @@
-import { INestApplication } from '@nestjs/common';
+import { INestApplication, INestMicroservice } from '@nestjs/common';
 import { NestFactory } from '@nestjs/core';
+import { Transport } from '@nestjs/microservices';
 import { Type } from '@vendure/common/lib/shared-types';
+import { worker } from 'cluster';
 import { EntitySubscriberInterface } from 'typeorm';
 
 import { InternalServerError } from './common/error/errors';
@@ -19,15 +21,15 @@ export type VendureBootstrapFunction = (config: VendureConfig) => Promise<INestA
  */
 export async function bootstrap(userConfig: Partial<VendureConfig>): Promise<INestApplication> {
     const config = await preBootstrapConfig(userConfig);
-    Logger.info(`Bootstrapping Vendure Server...`);
+    Logger.useLogger(config.logger);
+    Logger.info(`Bootstrapping Vendure Server (pid: ${process.pid})...`);
 
     // The AppModule *must* be loaded only after the entities have been set in the
     // config, so that they are available when the AppModule decorator is evaluated.
     // tslint:disable-next-line:whitespace
     const appModule = await import('./app.module');
     DefaultLogger.hideNestBoostrapLogs();
-    let app: INestApplication;
-    app = await NestFactory.create(appModule.AppModule, {
+    const app = await NestFactory.create(appModule.AppModule, {
         cors: config.cors,
         logger: new Logger(),
     });
@@ -35,10 +37,34 @@ export async function bootstrap(userConfig: Partial<VendureConfig>): Promise<INe
     app.useLogger(new Logger());
     await runPluginOnBootstrapMethods(config, app);
     await app.listen(config.port, config.hostname);
+    if (config.workerOptions.runInMainProcess) {
+        await bootstrapWorker(config);
+    }
     logWelcomeMessage(config);
     return app;
 }
 
+export async function bootstrapWorker(userConfig: Partial<VendureConfig>): Promise<INestMicroservice> {
+    const config = await preBootstrapConfig(userConfig);
+    if ((config.logger as any).setDefaultContext) {
+        (config.logger as any).setDefaultContext('Vendure Worker');
+    }
+    Logger.useLogger(config.logger);
+    Logger.info(`Bootstrapping Vendure Worker (pid: ${process.pid})...`);
+
+    const workerModule = await import('./worker/worker.module');
+    DefaultLogger.hideNestBoostrapLogs();
+    const workerApp = await NestFactory.createMicroservice(workerModule.WorkerModule, {
+        transport: config.workerOptions.transport,
+        logger: new Logger(),
+        options: config.workerOptions.options,
+    });
+    DefaultLogger.restoreOriginalLogLevel();
+    workerApp.useLogger(new Logger());
+    await workerApp.listenAsync();
+    return workerApp;
+}
+
 /**
  * Setting the global config must be done prior to loading the AppModule.
  */
@@ -64,7 +90,6 @@ export async function preBootstrapConfig(
     });
 
     let config = getConfig();
-    Logger.useLogger(config.logger);
     config = await runPluginConfigurations(config);
     registerCustomEntityFields(config);
     return config;

+ 1 - 0
packages/core/src/config/config.service.mock.ts

@@ -32,6 +32,7 @@ export class MockConfigService implements MockClass<ConfigService> {
     emailOptions: {};
     importExportOptions: {};
     orderOptions = {};
+    workerOptions = {};
     customFields = {};
     middleware = [];
     logger = {} as any;

+ 5 - 0
packages/core/src/config/config.service.ts

@@ -20,6 +20,7 @@ import {
     ShippingOptions,
     TaxOptions,
     VendureConfig,
+    WorkerOptions,
 } from './vendure-config';
 import { VendurePlugin } from './vendure-plugin/vendure-plugin';
 
@@ -120,4 +121,8 @@ export class ConfigService implements VendureConfig {
     get logger(): VendureLogger {
         return this.activeConfig.logger;
     }
+
+    get workerOptions(): WorkerOptions {
+        return this.activeConfig.workerOptions as Required<WorkerOptions>;
+    }
 }

+ 6 - 1
packages/core/src/config/default-config.ts

@@ -1,5 +1,5 @@
+import { Transport } from '@nestjs/microservices';
 import { LanguageCode } from '@vendure/common/lib/generated-types';
-import { ADMIN_API_PATH, API_PORT } from '@vendure/common/lib/shared-constants';
 import { CustomFields } from '@vendure/common/lib/shared-types';
 
 import { ReadOnlyRequired } from '../common/types/common-types';
@@ -80,6 +80,11 @@ export const defaultConfig: ReadOnlyRequired<VendureConfig> = {
     importExportOptions: {
         importAssetsDir: __dirname,
     },
+    workerOptions: {
+        runInMainProcess: true,
+        transport: Transport.TCP,
+        options: {},
+    },
     customFields: {
         Address: [],
         Collection: [],

+ 6 - 1
packages/core/src/config/logger/default-logger.ts

@@ -26,6 +26,7 @@ export class DefaultLogger implements VendureLogger {
     /** @internal */
     level: LogLevel = LogLevel.Info;
     private readonly timestamp: boolean;
+    private defaultContext = DEFAULT_CONTEXT;
     private readonly localeStringOptions = {
         year: '2-digit',
         hour: 'numeric',
@@ -74,6 +75,10 @@ export class DefaultLogger implements VendureLogger {
         }
     }
 
+    setDefaultContext(defaultContext: string) {
+        this.defaultContext = defaultContext;
+    }
+
     error(message: string, context?: string, trace?: string | undefined): void {
         if (this.level >= LogLevel.Error) {
             this.logMessage(
@@ -131,7 +136,7 @@ export class DefaultLogger implements VendureLogger {
     }
 
     private logContext(context?: string) {
-        return chalk.cyan(`[${context || DEFAULT_CONTEXT}]`);
+        return chalk.cyan(`[${context || this.defaultContext}]`);
     }
 
     private logTimestamp() {

+ 40 - 0
packages/core/src/config/vendure-config.ts

@@ -1,4 +1,5 @@
 import { CorsOptions } from '@nestjs/common/interfaces/external/cors-options.interface';
+import { ClientOptions, Transport } from '@nestjs/microservices';
 import { LanguageCode } from '@vendure/common/lib/generated-types';
 import { CustomFields } from '@vendure/common/lib/shared-types';
 import { RequestHandler } from 'express';
@@ -313,6 +314,40 @@ export interface ImportExportOptions {
     importAssetsDir?: string;
 }
 
+/**
+ * @description
+ * Options related to the Vendure Worker.
+ *
+ * @docsCategory worker
+ */
+export interface WorkerOptions {
+    /**
+     * @description
+     * If set to `true`, the Worker will run be bootstrapped as part of the main Vendure server (when invoking the
+     * `bootstrap()` function) and will run in the same process. This mode is intended only for development and
+     * testing purposes, not for production, since running the Worker in the main process negates the benefits
+     * of having long-running or expensive tasks run in the background.
+     *
+     * @default true
+     */
+    runInMainProcess?: boolean;
+    /**
+     * @description
+     * Sets the transport protocol used to communicate with the Worker. Options include TCP, Redis, gPRC and more. See the
+     * [NestJS microservices documentation](https://docs.nestjs.com/microservices/basics) for a full list.
+     *
+     * @default Transport.TCP
+     */
+    transport?: Transport;
+    /**
+     * @description
+     * Additional options related to the chosen transport method. See See the
+     * [NestJS microservices documentation](https://docs.nestjs.com/microservices/basics) for details on the options relating to each of the
+     * transport methods.
+     */
+    options?: ClientOptions['options'];
+}
+
 /**
  * @description
  * All possible configuration options are defined by the
@@ -464,4 +499,9 @@ export interface VendureConfig {
      * Configures how taxes are calculated on products.
      */
     taxOptions?: TaxOptions;
+    /**
+     * @description
+     * Configures the Vendure Worker, which is used for long-running background tasks.
+     */
+    workerOptions?: WorkerOptions;
 }

+ 6 - 0
packages/core/src/config/vendure-plugin/vendure-plugin.ts

@@ -93,6 +93,12 @@ export interface VendurePlugin {
      */
     defineProviders?(): Provider[];
 
+    /**
+     * @description
+     * The plugin may define providers which are run in the Worker context, i.e. Nest microservice controllers.
+     */
+    defineWorkers?(): Array<Type<any>>;
+
     /**
      * @description
      * The plugin may define custom database entities, which should be defined as classes annotated as per the

+ 1 - 1
packages/core/src/data-import/data-import.module.ts

@@ -12,7 +12,7 @@ import { Populator } from './providers/populator/populator';
     // Important! PluginModule must be defined before ServiceModule
     // in order that overrides of Services (e.g. SearchService) are correctly
     // registered with the injector.
-    imports: [PluginModule, ServiceModule, ConfigModule],
+    imports: [PluginModule.forRoot(), ServiceModule.forRoot(), ConfigModule],
     exports: [ImportParser, Importer, Populator],
     providers: [ImportParser, Importer, Populator],
 })

+ 2 - 1
packages/core/src/index.ts

@@ -1,4 +1,4 @@
-export { bootstrap } from './bootstrap';
+export { bootstrap, bootstrapWorker } from './bootstrap';
 export * from './api/index';
 export * from './common/index';
 export * from './config/index';
@@ -7,6 +7,7 @@ export * from './plugin/index';
 export * from './entity/index';
 export * from './data-import/index';
 export * from './service/index';
+export * from './worker/constants';
 export * from '@vendure/common/lib/shared-types';
 export {
     Permission,

+ 6 - 1
packages/core/src/plugin/default-search-plugin/constants.ts

@@ -1 +1,6 @@
-export const SEARCH_PLUGIN_OPTIONS = Symbol('SEARCH_PLUGIN_OPTIONS');
+export const loggerCtx = 'DefaultSearchPlugin';
+export enum Message {
+    Reindex = 'Reindex',
+    UpdateVariantsById = 'UpdateVariantsById',
+    UpdateProductOrVariant = 'UpdateProductOrVariant',
+}

+ 7 - 35
packages/core/src/plugin/default-search-plugin/default-search-plugin.ts

@@ -14,9 +14,9 @@ import { CollectionModificationEvent } from '../../event-bus/events/collection-m
 import { TaxRateModificationEvent } from '../../event-bus/events/tax-rate-modification-event';
 import { SearchService } from '../../service/services/search.service';
 
-import { SEARCH_PLUGIN_OPTIONS } from './constants';
 import { AdminFulltextSearchResolver, ShopFulltextSearchResolver } from './fulltext-search.resolver';
 import { FulltextSearchService } from './fulltext-search.service';
+import { IndexerController } from './indexer/indexer.controller';
 import { SearchIndexService } from './indexer/search-index.service';
 import { SearchIndexItem } from './search-index-item.entity';
 
@@ -25,24 +25,6 @@ export interface DefaultSearchReindexResponse extends SearchReindexResponse {
     indexedItemCount: number;
 }
 
-/**
- * @description
- * Options for configuring the DefaultSearchPlugin.
- *
- * @docsCategory DefaultSearchPlugin
- */
-export interface DefaultSearchPluginOptions {
-    /**
-     * @description
-     * By default, the DefaultSearchPlugin will spawn a background process which is responsible
-     * for updating the search index. By setting this option to `false`, indexing will be
-     * performed on the main server process instead. Usually this is undesirable as performance will
-     * be degraded during indexing, but the option is useful for certain debugging and testing scenarios.
-     * @default true
-     */
-    runInForkedProcess: boolean;
-}
-
 /**
  * @description
  * The DefaultSearchPlugin provides a full-text Product search based on the full-text searching capabilities of the
@@ -73,20 +55,6 @@ export interface DefaultSearchPluginOptions {
  * @docsCategory DefaultSearchPlugin
  */
 export class DefaultSearchPlugin implements VendurePlugin {
-    private readonly options: DefaultSearchPluginOptions;
-
-    constructor(options?: DefaultSearchPluginOptions) {
-        const defaultOptions: DefaultSearchPluginOptions = {
-            runInForkedProcess: true,
-        };
-        this.options = { ...defaultOptions, ...options };
-
-        if (process.env[CREATING_VENDURE_APP]) {
-            // For the "create" step we will not run the indexer in a forked process as this
-            // can cause issues with sqlite locking.
-            this.options.runInForkedProcess = false;
-        }
-    }
 
     /** @internal */
     async onBootstrap(inject: <T>(type: Type<T>) => T): Promise<void> {
@@ -106,7 +74,6 @@ export class DefaultSearchPlugin implements VendurePlugin {
                 return searchIndexService.reindex(event.ctx).start();
             }
         });
-        await searchIndexService.connect();
     }
 
     /** @internal */
@@ -146,7 +113,12 @@ export class DefaultSearchPlugin implements VendurePlugin {
             FulltextSearchService,
             SearchIndexService,
             { provide: SearchService, useClass: FulltextSearchService },
-            { provide: SEARCH_PLUGIN_OPTIONS, useFactory: () => this.options },
+        ];
+    }
+
+    defineWorkers(): Array<Type<any>> {
+        return [
+            IndexerController,
         ];
     }
 }

+ 0 - 165
packages/core/src/plugin/default-search-plugin/indexer/index-builder.ts

@@ -1,165 +0,0 @@
-import { ID, Type } from '@vendure/common/lib/shared-types';
-import { unique } from '@vendure/common/lib/unique';
-import { Connection, ConnectionOptions, createConnection, SelectQueryBuilder } from 'typeorm';
-import { FindOptionsUtils } from 'typeorm/find-options/FindOptionsUtils';
-
-import { RequestContext } from '../../../api/common/request-context';
-import { FacetValue } from '../../../entity/facet-value/facet-value.entity';
-import { ProductVariant } from '../../../entity/product-variant/product-variant.entity';
-import { AsyncQueue } from '../async-queue';
-import { SearchIndexItem } from '../search-index-item.entity';
-
-import { CompletedMessage, ConnectedMessage, Message, MessageType, ReturnRawBatchMessage, SaveVariantsPayload, VariantsSavedMessage } from './ipc';
-
-export const BATCH_SIZE = 500;
-export const variantRelations = [
-    'product',
-    'product.featuredAsset',
-    'product.facetValues',
-    'product.facetValues.facet',
-    'featuredAsset',
-    'facetValues',
-    'facetValues.facet',
-    'collections',
-    'taxCategory',
-];
-
-export function getSearchIndexQueryBuilder(connection: Connection) {
-    const qb = connection.getRepository(ProductVariant).createQueryBuilder('variants');
-    FindOptionsUtils.applyFindManyOptionsOrConditionsToQueryBuilder(qb, {
-        relations: variantRelations,
-    });
-    FindOptionsUtils.joinEagerRelations(qb, qb.alias, connection.getMetadata(ProductVariant));
-    return qb;
-}
-
-/**
- * This class is responsible for all updates to the search index.
- */
-export class IndexBuilder {
-    private connection: Connection;
-    private indexQueryBuilder: SelectQueryBuilder<ProductVariant>;
-    private onMessageHandlers = new Set<(message: string) => void>();
-    private queue = new AsyncQueue('search-index');
-
-    /**
-     * When running in the main process, it should be constructed with the existing connection.
-     * Otherwise, the connection will be created in the .connect() method in response to an
-     * IPC message.
-     */
-    constructor(connection?: Connection) {
-        if (connection) {
-            this.connection = connection;
-            this.indexQueryBuilder = getSearchIndexQueryBuilder(this.connection);
-        }
-    }
-
-    processMessage(message: Message): Promise<Message | undefined> {
-        switch (message.type) {
-            case MessageType.CONNECTION_OPTIONS: {
-                return this.connect(message.value);
-            }
-            case MessageType.GET_RAW_BATCH: {
-                return this.getRawBatch(message.value.batchNumber);
-            }
-            case MessageType.GET_RAW_BATCH_BY_IDS: {
-                return this.getRawBatchByIds(message.value.ids);
-            }
-            case MessageType.SAVE_VARIANTS: {
-                return this.saveVariants(message.value);
-            }
-            default:
-                return Promise.resolve(undefined);
-        }
-    }
-
-    async processMessageAndEmitResult(message: Message) {
-        const result = await this.processMessage(message);
-        if (result) {
-            result.channelId = message.channelId;
-            this.onMessageHandlers.forEach(handler => {
-                handler(JSON.stringify(result));
-            });
-        }
-    }
-
-    addMessageListener<T extends Message>(handler: (message: string) => void) {
-        this.onMessageHandlers.add(handler);
-    }
-
-    removeMessageListener<T extends Message>(handler: (message: string) => void) {
-        this.onMessageHandlers.delete(handler);
-    }
-
-    private async connect(dbConnectionOptions: ConnectionOptions): Promise<ConnectedMessage> {
-        const {coreEntitiesMap} = await import('../../../entity/entities');
-        const coreEntities = Object.values(coreEntitiesMap) as Array<Type<any>>;
-        this.connection = await createConnection({...dbConnectionOptions, entities: [SearchIndexItem, ...coreEntities], name: 'index-builder' });
-        this.indexQueryBuilder = getSearchIndexQueryBuilder(this.connection);
-        return new ConnectedMessage(this.connection.isConnected);
-    }
-
-    private async getRawBatchByIds(ids: ID[]): Promise<ReturnRawBatchMessage> {
-        const variants = await this.connection.getRepository(ProductVariant).findByIds(ids, {
-            relations: variantRelations,
-        });
-        return new ReturnRawBatchMessage({variants});
-    }
-
-    private async getRawBatch(batchNumber: string | number): Promise<ReturnRawBatchMessage> {
-        const i = Number.parseInt(batchNumber.toString(), 10);
-        const variants = await this.indexQueryBuilder
-            .where('variants__product.deletedAt IS NULL')
-            .take(BATCH_SIZE)
-            .skip(i * BATCH_SIZE)
-            .getMany();
-
-        return new ReturnRawBatchMessage({variants});
-    }
-
-    private async saveVariants(payload: SaveVariantsPayload): Promise<VariantsSavedMessage | CompletedMessage> {
-        const {variants, ctx, batch, total} = payload;
-        const requestContext = new RequestContext(ctx);
-
-        const items = variants.map((v: ProductVariant) =>
-            new SearchIndexItem({
-                sku: v.sku,
-                enabled: v.enabled,
-                slug: v.product.slug,
-                price: v.price,
-                priceWithTax: v.priceWithTax,
-                languageCode: requestContext.languageCode,
-                productVariantId: v.id,
-                productId: v.product.id,
-                productName: v.product.name,
-                description: v.product.description,
-                productVariantName: v.name,
-                productPreview: v.product.featuredAsset ? v.product.featuredAsset.preview : '',
-                productVariantPreview: v.featuredAsset ? v.featuredAsset.preview : '',
-                facetIds: this.getFacetIds(v),
-                facetValueIds: this.getFacetValueIds(v),
-                collectionIds: v.collections.map(c => c.id.toString()),
-            }),
-        );
-        await this.queue.push(() => this.connection.getRepository(SearchIndexItem).save(items));
-        if (batch === total - 1) {
-            return new CompletedMessage(true);
-        } else {
-            return new VariantsSavedMessage({batchNumber: batch});
-        }
-    }
-
-    private getFacetIds(variant: ProductVariant): string[] {
-        const facetIds = (fv: FacetValue) => fv.facet.id.toString();
-        const variantFacetIds = variant.facetValues.map(facetIds);
-        const productFacetIds = variant.product.facetValues.map(facetIds);
-        return unique([...variantFacetIds, ...productFacetIds]);
-    }
-
-    private getFacetValueIds(variant: ProductVariant): string[] {
-        const facetValueIds = (fv: FacetValue) => fv.id.toString();
-        const variantFacetValueIds = variant.facetValues.map(facetValueIds);
-        const productFacetValueIds = variant.product.facetValues.map(facetValueIds);
-        return unique([...variantFacetValueIds, ...productFacetValueIds]);
-    }
-}

+ 244 - 0
packages/core/src/plugin/default-search-plugin/indexer/indexer.controller.ts

@@ -0,0 +1,244 @@
+import { Controller } from '@nestjs/common';
+import { MessagePattern } from '@nestjs/microservices';
+import { InjectConnection } from '@nestjs/typeorm';
+import { LanguageCode } from '@vendure/common/lib/generated-types';
+import { ID } from '@vendure/common/lib/shared-types';
+import { unique } from '@vendure/common/lib/unique';
+import { defer, Observable } from 'rxjs';
+import { Connection } from 'typeorm';
+import { FindOptionsUtils } from 'typeorm/find-options/FindOptionsUtils';
+
+import { RequestContext } from '../../../api/common/request-context';
+import { Logger } from '../../../config/logger/vendure-logger';
+import { FacetValue } from '../../../entity/facet-value/facet-value.entity';
+import { ProductVariant } from '../../../entity/product-variant/product-variant.entity';
+import { Product } from '../../../entity/product/product.entity';
+import { translateDeep } from '../../../service/helpers/utils/translate-entity';
+import { ProductVariantService } from '../../../service/services/product-variant.service';
+import { AsyncQueue } from '../async-queue';
+import { loggerCtx, Message } from '../constants';
+import { SearchIndexItem } from '../search-index-item.entity';
+
+export const BATCH_SIZE = 1000;
+export const variantRelations = [
+    'product',
+    'product.featuredAsset',
+    'product.facetValues',
+    'product.facetValues.facet',
+    'featuredAsset',
+    'facetValues',
+    'facetValues.facet',
+    'collections',
+    'taxCategory',
+];
+
+export interface ReindexMessageResponse {
+    total: number;
+    completed: number;
+    duration: number;
+}
+
+@Controller()
+export class IndexerController {
+    private queue = new AsyncQueue('search-index');
+
+    constructor(
+        @InjectConnection() private connection: Connection,
+        private productVariantService: ProductVariantService,
+    ) {
+        console.log('IndexerController tolo bolo');
+    }
+
+    @MessagePattern(Message.Reindex)
+    reindex({ ctx: rawContext }: { ctx: any}): Observable<ReindexMessageResponse> {
+        const ctx = RequestContext.fromObject(rawContext);
+        return new Observable(observer => {
+            (async () => {
+                const timeStart = Date.now();
+                const qb = this.getSearchIndexQueryBuilder();
+                const count = await qb.where('variants__product.deletedAt IS NULL').getCount();
+                Logger.verbose(`Reindexing ${count} variants`, loggerCtx);
+                const batches = Math.ceil(count / BATCH_SIZE);
+
+                await this.connection.getRepository(SearchIndexItem).delete({ languageCode: ctx.languageCode });
+                Logger.verbose('Deleted existing index items', loggerCtx);
+
+                for (let i = 0; i < batches; i++) {
+                    Logger.verbose(`Processing batch ${i + 1} of ${batches}`, loggerCtx);
+
+                    const variants = await qb
+                        .where('variants__product.deletedAt IS NULL')
+                        .take(BATCH_SIZE)
+                        .skip(i * BATCH_SIZE)
+                        .getMany();
+                    const hydratedVariants = this.hydrateVariants(ctx, variants);
+                    await this.saveVariants(ctx, hydratedVariants);
+                    observer.next({
+                        total: count,
+                        completed: Math.min((i + 1) * BATCH_SIZE, count),
+                        duration: +new Date() - timeStart,
+                    });
+                }
+                Logger.verbose(`Completed reindexing!`);
+                observer.next({
+                    total: count,
+                    completed: count,
+                    duration: +new Date() - timeStart,
+                });
+                observer.complete();
+            })();
+        });
+    }
+
+    @MessagePattern(Message.UpdateVariantsById)
+    updateVariantsById({ ctx: rawContext, ids }: { ctx: any, ids: ID[] }): Observable<ReindexMessageResponse> {
+        const ctx = RequestContext.fromObject(rawContext);
+
+        return new Observable(observer => {
+            (async () => {
+                const timeStart = Date.now();
+                if (ids.length) {
+                    const batches = Math.ceil(ids.length / BATCH_SIZE);
+                    Logger.verbose(`Updating ${ids.length} variants...`);
+
+                    for (let i = 0; i < batches; i++) {
+                        const begin = i * BATCH_SIZE;
+                        const end = begin + BATCH_SIZE;
+                        Logger.verbose(`Updating ids from index ${begin} to ${end}`);
+                        const batchIds = ids.slice(begin, end);
+                        const batch = await this.connection.getRepository(ProductVariant).findByIds(batchIds, {
+                            relations: variantRelations,
+                        });
+                        const variants = this.hydrateVariants(ctx, batch);
+                        await this.saveVariants(ctx, variants);
+                        observer.next({
+                            total: ids.length,
+                            completed: Math.min((i + 1) * BATCH_SIZE, ids.length),
+                            duration: +new Date() - timeStart,
+                        });
+                    }
+                }
+                Logger.verbose(`Completed reindexing!`);
+                observer.next({
+                    total: ids.length,
+                    completed: ids.length,
+                    duration: +new Date() - timeStart,
+                });
+                observer.complete();
+            })();
+        });
+    }
+
+    /**
+     * Updates the search index only for the affected entities.
+     */
+    @MessagePattern(Message.UpdateProductOrVariant)
+    updateProductOrVariant({ ctx: rawContext, productId, variantId }: { ctx: any, productId?: ID, variantId?: ID }): Observable<boolean> {
+        const ctx = RequestContext.fromObject(rawContext);
+        let updatedVariants: ProductVariant[] = [];
+        let removedVariantIds: ID[] = [];
+        return defer(async () => {
+            if (productId) {
+                const product = await this.connection.getRepository(Product).findOne(productId, {
+                    relations: ['variants'],
+                });
+                if (product) {
+                    if (product.deletedAt) {
+                        removedVariantIds = product.variants.map(v => v.id);
+                    } else {
+                        updatedVariants = await this.connection
+                            .getRepository(ProductVariant)
+                            .findByIds(product.variants.map(v => v.id), {
+                                relations: variantRelations,
+                            });
+                        if (product.enabled === false) {
+                            updatedVariants.forEach(v => v.enabled = false);
+                        }
+                    }
+                }
+            } else {
+                const variant = await this.connection.getRepository(ProductVariant).findOne(variantId, {
+                    relations: variantRelations,
+                });
+                if (variant) {
+                    updatedVariants = [variant];
+                }
+            }
+            Logger.verbose(`Updating ${updatedVariants.length} variants`, loggerCtx);
+            if (updatedVariants.length) {
+                await this.saveVariants(ctx, updatedVariants);
+            }
+            if (removedVariantIds.length) {
+                await this.removeSearchIndexItems(ctx.languageCode, removedVariantIds);
+            }
+            return true;
+        });
+    }
+
+    private getSearchIndexQueryBuilder() {
+        const qb = this.connection.getRepository(ProductVariant).createQueryBuilder('variants');
+        FindOptionsUtils.applyFindManyOptionsOrConditionsToQueryBuilder(qb, {
+            relations: variantRelations,
+        });
+        FindOptionsUtils.joinEagerRelations(qb, qb.alias, this.connection.getMetadata(ProductVariant));
+        return qb;
+    }
+
+    /**
+     * Given an array of ProductVariants, this method applies the correct taxes and translations.
+     */
+    private hydrateVariants(ctx: RequestContext, variants: ProductVariant[]): ProductVariant[] {
+        return variants
+            .map(v => this.productVariantService.applyChannelPriceAndTax(v, ctx))
+            .map(v => translateDeep(v, ctx.languageCode, ['product']));
+    }
+
+    private async saveVariants(ctx: RequestContext, variants: ProductVariant[]) {
+        const items = variants.map((v: ProductVariant) =>
+            new SearchIndexItem({
+                sku: v.sku,
+                enabled: v.enabled,
+                slug: v.product.slug,
+                price: v.price,
+                priceWithTax: v.priceWithTax,
+                languageCode: ctx.languageCode,
+                productVariantId: v.id,
+                productId: v.product.id,
+                productName: v.product.name,
+                description: v.product.description,
+                productVariantName: v.name,
+                productPreview: v.product.featuredAsset ? v.product.featuredAsset.preview : '',
+                productVariantPreview: v.featuredAsset ? v.featuredAsset.preview : '',
+                facetIds: this.getFacetIds(v),
+                facetValueIds: this.getFacetValueIds(v),
+                collectionIds: v.collections.map(c => c.id.toString()),
+            }),
+        );
+        await this.queue.push(() => this.connection.getRepository(SearchIndexItem).save(items));
+    }
+
+    private getFacetIds(variant: ProductVariant): string[] {
+        const facetIds = (fv: FacetValue) => fv.facet.id.toString();
+        const variantFacetIds = variant.facetValues.map(facetIds);
+        const productFacetIds = variant.product.facetValues.map(facetIds);
+        return unique([...variantFacetIds, ...productFacetIds]);
+    }
+
+    private getFacetValueIds(variant: ProductVariant): string[] {
+        const facetValueIds = (fv: FacetValue) => fv.id.toString();
+        const variantFacetValueIds = variant.facetValues.map(facetValueIds);
+        const productFacetValueIds = variant.product.facetValues.map(facetValueIds);
+        return unique([...variantFacetValueIds, ...productFacetValueIds]);
+    }
+
+    /**
+     * Remove items from the search index
+     */
+    private async removeSearchIndexItems(languageCode: LanguageCode, variantIds: ID[]) {
+        const compositeKeys = variantIds.map(id => ({
+            productVariantId: id,
+            languageCode,
+        })) as any[];
+        await this.connection.getRepository(SearchIndexItem).delete(compositeKeys);
+    }
+}

+ 0 - 154
packages/core/src/plugin/default-search-plugin/indexer/ipc.ts

@@ -1,154 +0,0 @@
-import { ID } from '@vendure/common/lib/shared-types';
-import { ChildProcess } from 'child_process';
-import { ConnectionOptions } from 'typeorm';
-
-import { RequestContext } from '../../../api/common/request-context';
-import { ProductVariant } from '../../../entity/product-variant/product-variant.entity';
-
-import { IndexBuilder } from './index-builder';
-
-export enum MessageType {
-    CONNECTION_OPTIONS,
-    CONNECTED,
-    GET_RAW_BATCH,
-    GET_RAW_BATCH_BY_IDS,
-    RETURN_RAW_BATCH,
-    SAVE_VARIANTS,
-    VARIANTS_SAVED,
-    COMPLETED,
-}
-
-export interface SaveVariantsPayload {
-    variants: ProductVariant[];
-    ctx: RequestContext;
-    batch: number;
-    total: number;
-}
-
-export interface IPCMessage {
-    type: MessageType;
-    value: any;
-    channelId: string;
-}
-
-export class ConnectionOptionsMessage implements IPCMessage {
-    readonly type = MessageType.CONNECTION_OPTIONS;
-    channelId: string;
-    constructor(public value: ConnectionOptions) {}
-}
-
-export class ConnectedMessage implements IPCMessage {
-    readonly type = MessageType.CONNECTED;
-    channelId: string;
-    constructor(public value: boolean) {}
-}
-
-export class GetRawBatchMessage implements IPCMessage {
-    readonly type = MessageType.GET_RAW_BATCH;
-    channelId: string;
-    constructor(public value: { batchNumber: number; }) {}
-}
-
-export class GetRawBatchByIdsMessage implements IPCMessage {
-    readonly type = MessageType.GET_RAW_BATCH_BY_IDS;
-    channelId: string;
-    constructor(public value: { ids: ID[]; }) {}
-}
-
-export class ReturnRawBatchMessage implements IPCMessage {
-    readonly type = MessageType.RETURN_RAW_BATCH;
-    channelId: string;
-    constructor(public value: { variants: ProductVariant[]; }) {}
-}
-
-export class SaveVariantsMessage implements IPCMessage {
-    readonly type = MessageType.SAVE_VARIANTS;
-    channelId: string;
-    constructor(public value: SaveVariantsPayload) {}
-}
-
-export class VariantsSavedMessage implements IPCMessage {
-    readonly type = MessageType.VARIANTS_SAVED;
-    channelId: string;
-    constructor(public value: { batchNumber: number; }) {}
-}
-
-export class CompletedMessage implements IPCMessage {
-    readonly type = MessageType.COMPLETED;
-    channelId: string;
-    constructor(public value: boolean) {}
-}
-
-export type Message = ConnectionOptionsMessage |
-    ConnectedMessage |
-    GetRawBatchMessage |
-    GetRawBatchByIdsMessage |
-    ReturnRawBatchMessage |
-    SaveVariantsMessage |
-    VariantsSavedMessage |
-    CompletedMessage;
-
-export type MessageOfType<T extends MessageType> = Extract<Message, { type: T }>;
-
-export function sendIPCMessage(target: NodeJS.Process | ChildProcess, message: Message) {
-    // tslint:disable-next-line:no-non-null-assertion
-    target.send!(JSON.stringify(message));
-}
-
-/**
- * An IpcChannel allows safe communication between main thread and worker. It achieves
- * this by adding a unique ID to each outgoing message, which the worker then adds
- * to any responses.
- *
- * If the `target` is an instance of IndexBuilder running on the main process (not in
- * a worker thread), then the channel interacts directly with it, whilst keeping the
- * differences abstracted away from the consuming code.
- */
-export class IpcChannel {
-    private readonly channelId = Math.random().toString(32);
-    private handlers: Array<(m: string) => void> = [];
-    constructor(private readonly target: NodeJS.Process | ChildProcess | IndexBuilder) {}
-
-    /**
-     * Send a message to the worker process.
-     */
-    send(message: Message) {
-        message.channelId = this.channelId;
-        if (this.target instanceof IndexBuilder) {
-            this.target.processMessageAndEmitResult(message);
-        } else {
-            sendIPCMessage(this.target, message);
-        }
-    }
-
-    /**
-     * Subscribes to the given IPC message which is sent from the worker in response to a message
-     * send with the `send()` method.
-     */
-    subscribe<T extends MessageType>(messageType: T, callback: (message: MessageOfType<T>) => void): void {
-        const handler = (messageString: string) => {
-            const message = JSON.parse(messageString) as Message;
-            if (message.type === messageType && message.channelId === this.channelId) {
-                callback(message as MessageOfType<T>);
-            }
-        };
-        if (this.target instanceof IndexBuilder) {
-            this.target.addMessageListener(handler);
-        } else {
-            this.target.on('message', handler);
-        }
-        this.handlers.push(handler);
-    }
-
-    /**
-     * Clean up all event listeners created by subscriptions.
-     */
-    close() {
-        const target = this.target;
-        if (target instanceof IndexBuilder) {
-            this.handlers.forEach(handler => target.removeMessageListener(handler));
-        } else {
-            this.handlers.forEach(handler => target.off('message', handler));
-        }
-    }
-}

+ 0 - 16
packages/core/src/plugin/default-search-plugin/indexer/search-index-worker.ts

@@ -1,16 +0,0 @@
-/* tslint:disable:no-non-null-assertion no-console */
-import { IndexBuilder } from './index-builder';
-import { ConnectionOptionsMessage, GetRawBatchByIdsMessage, GetRawBatchMessage, SaveVariantsMessage, sendIPCMessage } from './ipc';
-
-export type IncomingMessage = ConnectionOptionsMessage | GetRawBatchMessage | GetRawBatchByIdsMessage | SaveVariantsMessage;
-
-const indexBuilder = new IndexBuilder();
-
-process.on('message', async (messageString) => {
-    const message: IncomingMessage = JSON.parse(messageString);
-    const result = await indexBuilder.processMessage(message);
-    if (result) {
-        result.channelId = message.channelId;
-        sendIPCMessage(process, result);
-    }
-});

+ 80 - 268
packages/core/src/plugin/default-search-plugin/indexer/search-index.service.ts

@@ -1,11 +1,7 @@
 import { Inject, Injectable } from '@nestjs/common';
+import { ClientProxy } from '@nestjs/microservices';
 import { InjectConnection } from '@nestjs/typeorm';
-import { LanguageCode } from '@vendure/common/lib/generated-types';
-import { pick } from '@vendure/common/lib/pick';
 import { ID } from '@vendure/common/lib/shared-types';
-import { ChildProcess, fork } from 'child_process';
-import fs from 'fs-extra';
-import path from 'path';
 import { Connection } from 'typeorm';
 
 import { RequestContext } from '../../../api/common/request-context';
@@ -14,126 +10,62 @@ import { Logger } from '../../../config/logger/vendure-logger';
 import { ProductVariant } from '../../../entity/product-variant/product-variant.entity';
 import { Product } from '../../../entity/product/product.entity';
 import { Job } from '../../../service/helpers/job-manager/job';
-import { translateDeep } from '../../../service/helpers/utils/translate-entity';
 import { JobService } from '../../../service/services/job.service';
 import { ProductVariantService } from '../../../service/services/product-variant.service';
-import { SEARCH_PLUGIN_OPTIONS } from '../constants';
-import { DefaultSearchPluginOptions } from '../default-search-plugin';
-import { SearchIndexItem } from '../search-index-item.entity';
+import { VENDURE_WORKER_CLIENT } from '../../../worker/constants';
+import { Message } from '../constants';
 
-import { BATCH_SIZE, getSearchIndexQueryBuilder, IndexBuilder, variantRelations } from './index-builder';
-import {
-    CompletedMessage,
-    ConnectedMessage,
-    ConnectionOptionsMessage,
-    GetRawBatchByIdsMessage,
-    GetRawBatchMessage,
-    IpcChannel,
-    MessageType,
-    ReturnRawBatchMessage,
-    SaveVariantsMessage,
-    VariantsSavedMessage,
-} from './ipc';
-// This import is needed to ensure that the worker script gets compiled
-// and emitted during build.
-import './search-index-worker';
-
-export type IncomingMessage = ConnectedMessage | ReturnRawBatchMessage | VariantsSavedMessage | CompletedMessage;
-const loggerCtx = 'DefaultSearchPlugin';
+import { ReindexMessageResponse } from './indexer.controller';
 
 /**
- * This service is responsible for all writes to the search index. It works together with the SearchIndexWorker
- * process to perform these often resource-intensive tasks in another thread, which keeps the main
- * server thread responsive.
+ * This service is responsible for messaging the {@link IndexerController} with search index updates.
  */
 @Injectable()
 export class SearchIndexService {
-    private workerProcess: ChildProcess | IndexBuilder;
-    private restartAttempts = 0;
 
     constructor(@InjectConnection() private connection: Connection,
-                @Inject(SEARCH_PLUGIN_OPTIONS) private options: DefaultSearchPluginOptions,
+                @Inject(VENDURE_WORKER_CLIENT) private readonly client: ClientProxy,
                 private productVariantService: ProductVariantService,
                 private jobService: JobService,
                 private configService: ConfigService) {}
 
-    /**
-     * Creates the search index worker process and has it connect to the database.
-     */
-    async connect() {
-        if (this.options.runInForkedProcess && this.configService.dbConnectionOptions.type !== 'sqljs') {
-            try {
-                const workerProcess = this.getChildProcess(path.join(__dirname, 'search-index-worker.ts'));
-                Logger.verbose(`IndexBuilder running as forked process`, loggerCtx);
-                workerProcess.on('error', err => {
-                    Logger.error(`IndexBuilder worker error: ` + err.message, loggerCtx);
-                });
-                workerProcess.on('close', () => {
-                    this.restartAttempts++;
-                    Logger.error(`IndexBuilder worker process died!`, loggerCtx);
-                    if (this.restartAttempts <= 10) {
-                        Logger.error(`Attempting to restart (${this.restartAttempts})...`, loggerCtx);
-                        this.connect();
-                    } else {
-                        Logger.error(`Too many failed restart attempts. Sorry!`);
-                    }
-                });
-                await this.establishConnection(workerProcess);
-                this.workerProcess = workerProcess;
-            } catch (e) {
-                Logger.error(e);
-            }
-
-        } else {
-            this.workerProcess = new IndexBuilder(this.connection);
-            Logger.verbose(`IndexBuilder running in main process`, loggerCtx);
-        }
-    }
-
     reindex(ctx: RequestContext): Job {
         return this.jobService.createJob({
             name: 'reindex',
             singleInstance: true,
             work: async reporter => {
-                const timeStart = Date.now();
-                const qb = getSearchIndexQueryBuilder(this.connection);
-                const count = await qb.where('variants__product.deletedAt IS NULL').getCount();
-                Logger.verbose(`Reindexing ${count} variants`, loggerCtx);
-                const batches = Math.ceil(count / BATCH_SIZE);
-
-                await this.connection.getRepository(SearchIndexItem).delete({ languageCode: ctx.languageCode });
-                Logger.verbose('Deleted existing index items', loggerCtx);
-
-                return new Promise(async (resolve, reject) => {
-                    const ipcChannel = new IpcChannel(this.workerProcess);
-                    ipcChannel.subscribe(MessageType.COMPLETED, message => {
-                        Logger.verbose(`Reindexing completed in ${Date.now() - timeStart}ms`, loggerCtx);
-                        ipcChannel.close();
-                        resolve({
-                            success: true,
-                            indexedItemCount: count,
-                            timeTaken: Date.now() - timeStart,
+                return new Promise((resolve, reject) => {
+                    Logger.verbose(`sending reindex message`);
+                    let total: number | undefined;
+                    let duration = 0;
+                    let completed = 0;
+                    this.client.send<ReindexMessageResponse>(Message.Reindex, { ctx })
+                        .subscribe({
+                            next: response => {
+                                if (!total) {
+                                    total = response.total;
+                                }
+                                duration = response.duration;
+                                completed = response.completed;
+                                const progress = Math.ceil((completed / total) * 100);
+                                reporter.setProgress(progress);
+                            },
+                            complete: () => {
+                                resolve({
+                                    success: true,
+                                    indexedItemCount: total,
+                                    timeTaken: duration,
+                                });
+                            },
+                            error: (err) => {
+                                Logger.error(JSON.stringify(err));
+                                resolve({
+                                    success: false,
+                                    indexedItemCount: 0,
+                                    timeTaken: 0,
+                                });
+                            },
                         });
-                    });
-                    ipcChannel.subscribe(MessageType.VARIANTS_SAVED, message => {
-                        reporter.setProgress(Math.ceil(((message.value.batchNumber + 1) / batches) * 100));
-                        Logger.verbose(`Completed batch ${message.value.batchNumber + 1} of ${batches}`, loggerCtx);
-                    });
-
-                    for (let i = 0; i < batches; i++) {
-                        Logger.verbose(`Processing batch ${i + 1} of ${batches}`, loggerCtx);
-
-                        const variants = await this.getBatch(this.workerProcess, i);
-                        const hydratedVariants = this.hydrateVariants(ctx, variants);
-                        Logger.verbose(`variants count: ${variants.length}`);
-
-                        ipcChannel.send(new SaveVariantsMessage({
-                            variants: hydratedVariants,
-                            ctx,
-                            batch: i,
-                            total: batches,
-                        }));
-                    }
                 });
             },
         });
@@ -142,175 +74,55 @@ export class SearchIndexService {
     /**
      * Updates the search index only for the affected entities.
      */
-    async updateProductOrVariant(ctx: RequestContext, updatedEntity: Product | ProductVariant) {
-        let updatedVariants: ProductVariant[] = [];
-        let removedVariantIds: ID[] = [];
+    updateProductOrVariant(ctx: RequestContext, updatedEntity: Product | ProductVariant) {
         if (updatedEntity instanceof Product) {
-            const product = await this.connection.getRepository(Product).findOne(updatedEntity.id, {
-                relations: ['variants'],
-            });
-            if (product) {
-                if (product.deletedAt) {
-                    removedVariantIds = product.variants.map(v => v.id);
-                } else {
-                    updatedVariants = await this.connection
-                        .getRepository(ProductVariant)
-                        .findByIds(product.variants.map(v => v.id), {
-                            relations: variantRelations,
-                        });
-                    if (product.enabled === false) {
-                        updatedVariants.forEach(v => v.enabled = false);
-                    }
-                }
-            }
+            return this.client.send(Message.UpdateProductOrVariant, { ctx, productId: updatedEntity.id })
+                .subscribe({ error: err => Logger.error(err) });
         } else {
-            const variant = await this.connection.getRepository(ProductVariant).findOne(updatedEntity.id, {
-                relations: variantRelations,
-            });
-            if (variant) {
-                updatedVariants = [variant];
-            }
-        }
-
-        if (updatedVariants.length) {
-            await this.saveSearchIndexItems(ctx, updatedVariants);
-        }
-        if (removedVariantIds.length) {
-            await this.removeSearchIndexItems(ctx.languageCode, removedVariantIds);
+            return this.client.send(Message.UpdateProductOrVariant, { ctx, variantId: updatedEntity.id })
+                .subscribe({ error: err => Logger.error(err) });
         }
     }
 
-    async updateVariantsById(ctx: RequestContext, ids: ID[]) {
-        return new Promise(async resolve => {
-            if (ids.length) {
-                const ipcChannel = new IpcChannel(this.workerProcess);
-                const batches = Math.ceil(ids.length / BATCH_SIZE);
-                Logger.verbose(`Updating ${ids.length} variants...`);
-
-                ipcChannel.subscribe(MessageType.COMPLETED, message => {
-                    Logger.verbose(`Completed updating variants`);
-                    ipcChannel.close();
-                    resolve();
+    updateVariantsById(ctx: RequestContext, ids: ID[]) {
+        return this.jobService.createJob({
+            name: 'update-index',
+            singleInstance: true,
+            work: async reporter => {
+                return new Promise((resolve, reject) => {
+                    Logger.verbose(`sending reindex message`);
+                    let total: number | undefined;
+                    let duration = 0;
+                    let completed = 0;
+                    this.client.send<ReindexMessageResponse>(Message.UpdateVariantsById, { ctx, ids })
+                        .subscribe({
+                            next: response => {
+                                if (!total) {
+                                    total = response.total;
+                                }
+                                duration = response.duration;
+                                completed = response.completed;
+                                const progress = Math.ceil((completed / total) * 100);
+                                reporter.setProgress(progress);
+                            },
+                            complete: () => {
+                                resolve({
+                                    success: true,
+                                    indexedItemCount: total,
+                                    timeTaken: duration,
+                                });
+                            },
+                            error: (err) => {
+                                Logger.error(JSON.stringify(err));
+                                resolve({
+                                    success: false,
+                                    indexedItemCount: 0,
+                                    timeTaken: 0,
+                                });
+                            },
+                        });
                 });
-
-                for (let i = 0; i < batches; i++) {
-                    const begin = i * BATCH_SIZE;
-                    const end = begin + BATCH_SIZE;
-                    Logger.verbose(`Updating ids from index ${begin} to ${end}`);
-                    const batchIds = ids.slice(begin, end);
-                    const batch = await this.getBatchByIds(this.workerProcess, batchIds);
-                    const variants = this.hydrateVariants(ctx, batch);
-
-                    ipcChannel.send(new SaveVariantsMessage({ variants, ctx, batch: i, total: batches }));
-                }
-            } else {
-                resolve();
-            }
-        });
-    }
-
-    /**
-     * Add or update items in the search index
-     */
-    private async saveSearchIndexItems(ctx: RequestContext, variants: ProductVariant[]) {
-        const items = this.hydrateVariants(ctx, variants);
-        Logger.verbose(`Updating search index for ${variants.length} variants`, loggerCtx);
-        return new Promise(resolve => {
-            const ipcChannel = new IpcChannel(this.workerProcess);
-            ipcChannel.subscribe(MessageType.COMPLETED, message => {
-                Logger.verbose(`Done!`, loggerCtx);
-                ipcChannel.close();
-                resolve();
-            });
-            ipcChannel.send(new SaveVariantsMessage({ variants: items, ctx, batch: 0, total: 1 }));
-        });
-    }
-
-    /**
-     * Remove items from the search index
-     */
-    private async removeSearchIndexItems(languageCode: LanguageCode, variantIds: ID[]) {
-        const compositeKeys = variantIds.map(id => ({
-            productVariantId: id,
-            languageCode,
-        })) as any[];
-        await this.connection.getRepository(SearchIndexItem).delete(compositeKeys);
-    }
-
-    /**
-     * Given an array of ProductVariants, this method applies the correct taxes and translations.
-     */
-    private hydrateVariants(ctx: RequestContext, variants: ProductVariant[]): ProductVariant[] {
-        return variants
-            .map(v => this.productVariantService.applyChannelPriceAndTax(v, ctx))
-            .map(v => translateDeep(v, ctx.languageCode, ['product']));
-    }
-
-    /**
-     * Forks a child process based on the given filename. The filename can be a JS or TS file, as this method will attempt to
-     * use either (attempts JS first).
-     */
-    private getChildProcess(filename: string): ChildProcess {
-        const ext = path.extname(filename);
-        const fileWithoutExt = filename.replace(new RegExp(`${ext}$`), '');
-        let error: any;
-        try {
-            const jsFile = fileWithoutExt + '.js';
-            if (fs.existsSync(jsFile)) {
-                return fork(jsFile, [], { execArgv: [] });
-            }
-        } catch (e) {
-            // ignore and try ts
-            error = e;
-        }
-        try {
-            const tsFile = fileWithoutExt + '.ts';
-            if (fs.existsSync(tsFile)) {
-                // Fork the TS file using ts-node. This is useful when running in dev mode or
-                // for e2e tests.
-                return fork(tsFile, [], { execArgv: ['-r', 'ts-node/register'] });
-            }
-        } catch (e) {
-            // ignore and thow at the end.
-            error = e;
-        }
-        throw error;
-    }
-
-    private establishConnection(child: ChildProcess): Promise<boolean> {
-        const connectionOptions = pick(this.configService.dbConnectionOptions as any,
-            ['type', 'name', 'database', 'host', 'port', 'username', 'password', 'location', 'autoSave']);
-        return new Promise(resolve => {
-            const ipcChannel = new IpcChannel(child);
-            ipcChannel.subscribe(MessageType.CONNECTED, message => {
-                Logger.verbose(`IndexBuilder connection result: ${message.value}`, loggerCtx);
-                ipcChannel.close();
-                resolve(message.value);
-            });
-            ipcChannel.send(new ConnectionOptionsMessage(connectionOptions));
-        });
-    }
-
-    private getBatch(child: ChildProcess | IndexBuilder, batch: number): Promise<ProductVariant[]> {
-        return new Promise(resolve => {
-            const ipcChannel = new IpcChannel(child);
-            ipcChannel.subscribe(MessageType.RETURN_RAW_BATCH, message => {
-                ipcChannel.close();
-                resolve(message.value.variants);
-            });
-            ipcChannel.send(new GetRawBatchMessage({ batchNumber: batch }));
-        });
-    }
-
-    private getBatchByIds(child: ChildProcess | IndexBuilder, ids: ID[]): Promise<ProductVariant[]> {
-        return new Promise(resolve => {
-            const ipcChannel = new IpcChannel(child);
-            ipcChannel.subscribe(MessageType.RETURN_RAW_BATCH, message => {
-                ipcChannel.close();
-                resolve(message.value.variants);
-            });
-            ipcChannel.send(new GetRawBatchByIdsMessage({ ids }));
+            },
         });
     }
-
 }

+ 55 - 8
packages/core/src/plugin/plugin.module.ts

@@ -1,27 +1,41 @@
-import { Module } from '@nestjs/common';
+import { DynamicModule, Module } from '@nestjs/common';
+import { ClientProxyFactory, ClientsModule, Transport } from '@nestjs/microservices';
 import { Type } from '@vendure/common/lib/shared-types';
 import { notNullOrUndefined } from '@vendure/common/lib/shared-utils';
 
 import { getConfig } from '../config/config-helpers';
 import { ConfigModule } from '../config/config.module';
+import { ConfigService } from '../config/config.service';
 import { EventBusModule } from '../event-bus/event-bus.module';
 import { ServiceModule } from '../service/service.module';
+import { VENDURE_WORKER_CLIENT } from '../worker/constants';
 
 import { getPluginAPIExtensions } from './plugin-utils';
 
-const plugins = getConfig().plugins;
-const pluginProviders = plugins
-    .map(p => (p.defineProviders ? p.defineProviders() : undefined))
-    .filter(notNullOrUndefined)
-    .reduce((flattened, providers) => flattened.concat(providers), []);
+const pluginProviders = getPluginProviders();
 
 /**
  * This module collects and re-exports all providers defined in plugins so that they can be used in other
  * modules.
  */
 @Module({
-    imports: [ServiceModule, EventBusModule, ConfigModule],
-    providers: pluginProviders,
+    imports: [
+        EventBusModule,
+        ConfigModule,
+    ],
+    providers: [
+        {
+            provide: VENDURE_WORKER_CLIENT,
+            useFactory: (configService: ConfigService) => {
+                return ClientProxyFactory.create({
+                    transport: configService.workerOptions.transport as any,
+                    options: configService.workerOptions.options as any,
+                });
+            },
+            inject: [ConfigService],
+        },
+        ...pluginProviders,
+    ],
     exports: pluginProviders,
 })
 export class PluginModule {
@@ -32,9 +46,42 @@ export class PluginModule {
     static adminApiResolvers(): Array<Type<any>> {
         return graphQLResolversFor('admin');
     }
+
+    static forRoot(): DynamicModule {
+        return {
+            module: PluginModule,
+            imports: [ServiceModule.forRoot()],
+        };
+    }
+
+    static forWorker(): DynamicModule {
+        const controllers = getWorkerControllers();
+        return {
+            module: PluginModule,
+            imports: [ServiceModule.forWorker()],
+            controllers,
+        };
+    }
+}
+
+function getPluginProviders() {
+    const plugins = getConfig().plugins;
+    return plugins
+        .map(p => (p.defineProviders ? p.defineProviders() : undefined))
+        .filter(notNullOrUndefined)
+        .reduce((flattened, providers) => flattened.concat(providers), []);
+}
+
+function getWorkerControllers() {
+    const plugins = getConfig().plugins;
+    return plugins
+        .map(p => (p.defineWorkers ? p.defineWorkers() : undefined))
+        .filter(notNullOrUndefined)
+        .reduce((flattened, providers) => flattened.concat(providers), []);
 }
 
 function graphQLResolversFor(apiType: 'shop' | 'admin'): Array<Type<any>> {
+    const plugins = getConfig().plugins;
     return getPluginAPIExtensions(plugins, apiType)
         .map(extension => extension.resolvers)
         .reduce((flattened, r) => [...flattened, ...r], []);

+ 35 - 2
packages/core/src/service/service.module.ts

@@ -1,4 +1,4 @@
-import { Module, OnModuleInit } from '@nestjs/common';
+import { DynamicModule, Module, OnModuleInit } from '@nestjs/common';
 import { TypeOrmModule } from '@nestjs/typeorm';
 
 import { getConfig } from '../config/config-helpers';
@@ -81,7 +81,10 @@ const exportedProviders = [
  * into a format suitable for the service layer logic.
  */
 @Module({
-    imports: [ConfigModule, EventBusModule, TypeOrmModule.forRoot(getConfig().dbConnectionOptions)],
+    imports: [
+        ConfigModule,
+        EventBusModule,
+    ],
     providers: [
         ...exportedProviders,
         PasswordCiper,
@@ -123,4 +126,34 @@ export class ServiceModule implements OnModuleInit {
         await this.shippingMethodService.initShippingMethods();
         await this.paymentMethodService.initPaymentMethods();
     }
+
+    static forRoot(): DynamicModule {
+        return {
+            module: ServiceModule,
+            imports: [
+                TypeOrmModule.forRootAsync({
+                    useFactory: () => {
+                        console.log('typeorn forRootAsync');
+                        return getConfig().dbConnectionOptions;
+                    },
+                }),
+            ],
+        };
+    }
+
+    static forWorker(): DynamicModule {
+        const { dbConnectionOptions, workerOptions } = getConfig();
+        const connectionOptions = { ...dbConnectionOptions, name: 'worker' };
+        return {
+            module: ServiceModule,
+            imports: [
+                TypeOrmModule.forRootAsync({
+                    useFactory: () => {
+                        console.log('typeorm forRootAsync Worker');
+                        return { ...connectionOptions, keepConnectionAlive: true };
+                    },
+                }),
+            ],
+        };
+    }
 }

+ 1 - 0
packages/core/src/worker/constants.ts

@@ -0,0 +1 @@
+export const VENDURE_WORKER_CLIENT = Symbol('VENDURE_WORKER_CLIENT');

+ 18 - 0
packages/core/src/worker/worker.module.ts

@@ -0,0 +1,18 @@
+import { Module } from '@nestjs/common';
+
+import { ConfigModule } from '../config/config.module';
+import { PluginModule } from '../plugin/plugin.module';
+import { ServiceModule } from '../service/service.module';
+
+@Module({
+    imports: [
+        ConfigModule,
+        ServiceModule.forWorker(),
+        PluginModule.forWorker(),
+    ],
+})
+export class WorkerModule {
+    constructor() {
+        console.log('Worker module constructed!');
+    }
+}

+ 14 - 0
yarn.lock

@@ -1286,6 +1286,15 @@
   optionalDependencies:
     type-graphql "^0.17.3"
 
+"@nestjs/microservices@^6.3.1":
+  version "6.3.1"
+  resolved "https://registry.npmjs.org/@nestjs/microservices/-/microservices-6.3.1.tgz#b6d080afb61c1c8e50394eb73f5f40a5f606b09b"
+  integrity sha512-vujz9lczXBnkw8H7vY/7V6Je0zcCUvpbgwLNGa5Dm/+SzUKTdSk5qjGgt3TiczXZY2PSuOdsTY0qUFHYK7B5yA==
+  dependencies:
+    iterare "1.1.2"
+    json-socket "0.3.0"
+    optional "0.1.4"
+
 "@nestjs/platform-express@^6.3.1":
   version "6.3.1"
   resolved "https://registry.npmjs.org/@nestjs/platform-express/-/platform-express-6.3.1.tgz#8adb602a5bb9571b9d58646bd52141a26ea8ba3e"
@@ -6830,6 +6839,11 @@ json-schema@0.2.3:
   resolved "https://registry.npmjs.org/json-schema/-/json-schema-0.2.3.tgz#b480c892e59a2f05954ce727bd3f2a4e882f9e13"
   integrity sha1-tIDIkuWaLwWVTOcnvT8qTogvnhM=
 
+json-socket@0.3.0:
+  version "0.3.0"
+  resolved "https://registry.npmjs.org/json-socket/-/json-socket-0.3.0.tgz#f4b953c685bb8e8bd0b72438f5208d9a0799ae07"
+  integrity sha512-jc8ZbUnYIWdxERFWQKVgwSLkGSe+kyzvmYxwNaRgx/c8NNyuHes4UHnPM3LUrAFXUx1BhNJ94n1h/KCRlbvV0g==
+
 json-stable-stringify-without-jsonify@^1.0.1:
   version "1.0.1"
   resolved "https://registry.npmjs.org/json-stable-stringify-without-jsonify/-/json-stable-stringify-without-jsonify-1.0.1.tgz#9db7b59496ad3f3cfef30a75142d2d930ad72651"