Просмотр исходного кода

feat(core): Process all updates to the search index on worker thread

The main server thread only reads from the search index. All updates are now performed via a worker running on another thread.
Michael Bromley 6 лет назад
Родитель
Сommit
fe4064176b

+ 0 - 24
docs/content/docs/plugins/default-search-plugin.md

@@ -1,24 +0,0 @@
----
-title: "DefaultSearchPlugin"
----
-
-# DefaultSearchPlugin
-
-The DefaultSearchPlugin provides a full-text Product search based on the full-text searching capabilities of the underlying database.
-
-The DefaultSearchPlugin is bundled with the `@vendure/core` package. If you are not using an alternative search plugin, then make sure this one is used, otherwise you will not be able to search products via the [`search` query](/docs/graphql-api/shop/queries#search).
-
-```ts
-import { DefaultSearchPlugin } from '@vendure/core';
-
-const config: VendureConfig = {
-  // Add an instance of the plugin to the plugins array
-  plugins: [
-    new DefaultSearchPlugin(),
-  ],
-};
-```
-
-{{% alert "warning" %}}
-Note that the quality of the fulltext search capabilities varies depending on the underlying database being used. For example, the MySQL & Postgres implementations will typically yield better results than the SQLite implementation.
-{{% /alert %}}

+ 1 - 1
packages/core/e2e/default-search-plugin.e2e-spec.ts

@@ -45,7 +45,7 @@ describe('Default search plugin', () => {
                 customerCount: 1,
             },
             {
-                plugins: [new DefaultSearchPlugin()],
+                plugins: [new DefaultSearchPlugin({ runInForkedProcess: false })],
             },
         );
         await adminClient.init();

+ 1 - 0
packages/core/src/plugin/default-search-plugin/constants.ts

@@ -0,0 +1 @@
+export const SEARCH_PLUGIN_OPTIONS = Symbol('SEARCH_PLUGIN_OPTIONS');

+ 69 - 6
packages/core/src/plugin/default-search-plugin/default-search-plugin.ts

@@ -13,9 +13,10 @@ import { CollectionModificationEvent } from '../../event-bus/events/collection-m
 import { TaxRateModificationEvent } from '../../event-bus/events/tax-rate-modification-event';
 import { SearchService } from '../../service/services/search.service';
 
+import { SEARCH_PLUGIN_OPTIONS } from './constants';
 import { AdminFulltextSearchResolver, ShopFulltextSearchResolver } from './fulltext-search.resolver';
 import { FulltextSearchService } from './fulltext-search.service';
-import { SearchIndexService } from './search-index.service';
+import { SearchIndexService } from './indexer/search-index.service';
 import { SearchIndexItem } from './search-index-item.entity';
 
 export interface DefaultSearchReindexResponse extends SearchReindexResponse {
@@ -23,28 +24,82 @@ export interface DefaultSearchReindexResponse extends SearchReindexResponse {
     indexedItemCount: number;
 }
 
+/**
+ * @description
+ * Options for configuring the DefaultSearchPlugin.
+ *
+ * @docsCategory DefaultSearchPlugin
+ */
+export interface DefaultSearchPluginOptions {
+    /**
+     * @description
+     * By default, the DefaultSearchPlugin will spawn a background process which is responsible
+     * for updating the search index. By setting this option to `false`, indexing will be
+     * performed on the main server process instead. Usually this is undesirable as performance will
+     * be degraded during indexing, but the option is useful for certain debugging and testing scenarios.
+     * @default true
+     */
+    runInForkedProcess: boolean;
+}
+
+/**
+ * @description
+ * The DefaultSearchPlugin provides a full-text Product search based on the full-text searching capabilities of the
+ * underlying database.
+ *
+ * The DefaultSearchPlugin is bundled with the `@vendure/core` package. If you are not using an alternative search
+ * plugin, then make sure this one is used, otherwise you will not be able to search products via the [`search` query](/docs/graphql-api/shop/queries#search).
+ *
+ * @example
+ * ```ts
+ * import { DefaultSearchPlugin } from '@vendure/core';
+ *
+ * const config: VendureConfig = {
+ *   // Add an instance of the plugin to the plugins array
+ *   plugins: [
+ *     new DefaultSearchPlugin(),
+ *   ],
+ * };
+ * ```
+ *
+ * {{% alert "warning" %}}
+ * Note that the quality of the fulltext search capabilities varies depending on the underlying database being used. For example, the MySQL & Postgres implementations will typically yield better results than the SQLite implementation.
+ * {{% /alert %}}
+ *
+ * @docsCategory DefaultSearchPlugin
+ */
 export class DefaultSearchPlugin implements VendurePlugin {
+    private readonly options: DefaultSearchPluginOptions;
+
+    constructor(options?: DefaultSearchPluginOptions) {
+        const defaultOptions: DefaultSearchPluginOptions = {
+            runInForkedProcess: true,
+        };
+        this.options = { ...defaultOptions, ...options };
+    }
+
+    /** @internal */
     async onBootstrap(inject: <T>(type: Type<T>) => T): Promise<void> {
         const eventBus = inject(EventBus);
-        const fulltextSearchService = inject(FulltextSearchService);
         const searchIndexService = inject(SearchIndexService);
         eventBus.subscribe(CatalogModificationEvent, event => {
             if (event.entity instanceof Product || event.entity instanceof ProductVariant) {
-                return fulltextSearchService.updateProductOrVariant(event.ctx, event.entity);
+                return searchIndexService.updateProductOrVariant(event.ctx, event.entity);
             }
         });
         eventBus.subscribe(CollectionModificationEvent, event => {
-            return fulltextSearchService.updateVariantsById(event.ctx, event.productVariantIds);
+            return searchIndexService.updateVariantsById(event.ctx, event.productVariantIds);
         });
         eventBus.subscribe(TaxRateModificationEvent, event => {
             const defaultTaxZone = event.ctx.channel.defaultTaxZone;
             if (defaultTaxZone && idsAreEqual(defaultTaxZone.id, event.taxRate.zone.id)) {
-                return fulltextSearchService.reindex(event.ctx);
+                return searchIndexService.reindex(event.ctx).start();
             }
         });
         await searchIndexService.connect();
     }
 
+    /** @internal */
     extendAdminAPI(): APIExtensionDefinition {
         return {
             resolvers: [AdminFulltextSearchResolver],
@@ -57,6 +112,7 @@ export class DefaultSearchPlugin implements VendurePlugin {
         };
     }
 
+    /** @internal */
     extendShopAPI(): APIExtensionDefinition {
         return {
             resolvers: [ShopFulltextSearchResolver],
@@ -69,11 +125,18 @@ export class DefaultSearchPlugin implements VendurePlugin {
         };
     }
 
+    /** @internal */
     defineEntities(): Array<Type<any>> {
         return [SearchIndexItem];
     }
 
+    /** @internal */
     defineProviders(): Provider[] {
-        return [FulltextSearchService, SearchIndexService, { provide: SearchService, useClass: FulltextSearchService }];
+        return [
+            FulltextSearchService,
+            SearchIndexService,
+            { provide: SearchService, useClass: FulltextSearchService },
+            { provide: SEARCH_PLUGIN_OPTIONS, useFactory: () => this.options },
+        ];
     }
 }

+ 5 - 142
packages/core/src/plugin/default-search-plugin/fulltext-search.service.ts

@@ -1,25 +1,19 @@
 import { Injectable } from '@nestjs/common';
 import { InjectConnection } from '@nestjs/typeorm';
-import { JobInfo, LanguageCode, SearchInput, SearchResponse } from '@vendure/common/lib/generated-types';
+import { JobInfo, SearchInput, SearchResponse } from '@vendure/common/lib/generated-types';
 import { Omit } from '@vendure/common/lib/omit';
-import { ID } from '@vendure/common/lib/shared-types';
-import { unique } from '@vendure/common/lib/unique';
 import { Connection } from 'typeorm';
 
 import { RequestContext } from '../../api/common/request-context';
 import { InternalServerError } from '../../common/error/errors';
-import { Logger } from '../../config/logger/vendure-logger';
-import { FacetValue, Product, ProductVariant } from '../../entity';
+import { FacetValue } from '../../entity';
 import { EventBus } from '../../event-bus/event-bus';
-import { translateDeep } from '../../service/helpers/utils/translate-entity';
 import { FacetValueService } from '../../service/services/facet-value.service';
 import { JobService } from '../../service/services/job.service';
 import { ProductVariantService } from '../../service/services/product-variant.service';
 import { SearchService } from '../../service/services/search.service';
 
-import { AsyncQueue } from './async-queue';
-import { SearchIndexItem } from './search-index-item.entity';
-import { SearchIndexService } from './search-index.service';
+import { SearchIndexService } from './indexer/search-index.service';
 import { MysqlSearchStrategy } from './search-strategy/mysql-search-strategy';
 import { PostgresSearchStrategy } from './search-strategy/postgres-search-strategy';
 import { SearchStrategy } from './search-strategy/search-strategy';
@@ -31,20 +25,8 @@ import { SqliteSearchStrategy } from './search-strategy/sqlite-search-strategy';
  */
 @Injectable()
 export class FulltextSearchService implements SearchService {
-    private taskQueue = new AsyncQueue('search-service', 1);
     private searchStrategy: SearchStrategy;
     private readonly minTermLength = 2;
-    private readonly variantRelations = [
-        'product',
-        'product.featuredAsset',
-        'product.facetValues',
-        'product.facetValues.facet',
-        'featuredAsset',
-        'facetValues',
-        'facetValues.facet',
-        'collections',
-        'taxCategory',
-    ];
 
     constructor(
         @InjectConnection() private connection: Connection,
@@ -94,74 +76,11 @@ export class FulltextSearchService implements SearchService {
      * Rebuilds the full search index.
      */
     async reindex(ctx: RequestContext): Promise<JobInfo> {
-        const job = this.jobService.startJob('reindex', async reporter => {
-            return this.searchIndexService.reindex(ctx, reporter);
-        });
+        const job = this.searchIndexService.reindex(ctx);
+        job.start();
         return job;
     }
 
-    /**
-     * Updates the search index only for the affected entities.
-     */
-    async updateProductOrVariant(ctx: RequestContext, updatedEntity: Product | ProductVariant) {
-        let updatedVariants: ProductVariant[] = [];
-        let removedVariantIds: ID[] = [];
-        if (updatedEntity instanceof Product) {
-            const product = await this.connection.getRepository(Product).findOne(updatedEntity.id, {
-                relations: ['variants'],
-            });
-            if (product) {
-                if (product.deletedAt) {
-                    removedVariantIds = product.variants.map(v => v.id);
-                } else {
-                    updatedVariants = await this.connection
-                        .getRepository(ProductVariant)
-                        .findByIds(product.variants.map(v => v.id), {
-                            relations: this.variantRelations,
-                        });
-                    if (product.enabled === false) {
-                        updatedVariants.forEach(v => v.enabled = false);
-                    }
-                }
-            }
-        } else {
-            const variant = await this.connection.getRepository(ProductVariant).findOne(updatedEntity.id, {
-                relations: this.variantRelations,
-            });
-            if (variant) {
-                updatedVariants = [variant];
-            }
-        }
-        await this.taskQueue.push(async () => {
-            if (updatedVariants.length) {
-                await this.saveSearchIndexItems(ctx, updatedVariants);
-            }
-            if (removedVariantIds.length) {
-                await this.removeSearchIndexItems(ctx.languageCode, removedVariantIds);
-            }
-        });
-
-    }
-
-    async updateVariantsById(ctx: RequestContext, ids: ID[]) {
-        if (ids.length) {
-            const BATCH_SIZE = 100;
-            const batches = Math.ceil(ids.length / BATCH_SIZE);
-            for (let i = 0; i < batches; i++) {
-                const begin = i * BATCH_SIZE;
-                const end = begin + BATCH_SIZE;
-                Logger.verbose(`Updating ids from index ${begin} to ${end}`);
-                const batch = ids.slice(begin, end);
-                const updatedVariants = await this.connection.getRepository(ProductVariant).findByIds(batch, {
-                    relations: this.variantRelations,
-                });
-                this.taskQueue.push(async () => {
-                    await this.saveSearchIndexItems(ctx, updatedVariants);
-                });
-            }
-        }
-    }
-
     /**
      * Sets the SearchStrategy appropriate to th configured database type.
      */
@@ -182,60 +101,4 @@ export class FulltextSearchService implements SearchService {
                 throw new InternalServerError(`error.database-not-supported-by-default-search-plugin`);
         }
     }
-
-    /**
-     * Add or update items in the search index
-     */
-    private async saveSearchIndexItems(ctx: RequestContext, variants: ProductVariant[]) {
-        const items = variants
-            .map(v => this.productVariantService.applyChannelPriceAndTax(v, ctx))
-            .map(v => translateDeep(v, ctx.languageCode, ['product']))
-            .map(
-                v =>
-                    new SearchIndexItem({
-                        sku: v.sku,
-                        enabled: v.enabled,
-                        slug: v.product.slug,
-                        price: v.price,
-                        priceWithTax: v.priceWithTax,
-                        languageCode: ctx.languageCode,
-                        productVariantId: v.id,
-                        productId: v.product.id,
-                        productName: v.product.name,
-                        description: v.product.description,
-                        productVariantName: v.name,
-                        productPreview: v.product.featuredAsset ? v.product.featuredAsset.preview : '',
-                        productVariantPreview: v.featuredAsset ? v.featuredAsset.preview : '',
-                        facetIds: this.getFacetIds(v),
-                        facetValueIds: this.getFacetValueIds(v),
-                        collectionIds: v.collections.map(c => c.id.toString()),
-                    }),
-            );
-        return this.connection.getRepository(SearchIndexItem).save(items);
-    }
-
-    /**
-     * Remove items from the search index
-     */
-    private async removeSearchIndexItems(languageCode: LanguageCode, variantIds: ID[]) {
-        const compositeKeys = variantIds.map(id => ({
-            productVariantId: id,
-            languageCode,
-        })) as any[];
-        await this.connection.getRepository(SearchIndexItem).delete(compositeKeys);
-    }
-
-    private getFacetIds(variant: ProductVariant): string[] {
-        const facetIds = (fv: FacetValue) => fv.facet.id.toString();
-        const variantFacetIds = variant.facetValues.map(facetIds);
-        const productFacetIds = variant.product.facetValues.map(facetIds);
-        return unique([...variantFacetIds, ...productFacetIds]);
-    }
-
-    private getFacetValueIds(variant: ProductVariant): string[] {
-        const facetValueIds = (fv: FacetValue) => fv.id.toString();
-        const variantFacetValueIds = variant.facetValues.map(facetValueIds);
-        const productFacetValueIds = variant.product.facetValues.map(facetValueIds);
-        return unique([...variantFacetValueIds, ...productFacetValueIds]);
-    }
 }

+ 163 - 0
packages/core/src/plugin/default-search-plugin/indexer/index-builder.ts

@@ -0,0 +1,163 @@
+import { Connection, ConnectionOptions, createConnection, SelectQueryBuilder } from 'typeorm';
+import { FindOptionsUtils } from 'typeorm/find-options/FindOptionsUtils';
+
+import { ID, Type } from '../../../../../common/lib/shared-types';
+import { unique } from '../../../../../common/lib/unique';
+import { RequestContext } from '../../../api/common/request-context';
+import { FacetValue } from '../../../entity/facet-value/facet-value.entity';
+import { ProductVariant } from '../../../entity/product-variant/product-variant.entity';
+import { SearchIndexItem } from '../search-index-item.entity';
+
+import { CompletedMessage, ConnectedMessage, Message, MessageType, ReturnRawBatchMessage, SaveVariantsPayload, VariantsSavedMessage } from './ipc';
+
+export const BATCH_SIZE = 500;
+export const variantRelations = [
+    'product',
+    'product.featuredAsset',
+    'product.facetValues',
+    'product.facetValues.facet',
+    'featuredAsset',
+    'facetValues',
+    'facetValues.facet',
+    'collections',
+    'taxCategory',
+];
+
+export function getSearchIndexQueryBuilder(connection: Connection) {
+    const qb = connection.getRepository(ProductVariant).createQueryBuilder('variants');
+    FindOptionsUtils.applyFindManyOptionsOrConditionsToQueryBuilder(qb, {
+        relations: variantRelations,
+    });
+    FindOptionsUtils.joinEagerRelations(qb, qb.alias, connection.getMetadata(ProductVariant));
+    return qb;
+}
+
+/**
+ * This class is responsible for all updates to the search index.
+ */
+export class IndexBuilder {
+    private connection: Connection;
+    private indexQueryBuilder: SelectQueryBuilder<ProductVariant>;
+    private onMessageHandlers = new Set<(message: string) => void>();
+
+    /**
+     * When running in the main process, it should be constructed with the existing connection.
+     * Otherwise, the connection will be created in the .connect() method in response to an
+     * IPC message.
+     */
+    constructor(connection?: Connection) {
+        if (connection) {
+            this.connection = connection;
+            this.indexQueryBuilder = getSearchIndexQueryBuilder(this.connection);
+        }
+    }
+
+    processMessage(message: Message): Promise<Message | undefined> {
+        switch (message.type) {
+            case MessageType.CONNECTION_OPTIONS: {
+                return this.connect(message.value);
+            }
+            case MessageType.GET_RAW_BATCH: {
+                return this.getRawBatch(message.value.batchNumber);
+            }
+            case MessageType.GET_RAW_BATCH_BY_IDS: {
+                return this.getRawBatchByIds(message.value.ids);
+            }
+            case MessageType.SAVE_VARIANTS: {
+                return this.saveVariants(message.value);
+            }
+            default:
+                return Promise.resolve(undefined);
+        }
+    }
+
+    async processMessageAndEmitResult(message: Message) {
+        const result = await this.processMessage(message);
+        if (result) {
+            result.channelId = message.channelId;
+            this.onMessageHandlers.forEach(handler => {
+                handler(JSON.stringify(result));
+            });
+        }
+    }
+
+    addMessageListener<T extends Message>(handler: (message: string) => void) {
+        this.onMessageHandlers.add(handler);
+    }
+
+    removeMessageListener<T extends Message>(handler: (message: string) => void) {
+        this.onMessageHandlers.delete(handler);
+    }
+
+    private async connect(dbConnectionOptions: ConnectionOptions): Promise<ConnectedMessage> {
+        const {coreEntitiesMap} = await import('../../../entity/entities');
+        const coreEntities = Object.values(coreEntitiesMap) as Array<Type<any>>;
+        this.connection = await createConnection({...dbConnectionOptions, entities: [SearchIndexItem, ...coreEntities]});
+        this.indexQueryBuilder = getSearchIndexQueryBuilder(this.connection);
+        return new ConnectedMessage(this.connection.isConnected);
+    }
+
+    private async getRawBatchByIds(ids: ID[]): Promise<ReturnRawBatchMessage> {
+        const variants = await this.connection.getRepository(ProductVariant).findByIds(ids, {
+            relations: variantRelations,
+        });
+        return new ReturnRawBatchMessage({variants});
+    }
+
+    private async getRawBatch(batchNumber: string | number): Promise<ReturnRawBatchMessage> {
+        const i = Number.parseInt(batchNumber.toString(), 10);
+        const variants = await this.indexQueryBuilder
+            .where('variants__product.deletedAt IS NULL')
+            .take(BATCH_SIZE)
+            .skip(i * BATCH_SIZE)
+            .getMany();
+
+        return new ReturnRawBatchMessage({variants});
+    }
+
+    private async saveVariants(payload: SaveVariantsPayload): Promise<VariantsSavedMessage | CompletedMessage> {
+        const {variants, ctx, batch, total} = payload;
+        const requestContext = new RequestContext(ctx);
+
+        const items = variants.map((v: ProductVariant) =>
+            new SearchIndexItem({
+                sku: v.sku,
+                enabled: v.enabled,
+                slug: v.product.slug,
+                price: v.price,
+                priceWithTax: v.priceWithTax,
+                languageCode: requestContext.languageCode,
+                productVariantId: v.id,
+                productId: v.product.id,
+                productName: v.product.name,
+                description: v.product.description,
+                productVariantName: v.name,
+                productPreview: v.product.featuredAsset ? v.product.featuredAsset.preview : '',
+                productVariantPreview: v.featuredAsset ? v.featuredAsset.preview : '',
+                facetIds: this.getFacetIds(v),
+                facetValueIds: this.getFacetValueIds(v),
+                collectionIds: v.collections.map(c => c.id.toString()),
+            }),
+        );
+        await this.connection.getRepository(SearchIndexItem).save(items);
+        if (batch === total - 1) {
+            return new CompletedMessage(true);
+        } else {
+            return new VariantsSavedMessage({batchNumber: batch});
+        }
+    }
+
+    private getFacetIds(variant: ProductVariant): string[] {
+        const facetIds = (fv: FacetValue) => fv.facet.id.toString();
+        const variantFacetIds = variant.facetValues.map(facetIds);
+        const productFacetIds = variant.product.facetValues.map(facetIds);
+        return unique([...variantFacetIds, ...productFacetIds]);
+    }
+
+    private getFacetValueIds(variant: ProductVariant): string[] {
+        const facetValueIds = (fv: FacetValue) => fv.id.toString();
+        const variantFacetValueIds = variant.facetValues.map(facetValueIds);
+        const productFacetValueIds = variant.product.facetValues.map(facetValueIds);
+        return unique([...variantFacetValueIds, ...productFacetValueIds]);
+    }
+}

+ 154 - 0
packages/core/src/plugin/default-search-plugin/indexer/ipc.ts

@@ -0,0 +1,154 @@
+import { ID } from '@vendure/common/lib/shared-types';
+import { ChildProcess } from 'child_process';
+import { ConnectionOptions } from 'typeorm';
+
+import { RequestContext } from '../../../api/common/request-context';
+import { ProductVariant } from '../../../entity/product-variant/product-variant.entity';
+
+import { IndexBuilder } from './index-builder';
+
+export enum MessageType {
+    CONNECTION_OPTIONS,
+    CONNECTED,
+    GET_RAW_BATCH,
+    GET_RAW_BATCH_BY_IDS,
+    RETURN_RAW_BATCH,
+    SAVE_VARIANTS,
+    VARIANTS_SAVED,
+    COMPLETED,
+}
+
+export interface SaveVariantsPayload {
+    variants: ProductVariant[];
+    ctx: RequestContext;
+    batch: number;
+    total: number;
+}
+
+export interface IPCMessage {
+    type: MessageType;
+    value: any;
+    channelId: string;
+}
+
+export class ConnectionOptionsMessage implements IPCMessage {
+    readonly type = MessageType.CONNECTION_OPTIONS;
+    channelId: string;
+    constructor(public value: ConnectionOptions) {}
+}
+
+export class ConnectedMessage implements IPCMessage {
+    readonly type = MessageType.CONNECTED;
+    channelId: string;
+    constructor(public value: boolean) {}
+}
+
+export class GetRawBatchMessage implements IPCMessage {
+    readonly type = MessageType.GET_RAW_BATCH;
+    channelId: string;
+    constructor(public value: { batchNumber: number; }) {}
+}
+
+export class GetRawBatchByIdsMessage implements IPCMessage {
+    readonly type = MessageType.GET_RAW_BATCH_BY_IDS;
+    channelId: string;
+    constructor(public value: { ids: ID[]; }) {}
+}
+
+export class ReturnRawBatchMessage implements IPCMessage {
+    readonly type = MessageType.RETURN_RAW_BATCH;
+    channelId: string;
+    constructor(public value: { variants: ProductVariant[]; }) {}
+}
+
+export class SaveVariantsMessage implements IPCMessage {
+    readonly type = MessageType.SAVE_VARIANTS;
+    channelId: string;
+    constructor(public value: SaveVariantsPayload) {}
+}
+
+export class VariantsSavedMessage implements IPCMessage {
+    readonly type = MessageType.VARIANTS_SAVED;
+    channelId: string;
+    constructor(public value: { batchNumber: number; }) {}
+}
+
+export class CompletedMessage implements IPCMessage {
+    readonly type = MessageType.COMPLETED;
+    channelId: string;
+    constructor(public value: boolean) {}
+}
+
+export type Message = ConnectionOptionsMessage |
+    ConnectedMessage |
+    GetRawBatchMessage |
+    GetRawBatchByIdsMessage |
+    ReturnRawBatchMessage |
+    SaveVariantsMessage |
+    VariantsSavedMessage |
+    CompletedMessage;
+
+export type MessageOfType<T extends MessageType> = Extract<Message, { type: T }>;
+
+export function sendIPCMessage(target: NodeJS.Process | ChildProcess, message: Message) {
+    // tslint:disable-next-line:no-non-null-assertion
+    target.send!(JSON.stringify(message));
+}
+
+/**
+ * An IpcChannel allows safe communication between main thread and worker. It achieves
+ * this by adding a unique ID to each outgoing message, which the worker then adds
+ * to any responses.
+ *
+ * If the `target` is an instance of IndexBuilder running on the main process (not in
+ * a worker thread), then the channel interacts directly with it, whilst keeping the
+ * differences abstracted away from the consuming code.
+ */
+export class IpcChannel {
+    private readonly channelId = Math.random().toString(32);
+    private handlers: Array<(m: string) => void> = [];
+    constructor(private readonly target: NodeJS.Process | ChildProcess | IndexBuilder) {}
+
+    /**
+     * Send a message to the worker process.
+     */
+    send(message: Message) {
+        message.channelId = this.channelId;
+        if (this.target instanceof IndexBuilder) {
+            this.target.processMessageAndEmitResult(message);
+        } else {
+            sendIPCMessage(this.target, message);
+        }
+    }
+
+    /**
+     * Subscribes to the given IPC message which is sent from the worker in response to a message
+     * send with the `send()` method.
+     */
+    subscribe<T extends MessageType>(messageType: T, callback: (message: MessageOfType<T>) => void): void {
+        const handler = (messageString: string) => {
+            const message = JSON.parse(messageString) as Message;
+            if (message.type === messageType && message.channelId === this.channelId) {
+                callback(message as MessageOfType<T>);
+            }
+        };
+        if (this.target instanceof IndexBuilder) {
+            this.target.addMessageListener(handler);
+        } else {
+            this.target.on('message', handler);
+        }
+        this.handlers.push(handler);
+    }
+
+    /**
+     * Clean up all event listeners created by subscriptions.
+     */
+    close() {
+        const target = this.target;
+        if (target instanceof IndexBuilder) {
+            this.handlers.forEach(handler => target.removeMessageListener(handler));
+        } else {
+            this.handlers.forEach(handler => target.off('message', handler));
+        }
+    }
+}

+ 16 - 0
packages/core/src/plugin/default-search-plugin/indexer/search-index-worker.ts

@@ -0,0 +1,16 @@
+/* tslint:disable:no-non-null-assertion no-console */
+import { IndexBuilder } from './index-builder';
+import { ConnectionOptionsMessage, GetRawBatchByIdsMessage, GetRawBatchMessage, SaveVariantsMessage, sendIPCMessage } from './ipc';
+
+export type IncomingMessage = ConnectionOptionsMessage | GetRawBatchMessage | GetRawBatchByIdsMessage | SaveVariantsMessage;
+
+const indexBuilder = new IndexBuilder();
+
+process.on('message', async (messageString) => {
+    const message: IncomingMessage = JSON.parse(messageString);
+    const result = await indexBuilder.processMessage(message);
+    if (result) {
+        result.channelId = message.channelId;
+        sendIPCMessage(process, result);
+    }
+});

+ 312 - 0
packages/core/src/plugin/default-search-plugin/indexer/search-index.service.ts

@@ -0,0 +1,312 @@
+import { Inject, Injectable } from '@nestjs/common';
+import { InjectConnection } from '@nestjs/typeorm';
+import { LanguageCode } from '@vendure/common/lib/generated-types';
+import { pick } from '@vendure/common/lib/pick';
+import { ID } from '@vendure/common/lib/shared-types';
+import { ChildProcess, fork } from 'child_process';
+import fs from 'fs-extra';
+import path from 'path';
+import { Connection } from 'typeorm';
+
+import { RequestContext } from '../../../api/common/request-context';
+import { ConfigService } from '../../../config/config.service';
+import { Logger } from '../../../config/logger/vendure-logger';
+import { ProductVariant } from '../../../entity/product-variant/product-variant.entity';
+import { Product } from '../../../entity/product/product.entity';
+import { Job } from '../../../service/helpers/job-manager/job';
+import { translateDeep } from '../../../service/helpers/utils/translate-entity';
+import { JobService } from '../../../service/services/job.service';
+import { ProductVariantService } from '../../../service/services/product-variant.service';
+import { SEARCH_PLUGIN_OPTIONS } from '../constants';
+import { DefaultSearchPluginOptions } from '../default-search-plugin';
+import { SearchIndexItem } from '../search-index-item.entity';
+
+import { BATCH_SIZE, getSearchIndexQueryBuilder, IndexBuilder, variantRelations } from './index-builder';
+import {
+    CompletedMessage,
+    ConnectedMessage,
+    ConnectionOptionsMessage,
+    GetRawBatchByIdsMessage,
+    GetRawBatchMessage,
+    IpcChannel,
+    MessageType,
+    ReturnRawBatchMessage,
+    SaveVariantsMessage,
+    VariantsSavedMessage,
+} from './ipc';
+
+export type IncomingMessage = ConnectedMessage | ReturnRawBatchMessage | VariantsSavedMessage | CompletedMessage;
+const loggerCtx = 'DefaultSearchPlugin';
+
+/**
+ * This service is responsible for all writes to the search index. It works together with the SearchIndexWorker
+ * process to perform these often resource-intensive tasks in another thread, which keeps the main
+ * server thread responsive.
+ */
+@Injectable()
+export class SearchIndexService {
+    private workerProcess: ChildProcess | IndexBuilder;
+    private restartAttempts = 0;
+
+    constructor(@InjectConnection() private connection: Connection,
+                @Inject(SEARCH_PLUGIN_OPTIONS) private options: DefaultSearchPluginOptions,
+                private productVariantService: ProductVariantService,
+                private jobService: JobService,
+                private configService: ConfigService) {}
+
+    /**
+     * Creates the search index worker process and has it connect to the database.
+     */
+    async connect() {
+        if (this.options.runInForkedProcess) {
+            try {
+                const workerProcess = this.getChildProcess(path.join(__dirname, 'search-index-worker.ts'));
+                Logger.verbose(`IndexBuilder running as forked process`, loggerCtx);
+                workerProcess.on('error', err => {
+                    Logger.error(`IndexBuilder worker error: ` + err.message, loggerCtx);
+                });
+                workerProcess.on('close', () => {
+                    this.restartAttempts++;
+                    Logger.error(`IndexBuilder worker process died!`, loggerCtx);
+                    if (this.restartAttempts <= 10) {
+                        Logger.error(`Attempting to restart (${this.restartAttempts})...`, loggerCtx);
+                        this.connect();
+                    } else {
+                        Logger.error(`Too many failed restart attempts. Sorry!`);
+                    }
+                });
+                await this.establishConnection(workerProcess);
+                this.workerProcess = workerProcess;
+            } catch (e) {
+                Logger.error(e);
+            }
+
+        } else {
+            this.workerProcess = new IndexBuilder(this.connection);
+            Logger.verbose(`IndexBuilder running in main process`, loggerCtx);
+        }
+    }
+
+    reindex(ctx: RequestContext): Job {
+        return this.jobService.createJob({
+            name: 'reindex',
+            singleInstance: true,
+            work: async reporter => {
+                const timeStart = Date.now();
+                const qb = getSearchIndexQueryBuilder(this.connection);
+                const count = await qb.where('variants__product.deletedAt IS NULL').getCount();
+                Logger.verbose(`Reindexing ${count} variants`, loggerCtx);
+                const batches = Math.ceil(count / BATCH_SIZE);
+
+                await this.connection.getRepository(SearchIndexItem).delete({ languageCode: ctx.languageCode });
+                Logger.verbose('Deleted existing index items', loggerCtx);
+
+                return new Promise(async (resolve, reject) => {
+                    const ipcChannel = new IpcChannel(this.workerProcess);
+                    ipcChannel.subscribe(MessageType.COMPLETED, message => {
+                        Logger.verbose(`Reindexing completed in ${Date.now() - timeStart}ms`, loggerCtx);
+                        ipcChannel.close();
+                        resolve({
+                            success: true,
+                            indexedItemCount: count,
+                            timeTaken: Date.now() - timeStart,
+                        });
+                    });
+                    ipcChannel.subscribe(MessageType.VARIANTS_SAVED, message => {
+                        reporter.setProgress(Math.ceil(((message.value.batchNumber + 1) / batches) * 100));
+                        Logger.verbose(`Completed batch ${message.value.batchNumber + 1} of ${batches}`, loggerCtx);
+                    });
+
+                    for (let i = 0; i < batches; i++) {
+                        Logger.verbose(`Processing batch ${i + 1} of ${batches}`, loggerCtx);
+
+                        const variants = await this.getBatch(this.workerProcess, i);
+                        const hydratedVariants = this.hydrateVariants(ctx, variants);
+
+                        ipcChannel.send(new SaveVariantsMessage({
+                            variants: hydratedVariants,
+                            ctx,
+                            batch: i,
+                            total: batches,
+                        }));
+                    }
+                });
+            },
+        });
+    }
+
+    /**
+     * Updates the search index only for the affected entities.
+     */
+    async updateProductOrVariant(ctx: RequestContext, updatedEntity: Product | ProductVariant) {
+        let updatedVariants: ProductVariant[] = [];
+        let removedVariantIds: ID[] = [];
+        if (updatedEntity instanceof Product) {
+            const product = await this.connection.getRepository(Product).findOne(updatedEntity.id, {
+                relations: ['variants'],
+            });
+            if (product) {
+                if (product.deletedAt) {
+                    removedVariantIds = product.variants.map(v => v.id);
+                } else {
+                    updatedVariants = await this.connection
+                        .getRepository(ProductVariant)
+                        .findByIds(product.variants.map(v => v.id), {
+                            relations: variantRelations,
+                        });
+                    if (product.enabled === false) {
+                        updatedVariants.forEach(v => v.enabled = false);
+                    }
+                }
+            }
+        } else {
+            const variant = await this.connection.getRepository(ProductVariant).findOne(updatedEntity.id, {
+                relations: variantRelations,
+            });
+            if (variant) {
+                updatedVariants = [variant];
+            }
+        }
+
+        if (updatedVariants.length) {
+            await this.saveSearchIndexItems(ctx, updatedVariants);
+        }
+        if (removedVariantIds.length) {
+            await this.removeSearchIndexItems(ctx.languageCode, removedVariantIds);
+        }
+    }
+
+    async updateVariantsById(ctx: RequestContext, ids: ID[]) {
+        return new Promise(async resolve => {
+            if (ids.length) {
+                const ipcChannel = new IpcChannel(this.workerProcess);
+                const batches = Math.ceil(ids.length / BATCH_SIZE);
+                Logger.verbose(`Updating ${ids.length} variants...`);
+
+                ipcChannel.subscribe(MessageType.COMPLETED, message => {
+                    Logger.verbose(`Completed updating variants`);
+                    ipcChannel.close();
+                    resolve();
+                });
+
+                for (let i = 0; i < batches; i++) {
+                    const begin = i * BATCH_SIZE;
+                    const end = begin + BATCH_SIZE;
+                    Logger.verbose(`Updating ids from index ${begin} to ${end}`);
+                    const batchIds = ids.slice(begin, end);
+                    const batch = await this.getBatchByIds(this.workerProcess, batchIds);
+                    const variants = this.hydrateVariants(ctx, batch);
+
+                    ipcChannel.send(new SaveVariantsMessage({ variants, ctx, batch: i, total: batches }));
+                }
+            } else {
+                resolve();
+            }
+        });
+    }
+
+    /**
+     * Add or update items in the search index
+     */
+    private async saveSearchIndexItems(ctx: RequestContext, variants: ProductVariant[]) {
+        const items = this.hydrateVariants(ctx, variants);
+        Logger.verbose(`Updating search index for ${variants.length} variants`, loggerCtx);
+        return new Promise(resolve => {
+            const ipcChannel = new IpcChannel(this.workerProcess);
+            ipcChannel.subscribe(MessageType.COMPLETED, message => {
+                Logger.verbose(`Done!`, loggerCtx);
+                ipcChannel.close();
+                resolve();
+            });
+            ipcChannel.send(new SaveVariantsMessage({ variants: items, ctx, batch: 0, total: 1 }));
+        });
+    }
+
+    /**
+     * Remove items from the search index
+     */
+    private async removeSearchIndexItems(languageCode: LanguageCode, variantIds: ID[]) {
+        const compositeKeys = variantIds.map(id => ({
+            productVariantId: id,
+            languageCode,
+        })) as any[];
+        await this.connection.getRepository(SearchIndexItem).delete(compositeKeys);
+    }
+
+    /**
+     * Given an array of ProductVariants, this method applies the correct taxes and translations.
+     */
+    private hydrateVariants(ctx: RequestContext, variants: ProductVariant[]): ProductVariant[] {
+        return variants
+            .map(v => this.productVariantService.applyChannelPriceAndTax(v, ctx))
+            .map(v => translateDeep(v, ctx.languageCode, ['product']));
+    }
+
+    /**
+     * Forks a child process based on the given filename. The filename can be a JS or TS file, as this method will attempt to
+     * use either (attempts JS first).
+     */
+    private getChildProcess(filename: string): ChildProcess {
+        const ext = path.extname(filename);
+        const fileWithoutExt = filename.replace(new RegExp(`${ext}$`), '');
+        let error: any;
+        try {
+            const jsFile = fileWithoutExt + '.js';
+            if (fs.existsSync(jsFile)) {
+                return fork(jsFile, [], { execArgv: [] });
+            }
+        } catch (e) {
+            // ignore and try ts
+            error = e;
+        }
+        try {
+            const tsFile = fileWithoutExt + '.ts';
+            if (fs.existsSync(tsFile)) {
+                // Fork the TS file using ts-node. This is useful when running in dev mode or
+                // for e2e tests.
+                return fork(tsFile, [], { execArgv: ['-r', 'ts-node/register'] });
+            }
+        } catch (e) {
+            // ignore and thow at the end.
+            error = e;
+        }
+        throw error;
+    }
+
+    private establishConnection(child: ChildProcess): Promise<boolean> {
+        const connectionOptions = pick(this.configService.dbConnectionOptions as any,
+            ['type', 'name', 'database', 'host', 'port', 'username', 'password']);
+        return new Promise(resolve => {
+            const ipcChannel = new IpcChannel(child);
+            ipcChannel.subscribe(MessageType.CONNECTED, message => {
+                Logger.verbose(`IndexBuilder connection result: ${message.value}`, loggerCtx);
+                ipcChannel.close();
+                resolve(message.value);
+            });
+            ipcChannel.send(new ConnectionOptionsMessage(connectionOptions));
+        });
+    }
+
+    private getBatch(child: ChildProcess | IndexBuilder, batch: number): Promise<ProductVariant[]> {
+        return new Promise(resolve => {
+            const ipcChannel = new IpcChannel(child);
+            ipcChannel.subscribe(MessageType.RETURN_RAW_BATCH, message => {
+                ipcChannel.close();
+                resolve(message.value.variants);
+            });
+            ipcChannel.send(new GetRawBatchMessage({ batchNumber: batch }));
+        });
+    }
+
+    private getBatchByIds(child: ChildProcess | IndexBuilder, ids: ID[]): Promise<ProductVariant[]> {
+        return new Promise(resolve => {
+            const ipcChannel = new IpcChannel(child);
+            ipcChannel.subscribe(MessageType.RETURN_RAW_BATCH, message => {
+                ipcChannel.close();
+                resolve(message.value.variants);
+            });
+            ipcChannel.send(new GetRawBatchByIdsMessage({ ids }));
+        });
+    }
+
+}

+ 0 - 76
packages/core/src/plugin/default-search-plugin/ipc.ts

@@ -1,76 +0,0 @@
-import { ChildProcess } from 'child_process';
-import { ConnectionOptions } from 'typeorm';
-
-import { RequestContext } from '../../api/common/request-context';
-import { ProductVariant } from '../../entity/product-variant/product-variant.entity';
-
-export enum MessageType {
-    CONNECTION_OPTIONS,
-    CONNECTED,
-    GET_RAW_BATCH,
-    RETURN_RAW_BATCH,
-    SAVE_VARIANTS,
-    VARIANTS_SAVED,
-    COMPLETED,
-}
-
-export interface SaveVariantsPayload {
-    variants: ProductVariant[];
-    ctx: RequestContext;
-    batch: number;
-    total: number;
-}
-
-export interface IPCMessage {
-    type: MessageType;
-}
-
-export class ConnectionOptionsMessage implements IPCMessage {
-    readonly type = MessageType.CONNECTION_OPTIONS;
-    constructor(public value: ConnectionOptions) {}
-}
-
-export class ConnectedMessage implements IPCMessage {
-    readonly type = MessageType.CONNECTED;
-    constructor(public value: boolean) {}
-}
-
-export class GetRawBatchMessage implements IPCMessage {
-    readonly type = MessageType.GET_RAW_BATCH;
-    constructor(public value: { batchNumber: number; }) {}
-}
-
-export class ReturnRawBatchMessage implements IPCMessage {
-    readonly type = MessageType.RETURN_RAW_BATCH;
-    constructor(public value: { variants: ProductVariant[]; }) {}
-}
-
-export class SaveVariantsMessage implements IPCMessage {
-    readonly type = MessageType.SAVE_VARIANTS;
-    constructor(public value: SaveVariantsPayload) {}
-}
-
-export class VariantsSavedMessage implements IPCMessage {
-    readonly type = MessageType.VARIANTS_SAVED;
-    constructor(public value: { batchNumber: number; }) {}
-}
-
-export class CompletedMessage implements IPCMessage {
-    readonly type = MessageType.COMPLETED;
-    constructor(public value: boolean) {}
-}
-
-export type Message = ConnectionOptionsMessage |
-    ConnectedMessage |
-    GetRawBatchMessage |
-    ReturnRawBatchMessage |
-    SaveVariantsMessage |
-    VariantsSavedMessage |
-    CompletedMessage;
-
-export type MessageOfType<T extends MessageType> = Extract<Message, { type: T }>;
-
-export function sendIPCMessage(target: NodeJS.Process | ChildProcess, message: Message) {
-    // tslint:disable-next-line:no-non-null-assertion
-    target.send!(JSON.stringify({ type: message.type, value: message.value }));
-}

+ 0 - 133
packages/core/src/plugin/default-search-plugin/search-index-worker.ts

@@ -1,133 +0,0 @@
-/* tslint:disable:no-non-null-assertion no-console */
-import { Type } from '@vendure/common/lib/shared-types';
-import { unique } from '@vendure/common/lib/unique';
-import { Connection, ConnectionOptions, createConnection, SelectQueryBuilder } from 'typeorm';
-import { FindOptionsUtils } from 'typeorm/find-options/FindOptionsUtils';
-
-import { RequestContext } from '../../api/common/request-context';
-import { FacetValue } from '../../entity/facet-value/facet-value.entity';
-import { ProductVariant } from '../../entity/product-variant/product-variant.entity';
-
-import {
-    CompletedMessage,
-    ConnectedMessage,
-    ConnectionOptionsMessage,
-    GetRawBatchMessage,
-    MessageType,
-    ReturnRawBatchMessage,
-    SaveVariantsMessage,
-    SaveVariantsPayload,
-    sendIPCMessage,
-    VariantsSavedMessage,
-} from './ipc';
-import { SearchIndexItem } from './search-index-item.entity';
-
-export const BATCH_SIZE = 100;
-export const variantRelations = [
-    'product',
-    'product.featuredAsset',
-    'product.facetValues',
-    'product.facetValues.facet',
-    'featuredAsset',
-    'facetValues',
-    'facetValues.facet',
-    'collections',
-    'taxCategory',
-];
-
-export type IncomingMessage = ConnectionOptionsMessage | GetRawBatchMessage | SaveVariantsMessage;
-
-export class SearchIndexWorker {
-
-    private connection: Connection;
-    private indexQueryBuilder: SelectQueryBuilder<ProductVariant>;
-
-    async connect(dbConnectionOptions: ConnectionOptions) {
-        const { coreEntitiesMap } = await import('../../entity/entities');
-        const coreEntities = Object.values(coreEntitiesMap) as Array<Type<any>>;
-        this.connection = await createConnection({ ...dbConnectionOptions, entities: [SearchIndexItem, ...coreEntities] });
-
-        this.indexQueryBuilder = await this.connection.getRepository(ProductVariant).createQueryBuilder('variants');
-        FindOptionsUtils.applyFindManyOptionsOrConditionsToQueryBuilder(this.indexQueryBuilder, {
-            relations: variantRelations,
-        });
-        FindOptionsUtils.joinEagerRelations(this.indexQueryBuilder, this.indexQueryBuilder.alias, this.connection.getMetadata(ProductVariant));
-
-        sendIPCMessage(process, new ConnectedMessage(this.connection.isConnected));
-    }
-
-    async getRawBatch(batchNumber: string | number) {
-        const i = Number.parseInt(batchNumber.toString(), 10);
-        const variants = await this.indexQueryBuilder
-            .where('variants__product.deletedAt IS NULL')
-            .take(BATCH_SIZE)
-            .skip(i * BATCH_SIZE)
-            .getMany();
-
-        sendIPCMessage(process, new ReturnRawBatchMessage({variants}));
-    }
-
-    async saveVariants(payload: SaveVariantsPayload) {
-        const { variants, ctx, batch, total } = payload;
-        const requestContext = new RequestContext(ctx);
-
-        const items = variants.map((v: ProductVariant) =>
-            new SearchIndexItem({
-                sku: v.sku,
-                enabled: v.enabled,
-                slug: v.product.slug,
-                price: v.price,
-                priceWithTax: v.priceWithTax,
-                languageCode: requestContext.languageCode,
-                productVariantId: v.id,
-                productId: v.product.id,
-                productName: v.product.name,
-                description: v.product.description,
-                productVariantName: v.name,
-                productPreview: v.product.featuredAsset ? v.product.featuredAsset.preview : '',
-                productVariantPreview: v.featuredAsset ? v.featuredAsset.preview : '',
-                facetIds: this.getFacetIds(v),
-                facetValueIds: this.getFacetValueIds(v),
-                collectionIds: v.collections.map(c => c.id.toString()),
-            }),
-        );
-        await this.connection.getRepository(SearchIndexItem).save(items);
-        sendIPCMessage(process, new VariantsSavedMessage({batchNumber: batch}));
-        if (batch === total - 1) {
-            sendIPCMessage(process, new CompletedMessage(true));
-        }
-    }
-
-    private getFacetIds(variant: ProductVariant): string[] {
-        const facetIds = (fv: FacetValue) => fv.facet.id.toString();
-        const variantFacetIds = variant.facetValues.map(facetIds);
-        const productFacetIds = variant.product.facetValues.map(facetIds);
-        return unique([...variantFacetIds, ...productFacetIds]);
-    }
-
-    private getFacetValueIds(variant: ProductVariant): string[] {
-        const facetValueIds = (fv: FacetValue) => fv.id.toString();
-        const variantFacetValueIds = variant.facetValues.map(facetValueIds);
-        const productFacetValueIds = variant.product.facetValues.map(facetValueIds);
-        return unique([...variantFacetValueIds, ...productFacetValueIds]);
-    }
-}
-
-const worker = new SearchIndexWorker();
-
-process.on('message', (messageString) => {
-    const message: IncomingMessage = JSON.parse(messageString);
-    switch (message.type) {
-        case MessageType.CONNECTION_OPTIONS:
-            worker.connect(message.value);
-            break;
-        case MessageType.GET_RAW_BATCH:
-            worker.getRawBatch(message.value.batchNumber);
-            break;
-        case MessageType.SAVE_VARIANTS:
-            worker.saveVariants(message.value);
-            break;
-        default:
-            // ignore
-    }
-});

+ 0 - 173
packages/core/src/plugin/default-search-plugin/search-index.service.ts

@@ -1,173 +0,0 @@
-import { Injectable } from '@nestjs/common';
-import { InjectConnection } from '@nestjs/typeorm';
-import { pick } from '@vendure/common/lib/pick';
-import { ChildProcess, fork } from 'child_process';
-import fs from 'fs-extra';
-import path from 'path';
-import { Connection } from 'typeorm';
-import { FindOptionsUtils } from 'typeorm/find-options/FindOptionsUtils';
-
-import { RequestContext } from '../../api/common/request-context';
-import { ConfigService } from '../../config/config.service';
-import { Logger } from '../../config/logger/vendure-logger';
-import { ProductVariant } from '../../entity/product-variant/product-variant.entity';
-import { JobReporter } from '../../service/helpers/job-manager/job-manager';
-import { translateDeep } from '../../service/helpers/utils/translate-entity';
-import { ProductVariantService } from '../../service/services/product-variant.service';
-
-import {
-    CompletedMessage,
-    ConnectedMessage,
-    ConnectionOptionsMessage,
-    GetRawBatchMessage,
-    MessageOfType,
-    MessageType,
-    ReturnRawBatchMessage,
-    SaveVariantsMessage,
-    sendIPCMessage,
-    VariantsSavedMessage,
-} from './ipc';
-import { SearchIndexItem } from './search-index-item.entity';
-import { BATCH_SIZE, variantRelations } from './search-index-worker';
-
-export type IncomingMessage = ConnectedMessage | ReturnRawBatchMessage | VariantsSavedMessage | CompletedMessage;
-
-@Injectable()
-export class SearchIndexService {
-    private workerProcess: ChildProcess;
-
-    constructor(@InjectConnection() private connection: Connection,
-                private productVariantService: ProductVariantService,
-                private configService: ConfigService) {}
-
-    /**
-     * Creates the search index worker process and has it connect to the database.
-     */
-    async connect() {
-        try {
-            this.workerProcess = this.getChildProcess(path.join(__dirname, 'search-index-worker.ts'));
-        } catch (e) {
-            Logger.error(e);
-        }
-        Logger.verbose(`Created search index worker process`, 'DefaultSearchPlugin');
-        this.workerProcess.on('error', err => `Search index worker error: ` + err.message);
-        await this.establishConnection(this.workerProcess);
-    }
-
-    async reindex(ctx: RequestContext, reporter: JobReporter) {
-        const timeStart = Date.now();
-        Logger.verbose('Reindexing search index...', 'DefaultSearchPlugin');
-        const qb = await this.connection.getRepository(ProductVariant).createQueryBuilder('variants');
-        FindOptionsUtils.applyFindManyOptionsOrConditionsToQueryBuilder(qb, {
-            relations: variantRelations,
-        });
-        FindOptionsUtils.joinEagerRelations(qb, qb.alias, this.connection.getMetadata(ProductVariant));
-        const count = await qb.where('variants__product.deletedAt IS NULL').getCount();
-        Logger.verbose(`Getting ${count} variants`, 'DefaultSearchPlugin');
-        const batches = Math.ceil(count / BATCH_SIZE);
-
-        Logger.verbose('Deleting existing index items...', 'DefaultSearchPlugin');
-        await this.connection.getRepository(SearchIndexItem).delete({ languageCode: ctx.languageCode });
-        Logger.verbose('Deleted!', 'DefaultSearchPlugin');
-
-        return new Promise(async (resolve, reject) => {
-            this.subscribe(MessageType.COMPLETED, (message, unsubscribe) => {
-                Logger.verbose(`Reindexing completed in ${Date.now() - timeStart}ms`, 'DefaultSearchPlugin');
-                unsubscribe();
-                unsubscribeProgress();
-                resolve({
-                    success: true,
-                    indexedItemCount: count,
-                    timeTaken: Date.now() - timeStart,
-                });
-            });
-            const unsubscribeProgress = this.subscribe(MessageType.VARIANTS_SAVED, (message, unsubscribe) => {
-                reporter.setProgress(Math.ceil(((message.value.batchNumber + 1) / batches) * 100));
-                Logger.verbose(`Completed batch ${message.value.batchNumber + 1} of ${batches}`, 'DefaultSearchPlugin');
-            });
-
-            for (let i = 0; i < batches; i++) {
-                Logger.verbose(`Processing batch ${i + 1} of ${batches}`, 'DefaultSearchPlugin');
-
-                const variants = await this.getBatch(this.workerProcess, i);
-                const items = variants
-                    .map((v: any) => this.productVariantService.applyChannelPriceAndTax(v, ctx))
-                    .map((v: any) => translateDeep(v, ctx.languageCode, ['product']));
-
-                sendIPCMessage(this.workerProcess, new SaveVariantsMessage({ variants: items, ctx, batch: i, total: batches }));
-            }
-        });
-    }
-
-    /**
-     * Forks a child process based on the given filename. The filename can be a JS or TS file, as this method will attempt to
-     * use either (attempts JS first).
-     */
-    private getChildProcess(filename: string): ChildProcess {
-        const ext = path.extname(filename);
-        const fileWithoutExt = filename.replace(new RegExp(`${ext}$`), '');
-        let error: any;
-        try {
-            const jsFile = fileWithoutExt + '.js';
-            if (fs.existsSync(jsFile)) {
-                return fork(jsFile, [], { execArgv: [] });
-            }
-        } catch (e) {
-            // ignore and try ts
-            error = e;
-        }
-        try {
-            const tsFile = fileWithoutExt + '.ts';
-            if (fs.existsSync(tsFile)) {
-                // Fork the TS file using ts-node. This is useful when running in dev mode or
-                // for e2e tests.
-                return fork(tsFile, [], { execArgv: ['-r', 'ts-node/register'] });
-            }
-        } catch (e) {
-            // ignore and thow at the end.
-            error = e;
-        }
-        throw error;
-    }
-
-    establishConnection(child: ChildProcess): Promise<boolean> {
-        const connectionOptions = pick(this.configService.dbConnectionOptions as any,
-            ['type', 'name', 'database', 'host', 'port', 'username', 'password']);
-        return new Promise(resolve => {
-            sendIPCMessage(child, new ConnectionOptionsMessage(connectionOptions));
-            this.subscribe(MessageType.CONNECTED, (message, unsubscribe) => {
-                Logger.verbose(`Connection result: ${message.value}`, 'DefaultSearchPlugin');
-                unsubscribe();
-                resolve(message.value);
-            });
-        });
-    }
-
-    getBatch(child: ChildProcess, batch: number): Promise<ProductVariant[]> {
-        return new Promise(resolve => {
-            sendIPCMessage(child, new GetRawBatchMessage({ batchNumber: batch }));
-            this.subscribe(MessageType.RETURN_RAW_BATCH, (message, unsubscribe) => {
-                unsubscribe();
-                resolve(message.value.variants);
-            });
-        });
-    }
-
-    /**
-     * Subscribes to the given IPC message and executes the callback when the message is received. Returns an unsubscribe
-     * function which should be called to clean up the event listener. Alternatively, if only the first such event is
-     * important, call the `unsubscribe` function which is passed to the handler as the second argument.
-     */
-    private subscribe<T extends MessageType>(messageType: T, callback: (message: MessageOfType<T>, unsubscribe: () => void) => any): () => void {
-        const handler = (messageString: string) => {
-            const message = JSON.parse(messageString) as IncomingMessage;
-            if (message.type === messageType) {
-                callback(message as MessageOfType<T>, unsubscribe);
-            }
-        };
-        const unsubscribe = () =>  this.workerProcess.off('message', handler);
-        this.workerProcess.on('message', handler);
-        return unsubscribe;
-    }
-
-}

+ 35 - 17
packages/core/src/service/helpers/job-manager/job-manager.spec.ts

@@ -13,15 +13,15 @@ describe('JobManager', () => {
         expect(jm.getOne('invalid')).toBeNull();
     });
 
-    it('startJob() returns a job', () => {
+    it('createJob() returns a job', () => {
         const jm = new JobManager();
-        const job = jm.startJob('test', noop);
+        const job = jm.createJob('test', noop);
         expect(job.name).toBe('test');
     });
 
     it('getOne() returns job by id', () => {
         const jm = new JobManager();
-        const job1 = jm.startJob('test', noop);
+        const job1 = jm.createJob('test', noop);
         const job2 = jm.getOne(job1.id);
 
         expect(job1.id).toBe(job2!.id);
@@ -30,7 +30,8 @@ describe('JobManager', () => {
     it('job completes once work fn returns', async () => {
         const jm = new JobManager();
         const subject = new Subject();
-        const job = jm.startJob('test', () => subject.toPromise());
+        const job = jm.createJob('test', () => subject.toPromise());
+        job.start();
         await tick();
 
         expect(jm.getOne(job.id)!.state).toBe(JobState.RUNNING);
@@ -47,7 +48,8 @@ describe('JobManager', () => {
     it('job fails if work fn throws', async () => {
         const jm = new JobManager();
         const subject = new Subject();
-        const job = jm.startJob('test', () => subject.toPromise());
+        const job = jm.createJob('test', () => subject.toPromise());
+        job.start();
         await tick();
 
         expect(jm.getOne(job.id)!.state).toBe(JobState.RUNNING);
@@ -64,10 +66,11 @@ describe('JobManager', () => {
         const jm = new JobManager();
         const subject = new Subject();
         const progressSubject = new Subject<number>();
-        const job = jm.startJob('test', (reporter => {
+        const job = jm.createJob('test', (reporter => {
             progressSubject.subscribe(val => reporter.setProgress(val));
             return subject.toPromise();
         }));
+        job.start();
         await tick();
         expect(jm.getOne(job.id)!.progress).toBe(0);
 
@@ -92,18 +95,18 @@ describe('JobManager', () => {
 
     it('getAll() returns all jobs', () => {
         const jm = new JobManager();
-        const job1 = jm.startJob('job1', noop);
-        const job2 = jm.startJob('job2', noop);
-        const job3 = jm.startJob('job3', noop);
+        const job1 = jm.createJob('job1', noop);
+        const job2 = jm.createJob('job2', noop);
+        const job3 = jm.createJob('job3', noop);
 
         expect(jm.getAll().map(j => j.id)).toEqual([job1.id, job2.id, job3.id]);
     });
 
     it('getAll() filters by id', () => {
         const jm = new JobManager();
-        const job1 = jm.startJob('job1', noop);
-        const job2 = jm.startJob('job2', noop);
-        const job3 = jm.startJob('job3', noop);
+        const job1 = jm.createJob('job1', noop);
+        const job2 = jm.createJob('job2', noop);
+        const job3 = jm.createJob('job3', noop);
 
         expect(jm.getAll({ ids: [job1.id, job3.id]}).map(j => j.id)).toEqual([job1.id, job3.id]);
     });
@@ -111,9 +114,12 @@ describe('JobManager', () => {
     it('getAll() filters by state', async () => {
         const jm = new JobManager();
         const subject = new Subject();
-        const job1 = jm.startJob('job1', noop);
-        const job2 = jm.startJob('job2', noop);
-        const job3 = jm.startJob('job3', () => subject.toPromise());
+        const job1 = jm.createJob('job1', noop);
+        const job2 = jm.createJob('job2', noop);
+        const job3 = jm.createJob('job3', () => subject.toPromise());
+        job1.start();
+        job2.start();
+        job3.start();
 
         await tick();
 
@@ -126,8 +132,10 @@ describe('JobManager', () => {
         const subject1 = new Subject();
         const subject2 = new Subject();
 
-        const job1 = jm.startJob('job1', () => subject1.toPromise());
-        const job2 = jm.startJob('job2', () => subject2.toPromise());
+        const job1 = jm.createJob('job1', () => subject1.toPromise());
+        const job2 = jm.createJob('job2', () => subject2.toPromise());
+        job1.start();
+        job2.start();
 
         subject1.complete();
         await tick();
@@ -147,6 +155,16 @@ describe('JobManager', () => {
             { name: 'job2', state: JobState.RUNNING },
         ]);
     });
+
+    it('findRunningJob() works', async () => {
+        const jm = new JobManager();
+        const subject1 = new Subject();
+
+        const job1 = jm.createJob('job1', () => subject1.toPromise());
+        job1.start();
+
+        expect(jm.findRunningJob('job1')).toBe(job1);
+    });
 });
 
 function tick(duration: number = 0) {

+ 6 - 3
packages/core/src/service/helpers/job-manager/job-manager.ts

@@ -1,4 +1,4 @@
-import { JobInfo, JobListInput } from '@vendure/common/lib/generated-types';
+import { JobInfo, JobListInput, JobState } from '@vendure/common/lib/generated-types';
 import { pick } from '@vendure/common/lib/pick';
 import ms = require('ms');
 
@@ -36,10 +36,9 @@ export class JobManager {
      * property of the job. If the function throws, the job will fail and the `result` property
      * will be the error thrown.
      */
-    startJob(name: string, work: (reporter: JobReporter) => any | Promise<any>): Job {
+    createJob(name: string, work: (reporter: JobReporter) => any | Promise<any>): Job {
         const job = new Job(name, work);
         this.jobs.set(job.id, job);
-        job.start();
         return job;
     }
 
@@ -83,6 +82,10 @@ export class JobManager {
         });
     }
 
+    findRunningJob(name: string): Job | undefined {
+        return Array.from(this.jobs.values()).find(job => job.name === name && job.state === JobState.RUNNING);
+    }
+
     private toJobInfo(job: Job): JobInfo {
         const info =  pick(job, ['id', 'name', 'state', 'progress', 'result', 'started', 'ended']);
         const duration = job.ended ? +job.ended - +info.started : Date.now() - +info.started;

+ 18 - 0
packages/core/src/service/helpers/job-manager/job.spec.ts

@@ -0,0 +1,18 @@
+import { Job } from './job';
+
+describe('Job', () => {
+    it('does not run work more than once', () => {
+        let counter = 0;
+        const job = new Job('test', () => {
+            counter++;
+            return new Promise(() => {});
+        });
+        job.start();
+
+        expect(counter).toBe(1);
+
+        job.start();
+
+        expect(counter).toBe(1);
+    });
+});

+ 3 - 0
packages/core/src/service/helpers/job-manager/job.ts

@@ -22,6 +22,9 @@ export class Job {
     }
 
     async start() {
+        if (this.state !== JobState.PENDING) {
+            return;
+        }
         const reporter: JobReporter = {
             setProgress: (percentage: number) => {
                 this.progress = Math.max(Math.min(percentage, 100), 0);

+ 15 - 4
packages/core/src/service/services/job.service.ts

@@ -1,9 +1,9 @@
 import { Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
-import { JobInfo, JobListInput } from '@vendure/common/lib/generated-types';
-import ms = require('ms');
+import { JobInfo, JobListInput, JobState } from '@vendure/common/lib/generated-types';
 
 import { Job } from '../helpers/job-manager/job';
 import { JobManager, JobReporter } from '../helpers/job-manager/job-manager';
+import ms = require('ms');
 
 @Injectable()
 export class JobService implements OnModuleInit, OnModuleDestroy {
@@ -19,8 +19,19 @@ export class JobService implements OnModuleInit, OnModuleDestroy {
         global.clearInterval(this.cleanJobsTimer);
     }
 
-    startJob(name: string, work: (reporter: JobReporter) => any | Promise<any>): Job {
-        return this.manager.startJob(name, work);
+    createJob(options: {
+        name: string;
+        work: (reporter: JobReporter) => any | Promise<any>;
+        /** Limit this job to a single instance at a time */
+        singleInstance?: boolean;
+    }): Job {
+        if (options.singleInstance === true) {
+            const runningInstance = this.manager.findRunningJob(options.name);
+            if (runningInstance) {
+                return runningInstance;
+            }
+        }
+        return this.manager.createJob(options.name, options.work);
     }
 
     getAll(input?: JobListInput): JobInfo[] {