Browse Source

feat(core): Background thread search indexing

Michael Bromley 6 years ago
parent
commit
b78354ec12

+ 5 - 2
packages/core/src/plugin/default-search-plugin/default-search-plugin.ts

@@ -15,6 +15,7 @@ import { SearchService } from '../../service/services/search.service';
 
 import { AdminFulltextSearchResolver, ShopFulltextSearchResolver } from './fulltext-search.resolver';
 import { FulltextSearchService } from './fulltext-search.service';
+import { SearchIndexService } from './search-index.service';
 import { SearchIndexItem } from './search-index-item.entity';
 
 export interface DefaultSearchReindexResponse extends SearchReindexResponse {
@@ -23,9 +24,10 @@ export interface DefaultSearchReindexResponse extends SearchReindexResponse {
 }
 
 export class DefaultSearchPlugin implements VendurePlugin {
-    onBootstrap(inject: <T>(type: Type<T>) => T): void | Promise<void> {
+    async onBootstrap(inject: <T>(type: Type<T>) => T): Promise<void> {
         const eventBus = inject(EventBus);
         const fulltextSearchService = inject(FulltextSearchService);
+        const searchIndexService = inject(SearchIndexService);
         eventBus.subscribe(CatalogModificationEvent, event => {
             if (event.entity instanceof Product || event.entity instanceof ProductVariant) {
                 return fulltextSearchService.updateProductOrVariant(event.ctx, event.entity);
@@ -40,6 +42,7 @@ export class DefaultSearchPlugin implements VendurePlugin {
                 return fulltextSearchService.reindex(event.ctx);
             }
         });
+        await searchIndexService.connect();
     }
 
     extendAdminAPI(): APIExtensionDefinition {
@@ -71,6 +74,6 @@ export class DefaultSearchPlugin implements VendurePlugin {
     }
 
     defineProviders(): Provider[] {
-        return [FulltextSearchService, { provide: SearchService, useClass: FulltextSearchService }];
+        return [FulltextSearchService, SearchIndexService, { provide: SearchService, useClass: FulltextSearchService }];
     }
 }

+ 3 - 38
packages/core/src/plugin/default-search-plugin/fulltext-search.service.ts

@@ -5,7 +5,6 @@ import { Omit } from '@vendure/common/lib/omit';
 import { ID } from '@vendure/common/lib/shared-types';
 import { unique } from '@vendure/common/lib/unique';
 import { Connection } from 'typeorm';
-import { FindOptionsUtils } from 'typeorm/find-options/FindOptionsUtils';
 
 import { RequestContext } from '../../api/common/request-context';
 import { InternalServerError } from '../../common/error/errors';
@@ -20,6 +19,7 @@ import { SearchService } from '../../service/services/search.service';
 
 import { AsyncQueue } from './async-queue';
 import { SearchIndexItem } from './search-index-item.entity';
+import { SearchIndexService } from './search-index.service';
 import { MysqlSearchStrategy } from './search-strategy/mysql-search-strategy';
 import { PostgresSearchStrategy } from './search-strategy/postgres-search-strategy';
 import { SearchStrategy } from './search-strategy/search-strategy';
@@ -52,6 +52,7 @@ export class FulltextSearchService implements SearchService {
         private eventBus: EventBus,
         private facetValueService: FacetValueService,
         private productVariantService: ProductVariantService,
+        private searchIndexService: SearchIndexService,
     ) {
         this.setSearchStrategy();
     }
@@ -94,43 +95,7 @@ export class FulltextSearchService implements SearchService {
      */
     async reindex(ctx: RequestContext): Promise<JobInfo> {
         const job = this.jobService.startJob('reindex', async reporter => {
-            const timeStart = Date.now();
-            const BATCH_SIZE = 100;
-            Logger.verbose('Reindexing search index...');
-            const qb = await this.connection.getRepository(ProductVariant).createQueryBuilder('variants');
-            FindOptionsUtils.applyFindManyOptionsOrConditionsToQueryBuilder(qb, {
-                relations: this.variantRelations,
-            });
-            FindOptionsUtils.joinEagerRelations(qb, qb.alias, this.connection.getMetadata(ProductVariant));
-            const count = await qb.where('variants__product.deletedAt IS NULL').getCount();
-            Logger.verbose(`Getting ${count} variants`);
-            const batches = Math.ceil(count / BATCH_SIZE);
-
-            Logger.verbose('Deleting existing index items...');
-            await this.connection.getRepository(SearchIndexItem).delete({languageCode: ctx.languageCode});
-            Logger.verbose('Deleted!');
-
-            for (let i = 0; i < batches; i++) {
-                Logger.verbose(`Processing batch ${i + 1} of ${batches}, heap used: `
-                    + (process.memoryUsage().heapUsed / 1000 / 1000).toFixed(2) + 'MB');
-                const variants = await qb
-                    .where('variants__product.deletedAt IS NULL')
-                    .take(BATCH_SIZE)
-                    .skip(i * BATCH_SIZE)
-                    .getMany();
-                await this.taskQueue.push(async () => {
-                    await this.saveSearchIndexItems(ctx, variants);
-                });
-                reporter.setProgress(Math.round((i / batches) * 100));
-            }
-
-            Logger.verbose(`Reindexing completed in ${Date.now() - timeStart}ms`);
-
-            return {
-                success: true,
-                indexedItemCount: count,
-                timeTaken: Date.now() - timeStart,
-            };
+            return this.searchIndexService.reindex(ctx, reporter);
         });
         return job;
     }

+ 76 - 0
packages/core/src/plugin/default-search-plugin/ipc.ts

@@ -0,0 +1,76 @@
+import { ChildProcess } from 'child_process';
+import { ConnectionOptions } from 'typeorm';
+
+import { RequestContext } from '../../api/common/request-context';
+import { ProductVariant } from '../../entity/product-variant/product-variant.entity';
+
+/**
+ * Discriminator carried in the `type` field of every IPC message exchanged between the
+ * main process and the search index worker process. The members have no explicit
+ * initializers, so they are numeric values assigned in declaration order.
+ */
+export enum MessageType {
+    CONNECTION_OPTIONS,
+    CONNECTED,
+    GET_RAW_BATCH,
+    RETURN_RAW_BATCH,
+    SAVE_VARIANTS,
+    VARIANTS_SAVED,
+    COMPLETED,
+}
+
+/**
+ * Payload of a SAVE_VARIANTS message: one batch of variants to be written to the search index.
+ *
+ * `batch` is the zero-based index of this batch and `total` is the overall number of batches;
+ * the worker compares `batch` against `total - 1` to decide when to report completion.
+ * Note that the message is sent as a JSON string, so `ctx` and `variants` arrive at the
+ * receiver as plain deserialized objects rather than class instances.
+ */
+export interface SaveVariantsPayload {
+    variants: ProductVariant[];
+    ctx: RequestContext;
+    batch: number;
+    total: number;
+}
+
+/**
+ * Base shape shared by all IPC messages: each one is tagged with a MessageType so that the
+ * receiver can tell the messages apart.
+ */
+export interface IPCMessage {
+    type: MessageType;
+}
+
+/**
+ * Carries the TypeORM connection options which the worker uses to create its own database connection.
+ */
+export class ConnectionOptionsMessage implements IPCMessage {
+    readonly type = MessageType.CONNECTION_OPTIONS;
+    constructor(public value: ConnectionOptions) {}
+}
+
+/**
+ * Reply to a ConnectionOptionsMessage. The boolean value reports whether the database connection is established.
+ */
+export class ConnectedMessage implements IPCMessage {
+    readonly type = MessageType.CONNECTED;
+    constructor(public value: boolean) {}
+}
+
+/**
+ * Requests the batch of variants identified by `batchNumber`.
+ */
+export class GetRawBatchMessage implements IPCMessage {
+    readonly type = MessageType.GET_RAW_BATCH;
+    constructor(public value: { batchNumber: number; }) {}
+}
+
+/**
+ * Reply to a GetRawBatchMessage, carrying the variants which make up the requested batch.
+ */
+export class ReturnRawBatchMessage implements IPCMessage {
+    readonly type = MessageType.RETURN_RAW_BATCH;
+    constructor(public value: { variants: ProductVariant[]; }) {}
+}
+
+/**
+ * Instructs the receiver to save one batch of variants (see SaveVariantsPayload) to the search index.
+ */
+export class SaveVariantsMessage implements IPCMessage {
+    readonly type = MessageType.SAVE_VARIANTS;
+    constructor(public value: SaveVariantsPayload) {}
+}
+
+/**
+ * Reports that the batch identified by `batchNumber` has been saved.
+ */
+export class VariantsSavedMessage implements IPCMessage {
+    readonly type = MessageType.VARIANTS_SAVED;
+    constructor(public value: { batchNumber: number; }) {}
+}
+
+/**
+ * Reports that the final batch of a reindex run has been saved.
+ */
+export class CompletedMessage implements IPCMessage {
+    readonly type = MessageType.COMPLETED;
+    constructor(public value: boolean) {}
+}
+
+/**
+ * Union of every concrete IPC message class. Each member has a distinct literal `type`,
+ * so the union can be narrowed by checking that field.
+ */
+export type Message = ConnectionOptionsMessage |
+    ConnectedMessage |
+    GetRawBatchMessage |
+    ReturnRawBatchMessage |
+    SaveVariantsMessage |
+    VariantsSavedMessage |
+    CompletedMessage;
+
+/** Resolves to the member of the Message union whose `type` field is the given MessageType `T`. */
+export type MessageOfType<T extends MessageType> = Extract<Message, { type: T }>;
+
+/**
+ * Serializes the given message to a JSON string containing only its `type` and `value`
+ * fields and sends it to the target over its IPC channel.
+ *
+ * The target is either the current process (when called from within a forked child) or a
+ * ChildProcess handle (when called from the parent).
+ */
+export function sendIPCMessage(target: NodeJS.Process | ChildProcess, message: Message) {
+    const { type, value } = message;
+    const serialized = JSON.stringify({ type, value });
+    // tslint:disable-next-line:no-non-null-assertion
+    target.send!(serialized);
+}

+ 133 - 0
packages/core/src/plugin/default-search-plugin/search-index-worker.ts

@@ -0,0 +1,133 @@
+/* tslint:disable:no-non-null-assertion no-console */
+import { Type } from '@vendure/common/lib/shared-types';
+import { unique } from '@vendure/common/lib/unique';
+import { Connection, ConnectionOptions, createConnection, SelectQueryBuilder } from 'typeorm';
+import { FindOptionsUtils } from 'typeorm/find-options/FindOptionsUtils';
+
+import { RequestContext } from '../../api/common/request-context';
+import { FacetValue } from '../../entity/facet-value/facet-value.entity';
+import { ProductVariant } from '../../entity/product-variant/product-variant.entity';
+
+import {
+    CompletedMessage,
+    ConnectedMessage,
+    ConnectionOptionsMessage,
+    GetRawBatchMessage,
+    MessageType,
+    ReturnRawBatchMessage,
+    SaveVariantsMessage,
+    SaveVariantsPayload,
+    sendIPCMessage,
+    VariantsSavedMessage,
+} from './ipc';
+import { SearchIndexItem } from './search-index-item.entity';
+
+/** The number of variants fetched and indexed per batch. */
+export const BATCH_SIZE = 100;
+/**
+ * The ProductVariant relations which are joined when loading variants for indexing.
+ * The search index items are built from data on these relations (product, assets,
+ * facet values and their facets, collections).
+ */
+export const variantRelations = [
+    'product',
+    'product.featuredAsset',
+    'product.facetValues',
+    'product.facetValues.facet',
+    'featuredAsset',
+    'facetValues',
+    'facetValues.facet',
+    'collections',
+    'taxCategory',
+];
+
+/** The IPC messages which the worker process handles (i.e. those sent to it by the main process). */
+export type IncomingMessage = ConnectionOptionsMessage | GetRawBatchMessage | SaveVariantsMessage;
+
+/**
+ * Performs the database-heavy parts of search indexing. An instance lives in the forked
+ * worker process, owns its own database connection, and reports results back to the
+ * parent process via IPC messages.
+ */
+export class SearchIndexWorker {
+
+    private connection: Connection;
+    private indexQueryBuilder: SelectQueryBuilder<ProductVariant>;
+    /**
+     * Tail of the chain of pending save operations. Saves are run strictly one after
+     * another so that the final batch cannot finish (and report completion) before an
+     * earlier batch has been written.
+     */
+    private saveQueue: Promise<void> = Promise.resolve();
+
+    /**
+     * Creates the database connection and prepares the query builder used to fetch batches
+     * of variants, then replies to the parent with a ConnectedMessage.
+     *
+     * If anything fails, a `ConnectedMessage(false)` is sent before the error is re-thrown,
+     * so that the parent is not left waiting indefinitely for a reply.
+     */
+    async connect(dbConnectionOptions: ConnectionOptions) {
+        try {
+            const { coreEntitiesMap } = await import('../../entity/entities');
+            const coreEntities = Object.values(coreEntitiesMap) as Array<Type<any>>;
+            this.connection = await createConnection({ ...dbConnectionOptions, entities: [SearchIndexItem, ...coreEntities] });
+
+            this.indexQueryBuilder = await this.connection.getRepository(ProductVariant).createQueryBuilder('variants');
+            FindOptionsUtils.applyFindManyOptionsOrConditionsToQueryBuilder(this.indexQueryBuilder, {
+                relations: variantRelations,
+            });
+            FindOptionsUtils.joinEagerRelations(this.indexQueryBuilder, this.indexQueryBuilder.alias, this.connection.getMetadata(ProductVariant));
+        } catch (e) {
+            sendIPCMessage(process, new ConnectedMessage(false));
+            throw e;
+        }
+
+        sendIPCMessage(process, new ConnectedMessage(this.connection.isConnected));
+    }
+
+    /**
+     * Loads the batch of non-deleted variants with the given zero-based batch number and
+     * sends it back to the parent in a ReturnRawBatchMessage.
+     */
+    async getRawBatch(batchNumber: string | number) {
+        const i = Number.parseInt(batchNumber.toString(), 10);
+        // NOTE(review): no explicit ordering is applied to this paginated query - confirm that
+        // the database returns rows in a stable order across successive batches.
+        const variants = await this.indexQueryBuilder
+            .where('variants__product.deletedAt IS NULL')
+            .take(BATCH_SIZE)
+            .skip(i * BATCH_SIZE)
+            .getMany();
+
+        sendIPCMessage(process, new ReturnRawBatchMessage({variants}));
+    }
+
+    /**
+     * Saves a batch of variants to the search index. The work is queued behind any save
+     * which is still in progress, so batches are persisted in the order in which they were
+     * received. The returned promise settles with the outcome of this particular batch.
+     */
+    async saveVariants(payload: SaveVariantsPayload) {
+        const save = this.saveQueue.then(() => this.persistVariants(payload));
+        // A failed batch must not prevent subsequent batches from being processed.
+        this.saveQueue = save.catch(() => undefined);
+        await save;
+    }
+
+    /**
+     * Converts the variants into SearchIndexItems, saves them, and notifies the parent with
+     * a VariantsSavedMessage (followed by a CompletedMessage after the final batch).
+     */
+    private async persistVariants(payload: SaveVariantsPayload) {
+        const { variants, ctx, batch, total } = payload;
+        // The ctx arrives as a plain deserialized object, so it is wrapped in a RequestContext instance.
+        const requestContext = new RequestContext(ctx);
+
+        const items = variants.map((v: ProductVariant) =>
+            new SearchIndexItem({
+                sku: v.sku,
+                enabled: v.enabled,
+                slug: v.product.slug,
+                price: v.price,
+                priceWithTax: v.priceWithTax,
+                languageCode: requestContext.languageCode,
+                productVariantId: v.id,
+                productId: v.product.id,
+                productName: v.product.name,
+                description: v.product.description,
+                productVariantName: v.name,
+                productPreview: v.product.featuredAsset ? v.product.featuredAsset.preview : '',
+                productVariantPreview: v.featuredAsset ? v.featuredAsset.preview : '',
+                facetIds: this.getFacetIds(v),
+                facetValueIds: this.getFacetValueIds(v),
+                collectionIds: v.collections.map(c => c.id.toString()),
+            }),
+        );
+        await this.connection.getRepository(SearchIndexItem).save(items);
+        sendIPCMessage(process, new VariantsSavedMessage({batchNumber: batch}));
+        if (batch === total - 1) {
+            sendIPCMessage(process, new CompletedMessage(true));
+        }
+    }
+
+    /**
+     * Returns the de-duplicated ids of all Facets referenced by the FacetValues of the
+     * variant and of its product.
+     */
+    private getFacetIds(variant: ProductVariant): string[] {
+        const facetIds = (fv: FacetValue) => fv.facet.id.toString();
+        const variantFacetIds = variant.facetValues.map(facetIds);
+        const productFacetIds = variant.product.facetValues.map(facetIds);
+        return unique([...variantFacetIds, ...productFacetIds]);
+    }
+
+    /**
+     * Returns the de-duplicated ids of all FacetValues assigned to the variant and to its product.
+     */
+    private getFacetValueIds(variant: ProductVariant): string[] {
+        const facetValueIds = (fv: FacetValue) => fv.id.toString();
+        const variantFacetValueIds = variant.facetValues.map(facetValueIds);
+        const productFacetValueIds = variant.product.facetValues.map(facetValueIds);
+        return unique([...variantFacetValueIds, ...productFacetValueIds]);
+    }
+}
+
+const worker = new SearchIndexWorker();
+
+// Dispatches each IPC message received from the parent process to the matching worker method.
+// The worker methods are asynchronous, so their promises are captured here and any rejection
+// is logged rather than being left as an unhandled rejection.
+process.on('message', (messageString) => {
+    const message: IncomingMessage = JSON.parse(messageString);
+    let task: Promise<void> | undefined;
+    switch (message.type) {
+        case MessageType.CONNECTION_OPTIONS:
+            task = worker.connect(message.value);
+            break;
+        case MessageType.GET_RAW_BATCH:
+            task = worker.getRawBatch(message.value.batchNumber);
+            break;
+        case MessageType.SAVE_VARIANTS:
+            task = worker.saveVariants(message.value);
+            break;
+        default:
+            // ignore
+    }
+    if (task) {
+        task.catch(err => console.error('Search index worker error:', err));
+    }
+});

+ 173 - 0
packages/core/src/plugin/default-search-plugin/search-index.service.ts

@@ -0,0 +1,173 @@
+import { Injectable } from '@nestjs/common';
+import { InjectConnection } from '@nestjs/typeorm';
+import { pick } from '@vendure/common/lib/pick';
+import { ChildProcess, fork } from 'child_process';
+import fs from 'fs-extra';
+import path from 'path';
+import { Connection } from 'typeorm';
+import { FindOptionsUtils } from 'typeorm/find-options/FindOptionsUtils';
+
+import { RequestContext } from '../../api/common/request-context';
+import { ConfigService } from '../../config/config.service';
+import { Logger } from '../../config/logger/vendure-logger';
+import { ProductVariant } from '../../entity/product-variant/product-variant.entity';
+import { JobReporter } from '../../service/helpers/job-manager/job-manager';
+import { translateDeep } from '../../service/helpers/utils/translate-entity';
+import { ProductVariantService } from '../../service/services/product-variant.service';
+
+import {
+    CompletedMessage,
+    ConnectedMessage,
+    ConnectionOptionsMessage,
+    GetRawBatchMessage,
+    MessageOfType,
+    MessageType,
+    ReturnRawBatchMessage,
+    SaveVariantsMessage,
+    sendIPCMessage,
+    VariantsSavedMessage,
+} from './ipc';
+import { SearchIndexItem } from './search-index-item.entity';
+import { BATCH_SIZE, variantRelations } from './search-index-worker';
+
+/** The IPC messages which the main process handles (i.e. those sent to it by the worker process). */
+export type IncomingMessage = ConnectedMessage | ReturnRawBatchMessage | VariantsSavedMessage | CompletedMessage;
+
+/**
+ * Coordinates search indexing between the main process and a forked worker process.
+ * The worker performs the database reads and writes; this service forks the worker, drives
+ * the batch-by-batch reindex and reports progress to the job reporter.
+ */
+@Injectable()
+export class SearchIndexService {
+    private workerProcess: ChildProcess;
+
+    constructor(@InjectConnection() private connection: Connection,
+                private productVariantService: ProductVariantService,
+                private configService: ConfigService) {}
+
+    /**
+     * Creates the search index worker process and has it connect to the database.
+     *
+     * Throws if the worker process cannot be created, since nothing else in this service
+     * can work without it.
+     */
+    async connect() {
+        try {
+            this.workerProcess = this.getChildProcess(path.join(__dirname, 'search-index-worker.ts'));
+        } catch (e) {
+            const reason = e instanceof Error ? e.message : String(e);
+            Logger.error(`Could not create search index worker process: ${reason}`, 'DefaultSearchPlugin');
+            throw e;
+        }
+        Logger.verbose(`Created search index worker process`, 'DefaultSearchPlugin');
+        this.workerProcess.on('error', err => {
+            Logger.error(`Search index worker error: ` + err.message, 'DefaultSearchPlugin');
+        });
+        const connected = await this.establishConnection(this.workerProcess);
+        if (!connected) {
+            Logger.error(`Search index worker could not connect to the database`, 'DefaultSearchPlugin');
+        }
+    }
+
+    /**
+     * Rebuilds the search index for the language of the given context.
+     *
+     * Existing index items for that language are deleted, then the variants are fetched
+     * from the worker in batches, have channel pricing and translations applied in this
+     * process, and are sent back to the worker to be saved. Resolves with a summary object
+     * once the worker reports completion, and rejects if a batch cannot be processed.
+     */
+    async reindex(ctx: RequestContext, reporter: JobReporter) {
+        const timeStart = Date.now();
+        Logger.verbose('Reindexing search index...', 'DefaultSearchPlugin');
+        const qb = await this.connection.getRepository(ProductVariant).createQueryBuilder('variants');
+        FindOptionsUtils.applyFindManyOptionsOrConditionsToQueryBuilder(qb, {
+            relations: variantRelations,
+        });
+        FindOptionsUtils.joinEagerRelations(qb, qb.alias, this.connection.getMetadata(ProductVariant));
+        const count = await qb.where('variants__product.deletedAt IS NULL').getCount();
+        Logger.verbose(`Getting ${count} variants`, 'DefaultSearchPlugin');
+        const batches = Math.ceil(count / BATCH_SIZE);
+
+        Logger.verbose('Deleting existing index items...', 'DefaultSearchPlugin');
+        await this.connection.getRepository(SearchIndexItem).delete({ languageCode: ctx.languageCode });
+        Logger.verbose('Deleted!', 'DefaultSearchPlugin');
+
+        if (batches === 0) {
+            // With nothing to index, no batch is ever sent to the worker and therefore no
+            // COMPLETED message would ever arrive, so finish immediately.
+            Logger.verbose(`Reindexing completed in ${Date.now() - timeStart}ms`, 'DefaultSearchPlugin');
+            return {
+                success: true,
+                indexedItemCount: count,
+                timeTaken: Date.now() - timeStart,
+            };
+        }
+
+        return new Promise((resolve, reject) => {
+            const unsubscribeProgress = this.subscribe(MessageType.VARIANTS_SAVED, message => {
+                reporter.setProgress(Math.ceil(((message.value.batchNumber + 1) / batches) * 100));
+                Logger.verbose(`Completed batch ${message.value.batchNumber + 1} of ${batches}`, 'DefaultSearchPlugin');
+            });
+            const unsubscribeCompleted = this.subscribe(MessageType.COMPLETED, () => {
+                Logger.verbose(`Reindexing completed in ${Date.now() - timeStart}ms`, 'DefaultSearchPlugin');
+                unsubscribeCompleted();
+                unsubscribeProgress();
+                resolve({
+                    success: true,
+                    indexedItemCount: count,
+                    timeTaken: Date.now() - timeStart,
+                });
+            });
+
+            this.sendBatchesToWorker(ctx, batches).catch(err => {
+                // Without this, a failure while preparing a batch would leave the returned
+                // promise pending forever and the listeners attached.
+                unsubscribeCompleted();
+                unsubscribeProgress();
+                reject(err);
+            });
+        });
+    }
+
+    /**
+     * Fetches each batch from the worker in turn, applies channel pricing and translations,
+     * and sends the resulting variants back to the worker to be saved.
+     */
+    private async sendBatchesToWorker(ctx: RequestContext, batches: number): Promise<void> {
+        for (let i = 0; i < batches; i++) {
+            Logger.verbose(`Processing batch ${i + 1} of ${batches}`, 'DefaultSearchPlugin');
+
+            const variants = await this.getBatch(this.workerProcess, i);
+            const items = variants
+                .map((v: any) => this.productVariantService.applyChannelPriceAndTax(v, ctx))
+                .map((v: any) => translateDeep(v, ctx.languageCode, ['product']));
+
+            sendIPCMessage(this.workerProcess, new SaveVariantsMessage({ variants: items, ctx, batch: i, total: batches }));
+        }
+    }
+
+    /**
+     * Forks a child process based on the given filename. The filename can be a JS or TS file, as this method will attempt to
+     * use either (attempts JS first). Throws if neither file exists or if forking fails.
+     */
+    private getChildProcess(filename: string): ChildProcess {
+        const ext = path.extname(filename);
+        // Strip the extension without a RegExp, so that special characters in it are not interpreted as a pattern.
+        const fileWithoutExt = ext ? filename.slice(0, -ext.length) : filename;
+        const jsFile = fileWithoutExt + '.js';
+        const tsFile = fileWithoutExt + '.ts';
+        let error: any;
+        try {
+            if (fs.existsSync(jsFile)) {
+                return fork(jsFile, [], { execArgv: [] });
+            }
+        } catch (e) {
+            // ignore and try ts
+            error = e;
+        }
+        try {
+            if (fs.existsSync(tsFile)) {
+                // Fork the TS file using ts-node. This is useful when running in dev mode or
+                // for e2e tests.
+                return fork(tsFile, [], { execArgv: ['-r', 'ts-node/register'] });
+            }
+        } catch (e) {
+            // ignore and throw at the end.
+            error = e;
+        }
+        // If neither file exists then no error was caught above, so make sure a real Error is thrown.
+        throw error || new Error(`Could not find the worker script at "${jsFile}" or "${tsFile}"`);
+    }
+
+    /**
+     * Sends the database connection options to the worker and resolves with the connection
+     * result which the worker reports back.
+     */
+    establishConnection(child: ChildProcess): Promise<boolean> {
+        const connectionOptions = pick(this.configService.dbConnectionOptions as any,
+            ['type', 'name', 'database', 'host', 'port', 'username', 'password']);
+        return new Promise(resolve => {
+            // Listen for the reply before sending the request.
+            this.subscribe(MessageType.CONNECTED, (message, unsubscribe) => {
+                Logger.verbose(`Connection result: ${message.value}`, 'DefaultSearchPlugin');
+                unsubscribe();
+                resolve(message.value);
+            });
+            sendIPCMessage(child, new ConnectionOptionsMessage(connectionOptions));
+        });
+    }
+
+    /**
+     * Requests the batch with the given zero-based number from the worker and resolves with its variants.
+     */
+    getBatch(child: ChildProcess, batch: number): Promise<ProductVariant[]> {
+        return new Promise(resolve => {
+            // Listen for the reply before sending the request.
+            this.subscribe(MessageType.RETURN_RAW_BATCH, (message, unsubscribe) => {
+                unsubscribe();
+                resolve(message.value.variants);
+            });
+            sendIPCMessage(child, new GetRawBatchMessage({ batchNumber: batch }));
+        });
+    }
+
+    /**
+     * Subscribes to the given IPC message and executes the callback when the message is received. Returns an unsubscribe
+     * function which should be called to clean up the event listener. Alternatively, if only the first such event is
+     * important, call the `unsubscribe` function which is passed to the handler as the second argument.
+     */
+    private subscribe<T extends MessageType>(messageType: T, callback: (message: MessageOfType<T>, unsubscribe: () => void) => any): () => void {
+        const handler = (messageString: string) => {
+            const message = JSON.parse(messageString) as IncomingMessage;
+            if (message.type === messageType) {
+                callback(message as MessageOfType<T>, unsubscribe);
+            }
+        };
+        const unsubscribe = () =>  this.workerProcess.off('message', handler);
+        this.workerProcess.on('message', handler);
+        return unsubscribe;
+    }
+
+}