Browse Source

feat(elasticsearch-plugin): Implement worker

Michael Bromley 6 years ago
parent
commit
4775c2e44d

+ 5 - 1
packages/elasticsearch-plugin/src/constants.ts

@@ -1,8 +1,12 @@
 export const ELASTIC_SEARCH_OPTIONS = Symbol('ELASTIC_SEARCH_OPTIONS');
 export const ELASTIC_SEARCH_OPTIONS = Symbol('ELASTIC_SEARCH_OPTIONS');
 export const ELASTIC_SEARCH_CLIENT = Symbol('ELASTIC_SEARCH_CLIENT');
 export const ELASTIC_SEARCH_CLIENT = Symbol('ELASTIC_SEARCH_CLIENT');
-export const BATCH_SIZE = 500;
 export const VARIANT_INDEX_NAME = 'variants';
 export const VARIANT_INDEX_NAME = 'variants';
 export const VARIANT_INDEX_TYPE = 'variant-index-item';
 export const VARIANT_INDEX_TYPE = 'variant-index-item';
 export const PRODUCT_INDEX_NAME = 'products';
 export const PRODUCT_INDEX_NAME = 'products';
 export const PRODUCT_INDEX_TYPE = 'product-index-item';
 export const PRODUCT_INDEX_TYPE = 'product-index-item';
 export const loggerCtx = 'ElasticsearchPlugin';
 export const loggerCtx = 'ElasticsearchPlugin';
+export enum Message {
+    Reindex = 'Reindex',
+    UpdateVariantsById = 'UpdateVariantsById',
+    UpdateProductOrVariant = 'UpdateProductOrVariant',
+}

+ 94 - 298
packages/elasticsearch-plugin/src/elasticsearch-index.service.ts

@@ -1,324 +1,120 @@
-import { Client } from '@elastic/elasticsearch';
-import { Inject, Injectable } from '@nestjs/common';
-import { InjectConnection } from '@nestjs/typeorm';
-import { unique } from '@vendure/common/lib/unique';
-import {
-    FacetValue,
-    ID,
-    Job,
-    JobService,
-    Logger,
-    Product,
-    ProductVariant,
-    ProductVariantService,
-    RequestContext,
-    translateDeep,
-} from '@vendure/core';
-import { Connection, SelectQueryBuilder } from 'typeorm';
-import { FindOptionsUtils } from 'typeorm/find-options/FindOptionsUtils';
+import { Inject, Injectable, OnModuleDestroy } from '@nestjs/common';
+import { ClientProxy } from '@nestjs/microservices';
+import { ID, Job, JobService, Logger, Product, ProductVariant, RequestContext, VENDURE_WORKER_CLIENT } from '@vendure/core';
 
 
-import {
-    BATCH_SIZE,
-    ELASTIC_SEARCH_CLIENT,
-    ELASTIC_SEARCH_OPTIONS,
-    loggerCtx,
-    PRODUCT_INDEX_NAME,
-    PRODUCT_INDEX_TYPE,
-    VARIANT_INDEX_NAME,
-    VARIANT_INDEX_TYPE,
-} from './constants';
-import { ElasticsearchOptions } from './plugin';
-import { BulkOperation, BulkOperationDoc, BulkResponseBody, ProductIndexItem, VariantIndexItem } from './types';
-
-export const variantRelations = [
-    'product',
-    'product.featuredAsset',
-    'product.facetValues',
-    'product.facetValues.facet',
-    'featuredAsset',
-    'facetValues',
-    'facetValues.facet',
-    'collections',
-    'taxCategory',
-];
+import { Message } from './constants';
+import { ReindexMessageResponse } from './indexer.controller';
 
 
 @Injectable()
 @Injectable()
-export class ElasticsearchIndexService {
+export class ElasticsearchIndexService implements OnModuleDestroy {
 
 
-    constructor(@InjectConnection() private connection: Connection,
-                @Inject(ELASTIC_SEARCH_OPTIONS) private options: Required<ElasticsearchOptions>,
-                @Inject(ELASTIC_SEARCH_CLIENT) private client: Client,
-                private productVariantService: ProductVariantService,
+    constructor(@Inject(VENDURE_WORKER_CLIENT) private readonly client: ClientProxy,
                 private jobService: JobService) {}
                 private jobService: JobService) {}
 
 
     /**
     /**
      * Updates the search index only for the affected entities.
      * Updates the search index only for the affected entities.
      */
      */
-    async updateProductOrVariant(ctx: RequestContext, updatedEntity: Product | ProductVariant) {
-        let updatedProductVariants: ProductVariant[] = [];
-        let removedProducts: Product[] = [];
-        let updatedVariants: ProductVariant[] = [];
-        let removedVariantIds: ID[] = [];
-        if (updatedEntity instanceof Product) {
-            const product = await this.connection.getRepository(Product).findOne(updatedEntity.id, {
-                relations: ['variants'],
-            });
-            if (product) {
-                if (product.deletedAt) {
-                    removedProducts = [product];
-                    removedVariantIds = product.variants.map(v => v.id);
+    updateProductOrVariant(ctx: RequestContext, updatedEntity: Product | ProductVariant) {
+        return this.jobService.createJob({
+            name: 'update-index',
+            work: async () => {
+                if (updatedEntity instanceof Product) {
+                    return this.client.send(Message.UpdateProductOrVariant, { ctx, productId: updatedEntity.id })
+                        .toPromise()
+                        .catch(err => Logger.error(err));
                 } else {
                 } else {
-                    const variants = await this.connection
-                        .getRepository(ProductVariant)
-                        .findByIds(product.variants.map(v => v.id), {
-                            relations: variantRelations,
-                        });
-                    updatedVariants = this.hydrateVariants(ctx, variants);
-                    updatedProductVariants = updatedVariants;
+                    return this.client.send(Message.UpdateProductOrVariant, { ctx, variantId: updatedEntity.id })
+                        .toPromise()
+                        .catch(err => Logger.error(err));
                 }
                 }
-            }
-        } else {
-            const variant = await this.connection.getRepository(ProductVariant).findOne(updatedEntity.id, {
-                relations: variantRelations,
-            });
-            if (variant) {
-                updatedVariants = [variant];
-            }
-        }
-
-        if (updatedProductVariants.length) {
-            const updatedProductIndexItem = this.createProductIndexItem(updatedProductVariants);
-            const operations: [BulkOperation, BulkOperationDoc<ProductIndexItem>] = [
-                { update: { _id: updatedProductIndexItem.productId.toString() } },
-                { doc: updatedProductIndexItem },
-            ];
-            await this.executeBulkOperations(PRODUCT_INDEX_NAME, PRODUCT_INDEX_TYPE, operations);
-        }
-        if (updatedVariants.length) {
-            const operations = updatedVariants.reduce((ops, variant) => {
-                return [
-                    ...ops,
-                    { update: { _id: variant.id.toString() } },
-                    { doc: this.createVariantIndexItem(variant) },
-                ];
-            }, [] as Array<BulkOperation | BulkOperationDoc<VariantIndexItem>>);
-            await this.executeBulkOperations(VARIANT_INDEX_NAME, VARIANT_INDEX_TYPE, operations);
-        }
-        if (removedVariantIds.length) {
-            const operations = removedVariantIds.reduce((ops, id) => {
-                return [
-                    ...ops,
-                    { delete: { _id: id.toString() } },
-                ];
-            }, [] as BulkOperation[]);
-            await this.executeBulkOperations(VARIANT_INDEX_NAME, VARIANT_INDEX_TYPE, operations);
-        }
+            },
+        });
     }
     }
 
 
-    async updateVariantsById(ctx: RequestContext, ids: ID[]) {
-        if (ids.length) {
-            const batches = Math.ceil(ids.length / BATCH_SIZE);
-            Logger.verbose(`Updating ${ids.length} variants...`);
-
-            for (let i = 0; i < batches; i++) {
-                const begin = i * BATCH_SIZE;
-                const end = begin + BATCH_SIZE;
-                Logger.verbose(`Updating ids from index ${begin} to ${end}`);
-                const batchIds = ids.slice(begin, end);
-                const batch = await this.getBatchByIds(ctx, batchIds);
-                const variants = this.hydrateVariants(ctx, batch);
-                const operations = this.hydrateVariants(ctx, variants).reduce((ops, variant) => {
-                    return [
-                        ...ops,
-                        { update: { _id: variant.id.toString() } },
-                        { doc: this.createVariantIndexItem(variant) },
-                    ];
-                }, [] as Array<BulkOperation | BulkOperationDoc<VariantIndexItem>>);
-                await this.executeBulkOperations(VARIANT_INDEX_NAME, VARIANT_INDEX_TYPE, operations);
-            }
-        }
+    updateVariantsById(ctx: RequestContext, ids: ID[]) {
+        return this.jobService.createJob({
+            name: 'update-index',
+            work: async reporter => {
+                return new Promise((resolve, reject) => {
+                    Logger.verbose(`sending reindex message`);
+                    let total: number | undefined;
+                    let duration = 0;
+                    let completed = 0;
+                    this.client.send<ReindexMessageResponse>(Message.UpdateVariantsById, { ctx, ids })
+                        .subscribe({
+                            next: response => {
+                                if (!total) {
+                                    total = response.total;
+                                }
+                                duration = response.duration;
+                                completed = response.completed;
+                                const progress = Math.ceil((completed / total) * 100);
+                                reporter.setProgress(progress);
+                            },
+                            complete: () => {
+                                resolve({
+                                    success: true,
+                                    indexedItemCount: total,
+                                    timeTaken: duration,
+                                });
+                            },
+                            error: (err) => {
+                                Logger.error(JSON.stringify(err));
+                                resolve({
+                                    success: false,
+                                    indexedItemCount: 0,
+                                    timeTaken: 0,
+                                });
+                            },
+                        });
+                });
+            },
+        });
     }
     }
 
 
     reindex(ctx: RequestContext): Job {
     reindex(ctx: RequestContext): Job {
-        const job = this.jobService.createJob({
+        return this.jobService.createJob({
             name: 'reindex',
             name: 'reindex',
             singleInstance: true,
             singleInstance: true,
             work: async reporter => {
             work: async reporter => {
-                const timeStart = Date.now();
-                const qb = this.getSearchIndexQueryBuilder();
-                const count = await qb.where('variants__product.deletedAt IS NULL').getCount();
-                Logger.verbose(`Reindexing ${count} variants`, loggerCtx);
-
-                const batches = Math.ceil(count / BATCH_SIZE);
-                let variantsInProduct: ProductVariant[] = [];
-
-                for (let i = 0; i < batches; i++) {
-                    Logger.verbose(`Processing batch ${i + 1} of ${batches}`, loggerCtx);
-
-                    const variants = await this.getBatch(ctx, qb, i);
-                    Logger.verbose(`variants count: ${variants.length}`);
-
-                    const variantsToIndex: Array<BulkOperation | VariantIndexItem> = [];
-                    const productsToIndex: Array<BulkOperation | ProductIndexItem> = [];
-
-                    // tslint:disable-next-line:prefer-for-of
-                    for (let j = 0; j < variants.length; j++) {
-                        const variant = variants[j];
-                        variantsInProduct.push(variant);
-                        variantsToIndex.push({ index: { _id: variant.id.toString() } });
-                        variantsToIndex.push(this.createVariantIndexItem(variant));
-
-                        const nextVariant = variants[j + 1];
-                        if (nextVariant && nextVariant.productId !== variant.productId) {
-                            productsToIndex.push({ index: { _id: variant.productId.toString() } });
-                            productsToIndex.push(this.createProductIndexItem(variantsInProduct) as any);
-                            variantsInProduct = [];
-                        }
-                    }
-                    await this.executeBulkOperations(VARIANT_INDEX_NAME, VARIANT_INDEX_TYPE, variantsToIndex);
-                    await this.executeBulkOperations(PRODUCT_INDEX_NAME, PRODUCT_INDEX_TYPE, productsToIndex);
-                    reporter.setProgress(Math.ceil(((i + 1) / batches) * 100));
-                }
-                return {
-                    success: true,
-                    indexedItemCount: count,
-                    timeTaken: Date.now() - timeStart,
-                };
-            },
-        });
-        return job;
-    }
-
-    private async executeBulkOperations(indexName: string,
-                                        indexType: string,
-                                        operations: Array<BulkOperation | BulkOperationDoc<VariantIndexItem | ProductIndexItem>>) {
-        try {
-            const {body}: { body: BulkResponseBody; } = await this.client.bulk({
-                refresh: 'true',
-                index: this.options.indexPrefix + indexName,
-                type: indexType,
-                body: operations,
-            });
-
-            if (body.errors) {
-                Logger.error(`Some errors occurred running bulk operations on ${indexType}! Set logger to "debug" to print all errors.`, loggerCtx);
-                body.items.forEach(item => {
-                    if (item.index) {
-                        Logger.debug(JSON.stringify(item.index.error, null, 2), loggerCtx);
-                    }
-                    if (item.update) {
-                        Logger.debug(JSON.stringify(item.update.error, null, 2), loggerCtx);
-                    }
-                    if (item.delete) {
-                        Logger.debug(JSON.stringify(item.delete.error, null, 2), loggerCtx);
-                    }
+                return new Promise((resolve, reject) => {
+                    Logger.verbose(`sending reindex message`);
+                    let total: number | undefined;
+                    let duration = 0;
+                    let completed = 0;
+                    this.client.send<ReindexMessageResponse>(Message.Reindex, { ctx })
+                        .subscribe({
+                            next: response => {
+                                if (!total) {
+                                    total = response.total;
+                                }
+                                duration = response.duration;
+                                completed = response.completed;
+                                const progress = Math.ceil((completed / total) * 100);
+                                reporter.setProgress(progress);
+                            },
+                            complete: () => {
+                                resolve({
+                                    success: true,
+                                    indexedItemCount: total,
+                                    timeTaken: duration,
+                                });
+                            },
+                            error: (err) => {
+                                Logger.error(JSON.stringify(err));
+                                resolve({
+                                    success: false,
+                                    indexedItemCount: 0,
+                                    timeTaken: 0,
+                                });
+                            },
+                        });
                 });
                 });
-            } else {
-                Logger.verbose(`Executed ${body.items.length} bulk operations on ${indexType}`);
-            }
-            return body;
-        } catch (e) {
-            Logger.error(`Error when attempting to run bulk operations [${e.toString()}]`, loggerCtx);
-            Logger.error('Error details: ' + JSON.stringify(e.body.error, null, 2), loggerCtx);
-        }
-    }
-
-    private getSearchIndexQueryBuilder() {
-        const qb = this.connection.getRepository(ProductVariant).createQueryBuilder('variants');
-        FindOptionsUtils.applyFindManyOptionsOrConditionsToQueryBuilder(qb, {
-            relations: variantRelations,
-            order: {
-                productId: 'ASC',
             },
             },
         });
         });
-        FindOptionsUtils.joinEagerRelations(qb, qb.alias, this.connection.getMetadata(ProductVariant));
-        return qb;
-    }
-
-    private async getBatch(ctx: RequestContext, qb: SelectQueryBuilder<ProductVariant>, batchNumber: string | number): Promise<ProductVariant[]> {
-        const i = Number.parseInt(batchNumber.toString(), 10);
-        const variants = await qb
-            .where('variants__product.deletedAt IS NULL')
-            .take(BATCH_SIZE)
-            .skip(i * BATCH_SIZE)
-            .getMany();
-
-        return this.hydrateVariants(ctx, variants);
-    }
-
-    private async getBatchByIds(ctx: RequestContext, ids: ID[]) {
-        const variants = await this.connection.getRepository(ProductVariant).findByIds(ids, {
-            relations: variantRelations,
-        });
-        return this.hydrateVariants(ctx, variants);
-    }
-    /**
-     * Given an array of ProductVariants, this method applies the correct taxes and translations.
-     */
-    private hydrateVariants(ctx: RequestContext, variants: ProductVariant[]): ProductVariant[] {
-        return variants
-            .map(v => this.productVariantService.applyChannelPriceAndTax(v, ctx))
-            .map(v => translateDeep(v, ctx.languageCode, ['product']));
-    }
-
-    private createVariantIndexItem(v: ProductVariant): VariantIndexItem {
-        return {
-            sku: v.sku,
-            slug: v.product.slug,
-            productId: v.product.id as string,
-            productName: v.product.name,
-            productPreview: v.product.featuredAsset ? v.product.featuredAsset.preview : '',
-            productVariantId: v.id as string,
-            productVariantName: v.name,
-            productVariantPreview: v.featuredAsset ? v.featuredAsset.preview : '',
-            price: v.price,
-            priceWithTax: v.priceWithTax,
-            currencyCode: v.currencyCode,
-            description: v.product.description,
-            facetIds: this.getFacetIds([v]),
-            facetValueIds: this.getFacetValueIds([v]),
-            collectionIds: v.collections.map(c => c.id.toString()),
-            enabled: v.enabled && v.product.enabled,
-        };
-    }
-
-    private createProductIndexItem(variants: ProductVariant[]): ProductIndexItem {
-        const first = variants[0];
-        const prices = variants.map(v => v.price);
-        const pricesWithTax = variants.map(v => v.priceWithTax);
-        return {
-            sku: variants.map(v => v.sku),
-            slug: variants.map(v => v.product.slug),
-            productId: first.product.id,
-            productName: variants.map(v => v.product.name),
-            productPreview: first.product.featuredAsset ? first.product.featuredAsset.preview : '',
-            productVariantId: variants.map(v => v.id),
-            productVariantName: variants.map(v => v.name),
-            productVariantPreview: variants.filter(v => v.featuredAsset).map(v => v.featuredAsset.preview),
-            priceMin: Math.min(...prices),
-            priceMax: Math.max(...prices),
-            priceWithTaxMin: Math.min(...pricesWithTax),
-            priceWithTaxMax: Math.max(...pricesWithTax),
-            currencyCode: first.currencyCode,
-            description: first.product.description,
-            facetIds: this.getFacetIds(variants),
-            facetValueIds: this.getFacetValueIds(variants),
-            collectionIds: variants.reduce((ids, v) => [ ...ids, ...v.collections.map(c => c.id)], [] as ID[]),
-            enabled: first.product.enabled,
-        };
-    }
-
-    private getFacetIds(variants: ProductVariant[]): string[] {
-        const facetIds = (fv: FacetValue) => fv.facet.id.toString();
-        const variantFacetIds = variants.reduce((ids, v) => [ ...ids, ...v.facetValues.map(facetIds)], [] as string[]);
-        const productFacetIds = variants[0].product.facetValues.map(facetIds);
-        return unique([...variantFacetIds, ...productFacetIds]);
     }
     }
 
 
-    private getFacetValueIds(variants: ProductVariant[]): string[] {
-        const facetValueIds = (fv: FacetValue) => fv.id.toString();
-        const variantFacetValueIds = variants.reduce((ids, v) => [ ...ids, ...v.facetValues.map(facetValueIds)], [] as string[]);
-        const productFacetValueIds = variants[0].product.facetValues.map(facetValueIds);
-        return unique([...variantFacetValueIds, ...productFacetValueIds]);
+    onModuleDestroy(): any {
+        this.client.close();
     }
     }
 }
 }

+ 358 - 0
packages/elasticsearch-plugin/src/indexer.controller.ts

@@ -0,0 +1,358 @@
+import { Client } from '@elastic/elasticsearch';
+import { Controller, Inject } from '@nestjs/common';
+import { MessagePattern } from '@nestjs/microservices';
+import { InjectConnection } from '@nestjs/typeorm';
+import { unique } from '@vendure/common/lib/unique';
+import { FacetValue, ID, JobService, Logger, Product, ProductVariant, ProductVariantService, RequestContext, translateDeep } from '@vendure/core';
+import { defer, Observable } from 'rxjs';
+import { Connection, SelectQueryBuilder } from 'typeorm';
+import { FindOptionsUtils } from 'typeorm/find-options/FindOptionsUtils';
+
+import {
+    ELASTIC_SEARCH_CLIENT,
+    ELASTIC_SEARCH_OPTIONS,
+    loggerCtx,
+    Message,
+    PRODUCT_INDEX_NAME,
+    PRODUCT_INDEX_TYPE,
+    VARIANT_INDEX_NAME,
+    VARIANT_INDEX_TYPE,
+} from './constants';
+import { ElasticsearchOptions } from './plugin';
+import { BulkOperation, BulkOperationDoc, BulkResponseBody, ProductIndexItem, VariantIndexItem } from './types';
+
+export const variantRelations = [
+    'product',
+    'product.featuredAsset',
+    'product.facetValues',
+    'product.facetValues.facet',
+    'featuredAsset',
+    'facetValues',
+    'facetValues.facet',
+    'collections',
+    'taxCategory',
+];
+
+export interface ReindexMessageResponse {
+    total: number;
+    completed: number;
+    duration: number;
+}
+
+@Controller()
+export class ElasticsearchIndexerController {
+
+    constructor(@InjectConnection() private connection: Connection,
+                @Inject(ELASTIC_SEARCH_OPTIONS) private options: Required<ElasticsearchOptions>,
+                @Inject(ELASTIC_SEARCH_CLIENT) private client: Client,
+                private productVariantService: ProductVariantService,
+                private jobService: JobService) {}
+
+    /**
+     * Updates the search index only for the affected entities.
+     */
+    @MessagePattern(Message.UpdateProductOrVariant)
+    updateProductOrVariant({ ctx: rawContext, productId, variantId }: { ctx: any, productId?: ID, variantId?: ID }): Observable<boolean> {
+        const ctx = RequestContext.fromObject(rawContext);
+        let updatedProductVariants: ProductVariant[] = [];
+        let removedProducts: Product[] = [];
+        let updatedVariants: ProductVariant[] = [];
+        let removedVariantIds: ID[] = [];
+
+        return defer(async () => {
+            if (productId) {
+                const product = await this.connection.getRepository(Product).findOne(productId, {
+                    relations: ['variants'],
+                });
+                if (product) {
+                    if (product.deletedAt) {
+                        removedProducts = [product];
+                        removedVariantIds = product.variants.map(v => v.id);
+                    } else {
+                        const variants = await this.connection
+                            .getRepository(ProductVariant)
+                            .findByIds(product.variants.map(v => v.id), {
+                                relations: variantRelations,
+                            });
+                        updatedProductVariants = updatedVariants;
+                    }
+                }
+            } else {
+                const variant = await this.connection.getRepository(ProductVariant).findOne(variantId, {
+                    relations: variantRelations,
+                });
+                if (variant) {
+                    updatedVariants = [variant];
+                }
+            }
+            Logger.verbose(`Updating ${updatedVariants.length} variants`, loggerCtx);
+            updatedVariants = this.hydrateVariants(ctx, updatedVariants);
+
+            if (updatedProductVariants.length) {
+                const updatedProductIndexItem = this.createProductIndexItem(updatedProductVariants);
+                const operations: [BulkOperation, BulkOperationDoc<ProductIndexItem>] = [
+                    { update: { _id: updatedProductIndexItem.productId.toString() } },
+                    { doc: updatedProductIndexItem },
+                ];
+                await this.executeBulkOperations(PRODUCT_INDEX_NAME, PRODUCT_INDEX_TYPE, operations);
+            }
+            if (updatedVariants.length) {
+                const operations = updatedVariants.reduce((ops, variant) => {
+                    return [
+                        ...ops,
+                        { update: { _id: variant.id.toString() } },
+                        { doc: this.createVariantIndexItem(variant) },
+                    ];
+                }, [] as Array<BulkOperation | BulkOperationDoc<VariantIndexItem>>);
+                await this.executeBulkOperations(VARIANT_INDEX_NAME, VARIANT_INDEX_TYPE, operations);
+            }
+            if (removedVariantIds.length) {
+                const operations = removedVariantIds.reduce((ops, id) => {
+                    return [
+                        ...ops,
+                        { delete: { _id: id.toString() } },
+                    ];
+                }, [] as BulkOperation[]);
+                await this.executeBulkOperations(VARIANT_INDEX_NAME, VARIANT_INDEX_TYPE, operations);
+            }
+
+            return true;
+        });
+    }
+
+    @MessagePattern(Message.UpdateVariantsById)
+    updateVariantsById({ ctx: rawContext, ids }: { ctx: any, ids: ID[] }): Observable<ReindexMessageResponse> {
+        const ctx = RequestContext.fromObject(rawContext);
+        const { batchSize } = this.options;
+
+        return new Observable(observer => {
+            (async () => {
+                const timeStart = Date.now();
+                if (ids.length) {
+                    const batches = Math.ceil(ids.length / batchSize);
+                    Logger.verbose(`Updating ${ids.length} variants...`);
+
+                    for (let i = 0; i < batches; i++) {
+                        const begin = i * batchSize;
+                        const end = begin + batchSize;
+                        Logger.verbose(`Updating ids from index ${begin} to ${end}`);
+                        const batchIds = ids.slice(begin, end);
+                        const batch = await this.getBatchByIds(ctx, batchIds);
+                        const operations = batch.reduce((ops, variant) => {
+                            return [
+                                ...ops,
+                                { update: { _id: variant.id.toString() } },
+                                { doc: this.createVariantIndexItem(variant) },
+                            ];
+                        }, [] as Array<BulkOperation | BulkOperationDoc<VariantIndexItem>>);
+                        await this.executeBulkOperations(VARIANT_INDEX_NAME, VARIANT_INDEX_TYPE, operations);
+                        observer.next({
+                            total: ids.length,
+                            completed: Math.min((i + 1) * batchSize, ids.length),
+                            duration: +new Date() - timeStart,
+                        });
+                    }
+                }
+                Logger.verbose(`Completed reindexing!`);
+                observer.next({
+                    total: ids.length,
+                    completed: ids.length,
+                    duration: +new Date() - timeStart,
+                });
+                observer.complete();
+            })();
+        });
+    }
+
+    @MessagePattern(Message.Reindex)
+    reindex({ ctx: rawContext }: { ctx: any }): Observable<ReindexMessageResponse> {
+        const ctx = RequestContext.fromObject(rawContext);
+        const { batchSize } = this.options;
+
+        return new Observable(observer => {
+                (async () => {
+                    const timeStart = Date.now();
+                    const qb = this.getSearchIndexQueryBuilder();
+                    const count = await qb.where('variants__product.deletedAt IS NULL').getCount();
+                    Logger.verbose(`Reindexing ${count} variants`, loggerCtx);
+
+                    const batches = Math.ceil(count / batchSize);
+                    let variantsInProduct: ProductVariant[] = [];
+
+                    for (let i = 0; i < batches; i++) {
+                        Logger.verbose(`Processing batch ${i + 1} of ${batches}`, loggerCtx);
+
+                        const variants = await this.getBatch(ctx, qb, i);
+                        Logger.verbose(`variants count: ${variants.length}`);
+
+                        const variantsToIndex: Array<BulkOperation | VariantIndexItem> = [];
+                        const productsToIndex: Array<BulkOperation | ProductIndexItem> = [];
+
+                        // tslint:disable-next-line:prefer-for-of
+                        for (let j = 0; j < variants.length; j++) {
+                            const variant = variants[j];
+                            variantsInProduct.push(variant);
+                            variantsToIndex.push({index: {_id: variant.id.toString()}});
+                            variantsToIndex.push(this.createVariantIndexItem(variant));
+
+                            const nextVariant = variants[j + 1];
+                            if (nextVariant && nextVariant.productId !== variant.productId) {
+                                productsToIndex.push({index: {_id: variant.productId.toString()}});
+                                productsToIndex.push(this.createProductIndexItem(variantsInProduct) as any);
+                                variantsInProduct = [];
+                            }
+                        }
+                        await this.executeBulkOperations(VARIANT_INDEX_NAME, VARIANT_INDEX_TYPE, variantsToIndex);
+                        await this.executeBulkOperations(PRODUCT_INDEX_NAME, PRODUCT_INDEX_TYPE, productsToIndex);
+                        observer.next({
+                            total: count,
+                            completed: Math.min((i + 1) * batchSize, count),
+                            duration: +new Date() - timeStart,
+                        });
+                    }
+                    Logger.verbose(`Completed reindexing!`);
+                    observer.next({
+                        total: count,
+                        completed: count,
+                        duration: +new Date() - timeStart,
+                    });
+                    observer.complete();
+                })();
+            },
+        );
+    }
+
+    private async executeBulkOperations(indexName: string,
+                                        indexType: string,
+                                        operations: Array<BulkOperation | BulkOperationDoc<VariantIndexItem | ProductIndexItem>>) {
+        try {
+            const {body}: { body: BulkResponseBody; } = await this.client.bulk({
+                refresh: 'true',
+                index: this.options.indexPrefix + indexName,
+                type: indexType,
+                body: operations,
+            });
+
+            if (body.errors) {
+                Logger.error(`Some errors occurred running bulk operations on ${indexType}! Set logger to "debug" to print all errors.`, loggerCtx);
+                body.items.forEach(item => {
+                    if (item.index) {
+                        Logger.debug(JSON.stringify(item.index.error, null, 2), loggerCtx);
+                    }
+                    if (item.update) {
+                        Logger.debug(JSON.stringify(item.update.error, null, 2), loggerCtx);
+                    }
+                    if (item.delete) {
+                        Logger.debug(JSON.stringify(item.delete.error, null, 2), loggerCtx);
+                    }
+                });
+            } else {
+                Logger.verbose(`Executed ${body.items.length} bulk operations on ${indexType}`);
+            }
+            return body;
+        } catch (e) {
+            Logger.error(`Error when attempting to run bulk operations [${e.toString()}]`, loggerCtx);
+            Logger.error('Error details: ' + JSON.stringify(e.body.error, null, 2), loggerCtx);
+        }
+    }
+
+    private getSearchIndexQueryBuilder() {
+        const qb = this.connection.getRepository(ProductVariant).createQueryBuilder('variants');
+        FindOptionsUtils.applyFindManyOptionsOrConditionsToQueryBuilder(qb, {
+            relations: variantRelations,
+            order: {
+                productId: 'ASC',
+            },
+        });
+        FindOptionsUtils.joinEagerRelations(qb, qb.alias, this.connection.getMetadata(ProductVariant));
+        return qb;
+    }
+
+    private async getBatch(ctx: RequestContext, qb: SelectQueryBuilder<ProductVariant>, batchNumber: string | number): Promise<ProductVariant[]> {
+        const { batchSize } = this.options;
+        const i = Number.parseInt(batchNumber.toString(), 10);
+        const variants = await qb
+            .where('variants__product.deletedAt IS NULL')
+            .take(batchSize)
+            .skip(i * batchSize)
+            .getMany();
+
+        return this.hydrateVariants(ctx, variants);
+    }
+
+    private async getBatchByIds(ctx: RequestContext, ids: ID[]) {
+        const variants = await this.connection.getRepository(ProductVariant).findByIds(ids, {
+            relations: variantRelations,
+        });
+        return this.hydrateVariants(ctx, variants);
+    }
+    /**
+     * Given an array of ProductVariants, this method applies the correct taxes and translations.
+     */
+    private hydrateVariants(ctx: RequestContext, variants: ProductVariant[]): ProductVariant[] {
+        return variants
+            .map(v => this.productVariantService.applyChannelPriceAndTax(v, ctx))
+            .map(v => translateDeep(v, ctx.languageCode, ['product']));
+    }
+
+    private createVariantIndexItem(v: ProductVariant): VariantIndexItem {
+        return {
+            sku: v.sku,
+            slug: v.product.slug,
+            productId: v.product.id as string,
+            productName: v.product.name,
+            productPreview: v.product.featuredAsset ? v.product.featuredAsset.preview : '',
+            productVariantId: v.id as string,
+            productVariantName: v.name,
+            productVariantPreview: v.featuredAsset ? v.featuredAsset.preview : '',
+            price: v.price,
+            priceWithTax: v.priceWithTax,
+            currencyCode: v.currencyCode,
+            description: v.product.description,
+            facetIds: this.getFacetIds([v]),
+            facetValueIds: this.getFacetValueIds([v]),
+            collectionIds: v.collections.map(c => c.id.toString()),
+            enabled: v.enabled && v.product.enabled,
+        };
+    }
+
+    private createProductIndexItem(variants: ProductVariant[]): ProductIndexItem {
+        const first = variants[0];
+        const prices = variants.map(v => v.price);
+        const pricesWithTax = variants.map(v => v.priceWithTax);
+        return {
+            sku: variants.map(v => v.sku),
+            slug: variants.map(v => v.product.slug),
+            productId: first.product.id,
+            productName: variants.map(v => v.product.name),
+            productPreview: first.product.featuredAsset ? first.product.featuredAsset.preview : '',
+            productVariantId: variants.map(v => v.id),
+            productVariantName: variants.map(v => v.name),
+            productVariantPreview: variants.filter(v => v.featuredAsset).map(v => v.featuredAsset.preview),
+            priceMin: Math.min(...prices),
+            priceMax: Math.max(...prices),
+            priceWithTaxMin: Math.min(...pricesWithTax),
+            priceWithTaxMax: Math.max(...pricesWithTax),
+            currencyCode: first.currencyCode,
+            description: first.product.description,
+            facetIds: this.getFacetIds(variants),
+            facetValueIds: this.getFacetValueIds(variants),
+            collectionIds: variants.reduce((ids, v) => [ ...ids, ...v.collections.map(c => c.id)], [] as ID[]),
+            enabled: first.product.enabled,
+        };
+    }
+
+    private getFacetIds(variants: ProductVariant[]): string[] {
+        const facetIds = (fv: FacetValue) => fv.facet.id.toString();
+        const variantFacetIds = variants.reduce((ids, v) => [ ...ids, ...v.facetValues.map(facetIds)], [] as string[]);
+        const productFacetIds = variants[0].product.facetValues.map(facetIds);
+        return unique([...variantFacetIds, ...productFacetIds]);
+    }
+
+    private getFacetValueIds(variants: ProductVariant[]): string[] {
+        const facetValueIds = (fv: FacetValue) => fv.id.toString();
+        const variantFacetValueIds = variants.reduce((ids, v) => [ ...ids, ...v.facetValues.map(facetValueIds)], [] as string[]);
+        const productFacetValueIds = variants[0].product.facetValues.map(facetValueIds);
+        return unique([...variantFacetValueIds, ...productFacetValueIds]);
+    }
+}

+ 72 - 3
packages/elasticsearch-plugin/src/plugin.ts

@@ -20,25 +20,86 @@ import { ELASTIC_SEARCH_CLIENT, ELASTIC_SEARCH_OPTIONS, loggerCtx } from './cons
 import { ElasticsearchIndexService } from './elasticsearch-index.service';
 import { ElasticsearchIndexService } from './elasticsearch-index.service';
 import { AdminElasticSearchResolver, ShopElasticSearchResolver } from './elasticsearch-resolver';
 import { AdminElasticSearchResolver, ShopElasticSearchResolver } from './elasticsearch-resolver';
 import { ElasticsearchService } from './elasticsearch.service';
 import { ElasticsearchService } from './elasticsearch.service';
+import { ElasticsearchIndexerController } from './indexer.controller';
 
 
+/**
+ * @description
+ * Configuration options for the {@link ElasticsearchPlugin}.
+ *
+ * @docsCategory ElasticsearchPlugin
+ */
 export interface ElasticsearchOptions {
 export interface ElasticsearchOptions {
+    /**
+     * @description
+     * The host of the Elasticsearch server.
+     */
     host: string;
     host: string;
+    /**
+     * @description
+     * The port of the Elasticsearch server.
+     */
     port: number;
     port: number;
+    /**
+     * @description
+     * Prefix for the indices created by the plugin.
+     *
+     * @default
+     * 'vendure-'
+     */
     indexPrefix?: string;
     indexPrefix?: string;
+    /**
+     * @description
+     * Batch size for bulk operations (e.g. when rebuilding the indices)
+     *
+     * @default
+     * 2000
+     */
+    batchSize?: number;
 }
 }
 
 
+/**
+ * @description
+ * This plugin allows your product search to be powered by [Elasticsearch](https://github.com/elastic/elasticsearch) - a powerful Open Source search
+ * engine.
+ *
+ * ## Installation
+ *
+ * `yarn add \@vendure/elasticsearch-plugin`
+ *
+ * or
+ *
+ * `npm install \@vendure/elasticsearch-plugin`
+ *
+ * @example
+ * ```ts
+ * import { ElasticsearchPlugin } from '\@vendure/elasticsearch-plugin';
+ *
+ * const config: VendureConfig = {
+ *   // Add an instance of the plugin to the plugins array
+ *   plugins: [
+ *     new ElasticsearchPlugin({
+ *       host: 'http://localhost',
+ *       port: 9200,
+ *     }),
+ *   ],
+ * };
+ * ```
+ *
+ * @docsCategory ElasticsearchPlugin
+ */
 export class ElasticsearchPlugin implements VendurePlugin {
 export class ElasticsearchPlugin implements VendurePlugin {
     private readonly options: Required<ElasticsearchOptions>;
     private readonly options: Required<ElasticsearchOptions>;
     private readonly client: Client;
     private readonly client: Client;
 
 
     constructor(options: ElasticsearchOptions) {
     constructor(options: ElasticsearchOptions) {
-        this.options = { indexPrefix: 'vendure-', ...options };
+        this.options = { indexPrefix: 'vendure-', batchSize: 2000, ...options };
         const { host, port } = options;
         const { host, port } = options;
         this.client = new Client({
         this.client = new Client({
             node: `${host}:${port}`,
             node: `${host}:${port}`,
         });
         });
     }
     }
 
 
+    /** @internal */
     async onBootstrap(inject: <T>(type: Type<T>) => T): Promise<void> {
     async onBootstrap(inject: <T>(type: Type<T>) => T): Promise<void> {
         const elasticsearchService = inject(ElasticsearchService);
         const elasticsearchService = inject(ElasticsearchService);
         const elasticsearchIndexService = inject(ElasticsearchIndexService);
         const elasticsearchIndexService = inject(ElasticsearchIndexService);
@@ -57,11 +118,11 @@ export class ElasticsearchPlugin implements VendurePlugin {
         const eventBus = inject(EventBus);
         const eventBus = inject(EventBus);
         eventBus.subscribe(CatalogModificationEvent, event => {
         eventBus.subscribe(CatalogModificationEvent, event => {
             if (event.entity instanceof Product || event.entity instanceof ProductVariant) {
             if (event.entity instanceof Product || event.entity instanceof ProductVariant) {
-                return elasticsearchIndexService.updateProductOrVariant(event.ctx, event.entity);
+                return elasticsearchIndexService.updateProductOrVariant(event.ctx, event.entity).start();
             }
             }
         });
         });
         eventBus.subscribe(CollectionModificationEvent, event => {
         eventBus.subscribe(CollectionModificationEvent, event => {
-            return elasticsearchIndexService.updateVariantsById(event.ctx, event.productVariantIds);
+            return elasticsearchIndexService.updateVariantsById(event.ctx, event.productVariantIds).start();
         });
         });
         eventBus.subscribe(TaxRateModificationEvent, event => {
         eventBus.subscribe(TaxRateModificationEvent, event => {
             const defaultTaxZone = event.ctx.channel.defaultTaxZone;
             const defaultTaxZone = event.ctx.channel.defaultTaxZone;
@@ -97,6 +158,7 @@ export class ElasticsearchPlugin implements VendurePlugin {
         };
         };
     }
     }
 
 
+    /** @internal */
     defineProviders(): Provider[] {
     defineProviders(): Provider[] {
         return [
         return [
             { provide: ElasticsearchIndexService, useClass: ElasticsearchIndexService },
             { provide: ElasticsearchIndexService, useClass: ElasticsearchIndexService },
@@ -108,4 +170,11 @@ export class ElasticsearchPlugin implements VendurePlugin {
             { provide: ELASTIC_SEARCH_CLIENT, useFactory: () => this.client },
             { provide: ELASTIC_SEARCH_CLIENT, useFactory: () => this.client },
         ];
         ];
     }
     }
+
+    /** @internal */
+    defineWorkers(): Array<Type<any>> {
+        return [
+            ElasticsearchIndexerController,
+        ];
+    }
 }
 }