Browse Source

chore: Set up elasticsearch-plugin e2e workflow (#201)

* refactor(core): Create asyncObservable fn to make worker method simpler

* chore(core): Remove unused imports

* chore: Set up elasticsearch-plugin e2e workflow

* fix(elasticsearch-plugin): Correctly reindex based on active Channel

* chore: Attempt local elasticsearch install

* refactor(core): Move AsyncQueue class to common and export in package

* fix(elasticsearch-plugin): Fix some concurrency issues

Race conditions leading to errors when running many worker tasks. Solution is to use a queue to have them execute in series.

* fix(testing): Close down worker before main app when populating

The other way round can cause problems when a shared resource (e.g. the TypeORM Connection) is prematurely closed by the app before the worker has finished all jobs.

* fix(elasticsearch-plugin): Do not share elasticsearch client

The elasticsearch client was being shared between app and worker. This lead to issues in unit testing (and probably could cause other bugs in production). Now each process creates its own client and manages the lifecycle independently.

* refactor(elasticsearch-plugin): Increase concurrency in indexer

* chore: Restore full test suite
Michael Bromley 6 years ago
parent
commit
e639bc95b3

+ 14 - 2
.github/workflows/build_and_test.yml

@@ -17,20 +17,32 @@ jobs:
       matrix:
         node-version: [10.x, 12.x]
 
+    env:
+      CI: true
+
     steps:
     - uses: actions/checkout@v1
     - name: Use Node.js ${{ matrix.node-version }}
       uses: actions/setup-node@v1
       with:
         node-version: ${{ matrix.node-version }}
+    # This is required for the elasticsearch-plugin e2e tests. It would be better to use the
+    # "services" feature, but I cannot figure out how to connect to it from the test server.
+    - name: Install elasticsearch
+      run: |
+        curl -L -O https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.4.2-linux-x86_64.tar.gz
+        tar -xvf elasticsearch-7.4.2-linux-x86_64.tar.gz
+        cd elasticsearch-7.4.2/bin
+        ./elasticsearch &
     - name: Install & build
       run: |
         yarn install
         yarn bootstrap
         yarn build
-      env:
-        CI: true
     - name: Unit tests
       run: yarn test
     - name: e2e tests
+      env:
+        ELASTICSEARCH_PORT: 9200
+        ELASTICSEARCH_HOST: http://localhost
       run: yarn e2e

+ 0 - 1
packages/core/src/api/middleware/exception-logger.filter.ts

@@ -1,5 +1,4 @@
 import { ArgumentsHost, ExceptionFilter, HttpException } from '@nestjs/common';
-import { Request, Response } from 'express';
 
 import { Logger, LogLevel } from '../../config';
 import { I18nError } from '../../i18n/i18n-error';

+ 8 - 5
packages/core/src/plugin/default-search-plugin/async-queue.ts → packages/core/src/common/async-queue.ts

@@ -1,18 +1,18 @@
 export type Task<T = any> = () => Promise<T> | T;
 export type Resolve<T> = (val: T) => void;
 export type Reject<T> = (val: T) => void;
-type TaskQueueItem = { task: Task; resolve: Resolve<any>; reject: Reject<any>; };
+type TaskQueueItem = { task: Task; resolve: Resolve<any>; reject: Reject<any> };
 
 /**
+ * @description
  * A queue class for limiting concurrent async tasks. This can be used e.g. to prevent
  * race conditions when working on a shared resource such as writing to a database.
  *
- * The task queue itself is shared across instances (even across processes) by means of the
- * 'label' constructor argument.
+ * @docsCategory common
  */
 export class AsyncQueue {
     private static running: { [label: string]: number } = {};
-    private static taskQueue: { [label: string]: TaskQueueItem[]; } = {};
+    private static taskQueue: { [label: string]: TaskQueueItem[] } = {};
 
     constructor(private label: string = 'default', private concurrency: number = 1) {
         if (!AsyncQueue.taskQueue[label]) {
@@ -34,13 +34,16 @@ export class AsyncQueue {
     }
 
     /**
+     * @description
      * Pushes a new task onto the queue, upon which the task will either execute immediately or
      * (if the number of running tasks is equal to the concurrency limit) enqueue the task to
      * be executed at the soonest opportunity.
      */
     push<T>(task: Task<T>): Promise<T> {
         return new Promise<T>((resolve, reject) => {
-            this.running < this.concurrency ? this.runTask(task, resolve, reject) : this.enqueueTask(task, resolve, reject);
+            this.running < this.concurrency
+                ? this.runTask(task, resolve, reject)
+                : this.enqueueTask(task, resolve, reject);
         });
     }
 

+ 1 - 0
packages/core/src/common/index.ts

@@ -1,2 +1,3 @@
 export * from './error/errors';
 export * from './utils';
+export * from './async-queue';

+ 62 - 72
packages/core/src/plugin/default-search-plugin/indexer/indexer.controller.ts

@@ -9,13 +9,14 @@ import { Connection } from 'typeorm';
 import { FindOptionsUtils } from 'typeorm/find-options/FindOptionsUtils';
 
 import { RequestContext } from '../../../api/common/request-context';
+import { AsyncQueue } from '../../../common/async-queue';
 import { Logger } from '../../../config/logger/vendure-logger';
 import { FacetValue } from '../../../entity/facet-value/facet-value.entity';
 import { ProductVariant } from '../../../entity/product-variant/product-variant.entity';
 import { Product } from '../../../entity/product/product.entity';
 import { translateDeep } from '../../../service/helpers/utils/translate-entity';
 import { ProductVariantService } from '../../../service/services/product-variant.service';
-import { AsyncQueue } from '../async-queue';
+import { asyncObservable } from '../../../worker/async-observable';
 import { SearchIndexItem } from '../search-index-item.entity';
 import {
     AssignProductToChannelMessage,
@@ -56,46 +57,40 @@ export class IndexerController {
     @MessagePattern(ReindexMessage.pattern)
     reindex({ ctx: rawContext }: ReindexMessage['data']): Observable<ReindexMessage['response']> {
         const ctx = RequestContext.fromObject(rawContext);
-        return new Observable(observer => {
-            (async () => {
-                const timeStart = Date.now();
-                const qb = this.getSearchIndexQueryBuilder(ctx.channelId);
-                const count = await qb.getCount();
-                Logger.verbose(
-                    `Reindexing ${count} variants for channel ${ctx.channel.code}`,
-                    workerLoggerCtx,
-                );
-                const batches = Math.ceil(count / BATCH_SIZE);
+        return asyncObservable(async observer => {
+            const timeStart = Date.now();
+            const qb = this.getSearchIndexQueryBuilder(ctx.channelId);
+            const count = await qb.getCount();
+            Logger.verbose(`Reindexing ${count} variants for channel ${ctx.channel.code}`, workerLoggerCtx);
+            const batches = Math.ceil(count / BATCH_SIZE);
 
-                await this.connection
-                    .getRepository(SearchIndexItem)
-                    .delete({ languageCode: ctx.languageCode, channelId: ctx.channelId });
-                Logger.verbose('Deleted existing index items', workerLoggerCtx);
+            await this.connection
+                .getRepository(SearchIndexItem)
+                .delete({ languageCode: ctx.languageCode, channelId: ctx.channelId });
+            Logger.verbose('Deleted existing index items', workerLoggerCtx);
 
-                for (let i = 0; i < batches; i++) {
-                    Logger.verbose(`Processing batch ${i + 1} of ${batches}`, workerLoggerCtx);
+            for (let i = 0; i < batches; i++) {
+                Logger.verbose(`Processing batch ${i + 1} of ${batches}`, workerLoggerCtx);
 
-                    const variants = await qb
-                        .andWhere('variants__product.deletedAt IS NULL')
-                        .take(BATCH_SIZE)
-                        .skip(i * BATCH_SIZE)
-                        .getMany();
-                    const hydratedVariants = this.hydrateVariants(ctx, variants);
-                    await this.saveVariants(ctx.languageCode, ctx.channelId, hydratedVariants);
-                    observer.next({
-                        total: count,
-                        completed: Math.min((i + 1) * BATCH_SIZE, count),
-                        duration: +new Date() - timeStart,
-                    });
-                }
-                Logger.verbose(`Completed reindexing`, workerLoggerCtx);
+                const variants = await qb
+                    .andWhere('variants__product.deletedAt IS NULL')
+                    .take(BATCH_SIZE)
+                    .skip(i * BATCH_SIZE)
+                    .getMany();
+                const hydratedVariants = this.hydrateVariants(ctx, variants);
+                await this.saveVariants(ctx.languageCode, ctx.channelId, hydratedVariants);
                 observer.next({
                     total: count,
-                    completed: count,
+                    completed: Math.min((i + 1) * BATCH_SIZE, count),
                     duration: +new Date() - timeStart,
                 });
-                observer.complete();
-            })();
+            }
+            Logger.verbose(`Completed reindexing`, workerLoggerCtx);
+            return {
+                total: count,
+                completed: count,
+                duration: +new Date() - timeStart,
+            };
         });
     }
 
@@ -106,47 +101,42 @@ export class IndexerController {
     }: UpdateVariantsByIdMessage['data']): Observable<UpdateVariantsByIdMessage['response']> {
         const ctx = RequestContext.fromObject(rawContext);
 
-        return new Observable(observer => {
-            (async () => {
-                const timeStart = Date.now();
-                if (ids.length) {
-                    const batches = Math.ceil(ids.length / BATCH_SIZE);
-                    Logger.verbose(`Updating ${ids.length} variants...`);
+        return asyncObservable(async observer => {
+            const timeStart = Date.now();
+            if (ids.length) {
+                const batches = Math.ceil(ids.length / BATCH_SIZE);
+                Logger.verbose(`Updating ${ids.length} variants...`);
 
-                    for (let i = 0; i < batches; i++) {
-                        const begin = i * BATCH_SIZE;
-                        const end = begin + BATCH_SIZE;
-                        Logger.verbose(`Updating ids from index ${begin} to ${end}`);
-                        const batchIds = ids.slice(begin, end);
-                        const batch = await this.connection
-                            .getRepository(ProductVariant)
-                            .findByIds(batchIds, {
-                                relations: variantRelations,
-                            });
-                        const variants = this.hydrateVariants(ctx, batch);
-                        await this.saveVariants(ctx.languageCode, ctx.channelId, variants);
-                        observer.next({
-                            total: ids.length,
-                            completed: Math.min((i + 1) * BATCH_SIZE, ids.length),
-                            duration: +new Date() - timeStart,
-                        });
-                    }
+                for (let i = 0; i < batches; i++) {
+                    const begin = i * BATCH_SIZE;
+                    const end = begin + BATCH_SIZE;
+                    Logger.verbose(`Updating ids from index ${begin} to ${end}`);
+                    const batchIds = ids.slice(begin, end);
+                    const batch = await this.connection.getRepository(ProductVariant).findByIds(batchIds, {
+                        relations: variantRelations,
+                    });
+                    const variants = this.hydrateVariants(ctx, batch);
+                    await this.saveVariants(ctx.languageCode, ctx.channelId, variants);
+                    observer.next({
+                        total: ids.length,
+                        completed: Math.min((i + 1) * BATCH_SIZE, ids.length),
+                        duration: +new Date() - timeStart,
+                    });
                 }
-                Logger.verbose(`Completed reindexing!`);
-                observer.next({
-                    total: ids.length,
-                    completed: ids.length,
-                    duration: +new Date() - timeStart,
-                });
-                observer.complete();
-            })();
+            }
+            Logger.verbose(`Completed reindexing!`);
+            return {
+                total: ids.length,
+                completed: ids.length,
+                duration: +new Date() - timeStart,
+            };
         });
     }
 
     @MessagePattern(UpdateProductMessage.pattern)
     updateProduct(data: UpdateProductMessage['data']): Observable<UpdateProductMessage['response']> {
         const ctx = RequestContext.fromObject(data.ctx);
-        return defer(async () => {
+        return asyncObservable(async () => {
             return this.updateProductInChannel(ctx, data.productId, ctx.channelId);
         });
     }
@@ -154,7 +144,7 @@ export class IndexerController {
     @MessagePattern(UpdateVariantMessage.pattern)
     updateVariants(data: UpdateVariantMessage['data']): Observable<UpdateVariantMessage['response']> {
         const ctx = RequestContext.fromObject(data.ctx);
-        return defer(async () => {
+        return asyncObservable(async () => {
             return this.updateVariantsInChannel(ctx, data.variantIds, ctx.channelId);
         });
     }
@@ -162,7 +152,7 @@ export class IndexerController {
     @MessagePattern(DeleteProductMessage.pattern)
     deleteProduct(data: DeleteProductMessage['data']): Observable<DeleteProductMessage['response']> {
         const ctx = RequestContext.fromObject(data.ctx);
-        return defer(async () => {
+        return asyncObservable(async () => {
             return this.deleteProductInChannel(ctx, data.productId, ctx.channelId);
         });
     }
@@ -170,7 +160,7 @@ export class IndexerController {
     @MessagePattern(DeleteVariantMessage.pattern)
     deleteVariant(data: DeleteVariantMessage['data']): Observable<DeleteVariantMessage['response']> {
         const ctx = RequestContext.fromObject(data.ctx);
-        return defer(async () => {
+        return asyncObservable(async () => {
             const variants = await this.connection.getRepository(ProductVariant).findByIds(data.variantIds);
             if (variants.length) {
                 await this.removeSearchIndexItems(ctx.languageCode, ctx.channelId, variants.map(v => v.id));
@@ -184,7 +174,7 @@ export class IndexerController {
         data: AssignProductToChannelMessage['data'],
     ): Observable<AssignProductToChannelMessage['response']> {
         const ctx = RequestContext.fromObject(data.ctx);
-        return defer(async () => {
+        return asyncObservable(async () => {
             return this.updateProductInChannel(ctx, data.productId, data.channelId);
         });
     }
@@ -194,7 +184,7 @@ export class IndexerController {
         data: RemoveProductFromChannelMessage['data'],
     ): Observable<RemoveProductFromChannelMessage['response']> {
         const ctx = RequestContext.fromObject(data.ctx);
-        return defer(async () => {
+        return asyncObservable(async () => {
             return this.deleteProductInChannel(ctx, data.productId, data.channelId);
         });
     }

+ 18 - 21
packages/core/src/service/controllers/collection.controller.ts

@@ -3,7 +3,6 @@ import { MessagePattern } from '@nestjs/microservices';
 import { InjectConnection } from '@nestjs/typeorm';
 import { ConfigurableOperation } from '@vendure/common/lib/generated-types';
 import { ID } from '@vendure/common/lib/shared-types';
-import { notNullOrUndefined } from '@vendure/common/lib/shared-utils';
 import { Observable } from 'rxjs';
 import { Connection } from 'typeorm';
 
@@ -14,6 +13,7 @@ import {
 import { Logger } from '../../config/logger/vendure-logger';
 import { Collection } from '../../entity/collection/collection.entity';
 import { ProductVariant } from '../../entity/product-variant/product-variant.entity';
+import { asyncObservable } from '../../worker/async-observable';
 import { CollectionService } from '../services/collection.service';
 import { ApplyCollectionFiltersMessage } from '../types/collection-messages';
 
@@ -32,27 +32,24 @@ export class CollectionController {
     applyCollectionFilters({
         collectionIds,
     }: ApplyCollectionFiltersMessage['data']): Observable<ApplyCollectionFiltersMessage['response']> {
-        return new Observable(observer => {
-            (async () => {
-                Logger.verbose(`Processing ${collectionIds.length} Collections`);
-                const timeStart = Date.now();
-                const collections = await this.connection.getRepository(Collection).findByIds(collectionIds, {
-                    relations: ['productVariants'],
-                });
-                let completed = 0;
-                for (const collection of collections) {
-                    const affectedVariantIds = await this.applyCollectionFiltersInternal(collection);
+        return asyncObservable(async observer => {
+            Logger.verbose(`Processing ${collectionIds.length} Collections`);
+            const timeStart = Date.now();
+            const collections = await this.connection.getRepository(Collection).findByIds(collectionIds, {
+                relations: ['productVariants'],
+            });
+            let completed = 0;
+            for (const collection of collections) {
+                const affectedVariantIds = await this.applyCollectionFiltersInternal(collection);
 
-                    observer.next({
-                        total: collectionIds.length,
-                        completed: ++completed,
-                        duration: +new Date() - timeStart,
-                        collectionId: collection.id,
-                        affectedVariantIds,
-                    });
-                }
-                observer.complete();
-            })();
+                observer.next({
+                    total: collectionIds.length,
+                    completed: ++completed,
+                    duration: +new Date() - timeStart,
+                    collectionId: collection.id,
+                    affectedVariantIds,
+                });
+            }
         });
     }
 

+ 40 - 0
packages/core/src/worker/async-observable.ts

@@ -0,0 +1,40 @@
+import { Observable, Observer } from 'rxjs';
+
+/**
+ * @description
+ * Returns an observable which executes the given async work function and completes with
+ * the returned value. This is useful in Worker Controller methods which need to return
+ * an Observable but also want to work with async (Promise-returning) code.
+ *
+ * @example
+ * ```TypeScript
+ * @Controller()
+ * export class MyWorkerController {
+ *
+ *     @MessagePattern('test')
+ *     handleTest() {
+ *         return asyncObservable(async observer => {
+ *             const value = await this.connection.fetchSomething();
+ *             return value;
+ *         });
+ *     }
+ * }
+ * ```
+ *
+ * @docsCategory worker
+ */
+export function asyncObservable<T>(work: (observer: Observer<T>) => Promise<T | void>): Observable<T> {
+    return new Observable<T>(subscriber => {
+        (async () => {
+            try {
+                const result = await work(subscriber);
+                if (result) {
+                    subscriber.next(result);
+                }
+                subscriber.complete();
+            } catch (e) {
+                subscriber.error(e);
+            }
+        })();
+    });
+}

+ 1 - 0
packages/core/src/worker/index.ts

@@ -1,2 +1,3 @@
+export * from './async-observable';
 export * from './worker.service';
 export * from './types';

+ 1 - 1
packages/elasticsearch-plugin/.gitignore

@@ -1,4 +1,4 @@
 preview/output
 yarn-error.log
 lib
-e2e/__data__
+e2e/__data__/*.sqlite

+ 0 - 0
packages/elasticsearch-plugin/e2e/__data__/.gitkeep


+ 53 - 12
packages/elasticsearch-plugin/e2e/elasticsearch-plugin.e2e-spec.ts

@@ -1,6 +1,7 @@
+/* tslint:disable:no-non-null-assertion */
 import { SortOrder } from '@vendure/common/lib/generated-types';
 import { pick } from '@vendure/common/lib/pick';
-import { mergeConfig } from '@vendure/core';
+import { DefaultLogger, LogLevel, mergeConfig } from '@vendure/core';
 import { facetValueCollectionFilter } from '@vendure/core/dist/config/collection/default-collection-filters';
 import { createTestEnvironment, E2E_DEFAULT_CHANNEL_TOKEN, SimpleGraphQLClient } from '@vendure/testing';
 import gql from 'graphql-tag';
@@ -44,6 +45,7 @@ import { SEARCH_PRODUCTS_SHOP } from './../../core/e2e/graphql/shop-definitions'
 import { awaitRunningJobs } from './../../core/e2e/utils/await-running-jobs';
 import { dataDir, TEST_SETUP_TIMEOUT_MS, testConfig } from './config/test-config';
 import { initialData } from './fixtures/e2e-initial-data';
+import { GetJobInfo, JobState, Reindex } from './graphql/generated-e2e-elasticsearch-plugin-types';
 
 describe('Elasticsearch plugin', () => {
     const { server, adminClient, shopClient } = createTestEnvironment(
@@ -51,8 +53,10 @@ describe('Elasticsearch plugin', () => {
             plugins: [
                 ElasticsearchPlugin.init({
                     indexPrefix: 'e2e-tests',
-                    port: 9200,
-                    host: 'http://192.168.99.100',
+                    port: process.env.CI ? +(process.env.ELASTICSEARCH_PORT || 9200) : 9200,
+                    host: process.env.CI
+                        ? process.env.ELASTICSEARCH_HOST || 'elasticsearch'
+                        : 'http://192.168.99.100',
                 }),
             ],
         }),
@@ -66,13 +70,7 @@ describe('Elasticsearch plugin', () => {
             customerCount: 1,
         });
         await adminClient.asSuperAdmin();
-        await adminClient.query(gql`
-            mutation {
-                reindex {
-                    id
-                }
-            }
-        `);
+        await adminClient.query(REINDEX);
         await awaitRunningJobs(adminClient);
     }, TEST_SETUP_TIMEOUT_MS);
 
@@ -652,6 +650,7 @@ describe('Elasticsearch plugin', () => {
                         defaultLanguageCode: LanguageCode.en,
                         currencyCode: CurrencyCode.GBP,
                         pricesIncludeTax: true,
+                        defaultTaxZoneId: 'T_2',
                     },
                 });
                 secondChannel = createChannel;
@@ -670,7 +669,7 @@ describe('Elasticsearch plugin', () => {
                 adminClient.setChannelToken(SECOND_CHANNEL_TOKEN);
                 const { search } = await doAdminSearchQuery({ groupByProduct: true });
                 expect(search.items.map(i => i.productId).sort()).toEqual(['T_1', 'T_2']);
-            }, 10000);
+            });
 
             it('removing product from channel', async () => {
                 adminClient.setChannelToken(E2E_DEFAULT_CHANNEL_TOKEN);
@@ -688,7 +687,23 @@ describe('Elasticsearch plugin', () => {
                 adminClient.setChannelToken(SECOND_CHANNEL_TOKEN);
                 const { search } = await doAdminSearchQuery({ groupByProduct: true });
                 expect(search.items.map(i => i.productId)).toEqual(['T_1']);
-            }, 10000);
+            });
+
+            it('reindexes in channel', async () => {
+                adminClient.setChannelToken(SECOND_CHANNEL_TOKEN);
+
+                const { reindex } = await adminClient.query<Reindex.Mutation>(REINDEX);
+                await awaitRunningJobs(adminClient);
+
+                const { job } = await adminClient.query<GetJobInfo.Query, GetJobInfo.Variables>(
+                    GET_JOB_INFO,
+                    { id: reindex.id },
+                );
+                expect(job!.state).toBe(JobState.COMPLETED);
+
+                const { search } = await doAdminSearchQuery({ groupByProduct: true });
+                expect(search.items.map(i => i.productId).sort()).toEqual(['T_1']);
+            });
         });
     });
 });
@@ -752,3 +767,29 @@ export const SEARCH_GET_PRICES = gql`
         }
     }
 `;
+
+const REINDEX = gql`
+    mutation Reindex {
+        reindex {
+            id
+            name
+            state
+            progress
+            duration
+            result
+        }
+    }
+`;
+
+const GET_JOB_INFO = gql`
+    query GetJobInfo($id: String!) {
+        job(jobId: $id) {
+            id
+            name
+            state
+            progress
+            duration
+            result
+        }
+    }
+`;

+ 34 - 0
packages/elasticsearch-plugin/e2e/graphql/generated-e2e-elasticsearch-plugin-types.ts

@@ -3450,6 +3450,28 @@ export type SearchGetPricesQuery = { __typename?: 'Query' } & {
         >;
     };
 };
+
+export type ReindexMutationVariables = {};
+
+export type ReindexMutation = { __typename?: 'Mutation' } & {
+    reindex: { __typename?: 'JobInfo' } & Pick<
+        JobInfo,
+        'id' | 'name' | 'state' | 'progress' | 'duration' | 'result'
+    >;
+};
+
+export type GetJobInfoQueryVariables = {
+    id: Scalars['String'];
+};
+
+export type GetJobInfoQuery = { __typename?: 'Query' } & {
+    job: Maybe<
+        { __typename?: 'JobInfo' } & Pick<
+            JobInfo,
+            'id' | 'name' | 'state' | 'progress' | 'duration' | 'result'
+        >
+    >;
+};
 type DiscriminateUnion<T, U> = T extends U ? T : never;
 
 type RequireField<T, TNames extends string> = T & { [P in TNames]: (T & { [name: string]: never })[P] };
@@ -3493,3 +3515,15 @@ export namespace SearchGetPrices {
         { __typename: 'SinglePrice' }
     >;
 }
+
+export namespace Reindex {
+    export type Variables = ReindexMutationVariables;
+    export type Mutation = ReindexMutation;
+    export type Reindex = ReindexMutation['reindex'];
+}
+
+export namespace GetJobInfo {
+    export type Variables = GetJobInfoQueryVariables;
+    export type Query = GetJobInfoQuery;
+    export type Job = NonNullable<GetJobInfoQuery['job']>;
+}

+ 2 - 1
packages/elasticsearch-plugin/package.json

@@ -11,7 +11,8 @@
     "watch": "tsc -p ./tsconfig.build.json --watch",
     "build": "rimraf lib && tsc -p ./tsconfig.build.json",
     "lint": "tslint --fix --project ./",
-    "test": "jest --config ./jest.config.js"
+    "test": "jest --config ./jest.config.js",
+    "e2e": "jest --config ./e2e/config/jest-e2e.json"
   },
   "publishConfig": {
     "access": "public"

+ 0 - 1
packages/elasticsearch-plugin/src/constants.ts

@@ -1,5 +1,4 @@
 export const ELASTIC_SEARCH_OPTIONS = Symbol('ELASTIC_SEARCH_OPTIONS');
-export const ELASTIC_SEARCH_CLIENT = Symbol('ELASTIC_SEARCH_CLIENT');
 export const VARIANT_INDEX_NAME = 'variants';
 export const VARIANT_INDEX_TYPE = 'variant-index-item';
 export const PRODUCT_INDEX_NAME = 'products';

+ 4 - 2
packages/elasticsearch-plugin/src/elasticsearch-index.service.ts

@@ -28,13 +28,15 @@ import {
 export class ElasticsearchIndexService {
     constructor(private workerService: WorkerService, private jobService: JobService) {}
 
-    reindex(ctx: RequestContext): Job {
+    reindex(ctx: RequestContext, dropIndices: boolean): Job {
         return this.jobService.createJob({
             name: 'reindex',
             singleInstance: true,
             work: async reporter => {
                 Logger.verbose(`sending reindex message`);
-                this.workerService.send(new ReindexMessage({ ctx })).subscribe(this.createObserver(reporter));
+                this.workerService
+                    .send(new ReindexMessage({ ctx, dropIndices }))
+                    .subscribe(this.createObserver(reporter));
             },
         });
     }

+ 1 - 1
packages/elasticsearch-plugin/src/elasticsearch-resolver.ts

@@ -72,6 +72,6 @@ export class AdminElasticSearchResolver implements SearchResolver {
     @Mutation()
     @Allow(Permission.UpdateCatalog)
     async reindex(@Ctx() ctx: RequestContext): Promise<JobInfo> {
-        return this.elasticsearchService.reindex(ctx);
+        return this.elasticsearchService.reindex(ctx, false);
     }
 }

+ 21 - 45
packages/elasticsearch-plugin/src/elasticsearch.service.ts

@@ -1,5 +1,5 @@
 import { Client } from '@elastic/elasticsearch';
-import { Inject, Injectable } from '@nestjs/common';
+import { Inject, Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
 import { JobInfo, SearchResult } from '@vendure/common/lib/generated-types';
 import {
     DeepRequired,
@@ -13,7 +13,6 @@ import {
 
 import { buildElasticBody } from './build-elastic-body';
 import {
-    ELASTIC_SEARCH_CLIENT,
     ELASTIC_SEARCH_OPTIONS,
     loggerCtx,
     PRODUCT_INDEX_NAME,
@@ -22,6 +21,7 @@ import {
     VARIANT_INDEX_TYPE,
 } from './constants';
 import { ElasticsearchIndexService } from './elasticsearch-index.service';
+import { createIndices } from './indexing-utils';
 import { ElasticsearchOptions } from './options';
 import {
     CustomMapping,
@@ -35,10 +35,11 @@ import {
 } from './types';
 
 @Injectable()
-export class ElasticsearchService {
+export class ElasticsearchService implements OnModuleInit, OnModuleDestroy {
+    private client: Client;
+
     constructor(
         @Inject(ELASTIC_SEARCH_OPTIONS) private options: DeepRequired<ElasticsearchOptions>,
-        @Inject(ELASTIC_SEARCH_CLIENT) private client: Client,
         private searchService: SearchService,
         private elasticsearchIndexService: ElasticsearchIndexService,
         private facetValueService: FacetValueService,
@@ -46,6 +47,17 @@ export class ElasticsearchService {
         searchService.adopt(this);
     }
 
+    onModuleInit(): any {
+        const { host, port } = this.options;
+        this.client = new Client({
+            node: `${host}:${port}`,
+        });
+    }
+
+    onModuleDestroy(): any {
+        return this.client.close();
+    }
+
     checkConnection() {
         return this.client.ping({}, { requestTimeout: 1000 });
     }
@@ -59,7 +71,7 @@ export class ElasticsearchService {
 
             if (result.body === false) {
                 Logger.verbose(`Index "${index}" does not exist. Creating...`, loggerCtx);
-                await this.createIndices(indexPrefix);
+                await createIndices(this.client, indexPrefix);
             } else {
                 Logger.verbose(`Index "${index}" exists`, loggerCtx);
             }
@@ -225,58 +237,22 @@ export class ElasticsearchService {
     /**
      * Rebuilds the full search index.
      */
-    async reindex(ctx: RequestContext): Promise<JobInfo> {
+    async reindex(ctx: RequestContext, dropIndices = true): Promise<JobInfo> {
         const { indexPrefix } = this.options;
-        await this.deleteIndices(indexPrefix);
-        await this.createIndices(indexPrefix);
-        const job = this.elasticsearchIndexService.reindex(ctx);
+        const job = this.elasticsearchIndexService.reindex(ctx, dropIndices);
         job.start();
         return job;
     }
 
     /**
-     * Rebuilds the full search index.
+     * Reindexes all in current Channel without dropping indices.
      */
     async updateAll(ctx: RequestContext): Promise<JobInfo> {
-        const job = this.elasticsearchIndexService.reindex(ctx);
+        const job = this.elasticsearchIndexService.reindex(ctx, false);
         job.start();
         return job;
     }
 
-    private async createIndices(prefix: string) {
-        try {
-            const index = prefix + VARIANT_INDEX_NAME;
-            await this.client.indices.create({ index });
-            Logger.verbose(`Created index "${index}"`, loggerCtx);
-        } catch (e) {
-            Logger.error(JSON.stringify(e, null, 2), loggerCtx);
-        }
-        try {
-            const index = prefix + PRODUCT_INDEX_NAME;
-            await this.client.indices.create({ index });
-            Logger.verbose(`Created index "${index}"`, loggerCtx);
-        } catch (e) {
-            Logger.error(JSON.stringify(e, null, 2), loggerCtx);
-        }
-    }
-
-    private async deleteIndices(prefix: string) {
-        try {
-            const index = prefix + VARIANT_INDEX_NAME;
-            await this.client.indices.delete({ index });
-            Logger.verbose(`Deleted index "${index}"`, loggerCtx);
-        } catch (e) {
-            Logger.error(e, loggerCtx);
-        }
-        try {
-            const index = prefix + PRODUCT_INDEX_NAME;
-            await this.client.indices.delete({ index });
-            Logger.verbose(`Deleted index "${index}"`, loggerCtx);
-        } catch (e) {
-            Logger.error(e, loggerCtx);
-        }
-    }
-
     private mapVariantToSearchResult(hit: SearchHit<VariantIndexItem>): SearchResult {
         const source = hit._source;
         const result = {

+ 63 - 36
packages/elasticsearch-plugin/src/indexer.controller.ts

@@ -1,9 +1,11 @@
 import { Client } from '@elastic/elasticsearch';
-import { Controller, Inject } from '@nestjs/common';
+import { Controller, Inject, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
 import { MessagePattern } from '@nestjs/microservices';
 import { InjectConnection } from '@nestjs/typeorm';
 import { unique } from '@vendure/common/lib/unique';
 import {
+    asyncObservable,
+    AsyncQueue,
     FacetValue,
     ID,
     JobService,
@@ -14,12 +16,11 @@ import {
     RequestContext,
     translateDeep,
 } from '@vendure/core';
-import { defer, Observable } from 'rxjs';
+import { Observable } from 'rxjs';
 import { Connection, SelectQueryBuilder } from 'typeorm';
 import { FindOptionsUtils } from 'typeorm/find-options/FindOptionsUtils';
 
 import {
-    ELASTIC_SEARCH_CLIENT,
     ELASTIC_SEARCH_OPTIONS,
     loggerCtx,
     PRODUCT_INDEX_NAME,
@@ -27,6 +28,7 @@ import {
     VARIANT_INDEX_NAME,
     VARIANT_INDEX_TYPE,
 } from './constants';
+import { createIndices, deleteByChannel, deleteIndices } from './indexing-utils';
 import { ElasticsearchOptions } from './options';
 import {
     AssignProductToChannelMessage,
@@ -64,15 +66,28 @@ export interface ReindexMessageResponse {
 }
 
 @Controller()
-export class ElasticsearchIndexerController {
+export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDestroy {
+    private client: Client;
+    private asyncQueue = new AsyncQueue('elasticsearch-indexer', 5);
+
     constructor(
         @InjectConnection() private connection: Connection,
         @Inject(ELASTIC_SEARCH_OPTIONS) private options: Required<ElasticsearchOptions>,
-        @Inject(ELASTIC_SEARCH_CLIENT) private client: Client,
         private productVariantService: ProductVariantService,
         private jobService: JobService,
     ) {}
 
+    onModuleInit(): any {
+        const { host, port } = this.options;
+        this.client = new Client({
+            node: `${host}:${port}`,
+        });
+    }
+
+    onModuleDestroy(): any {
+        return this.client.close();
+    }
+
     /**
      * Updates the search index only for the affected product.
      */
@@ -82,7 +97,7 @@ export class ElasticsearchIndexerController {
         productId,
     }: UpdateProductMessage['data']): Observable<UpdateProductMessage['response']> {
         const ctx = RequestContext.fromObject(rawContext);
-        return defer(async () => {
+        return asyncObservable(async () => {
             await this.updateProductInternal(ctx, productId, ctx.channelId);
             return true;
         });
@@ -97,7 +112,7 @@ export class ElasticsearchIndexerController {
         productId,
     }: DeleteProductMessage['data']): Observable<DeleteProductMessage['response']> {
         const ctx = RequestContext.fromObject(rawContext);
-        return defer(async () => {
+        return asyncObservable(async () => {
             await this.deleteProductInternal(productId, ctx.channelId);
             const variants = await this.productVariantService.getVariantsByProductId(ctx, productId);
             await this.deleteVariantsInternal(variants.map(v => v.id), ctx.channelId);
@@ -115,7 +130,7 @@ export class ElasticsearchIndexerController {
         channelId,
     }: AssignProductToChannelMessage['data']): Observable<AssignProductToChannelMessage['response']> {
         const ctx = RequestContext.fromObject(rawContext);
-        return defer(async () => {
+        return asyncObservable(async () => {
             await this.updateProductInternal(ctx, productId, channelId);
             const variants = await this.productVariantService.getVariantsByProductId(ctx, productId);
             await this.updateVariantsInternal(ctx, variants.map(v => v.id), channelId);
@@ -133,7 +148,7 @@ export class ElasticsearchIndexerController {
         channelId,
     }: RemoveProductFromChannelMessage['data']): Observable<RemoveProductFromChannelMessage['response']> {
         const ctx = RequestContext.fromObject(rawContext);
-        return defer(async () => {
+        return asyncObservable(async () => {
             await this.deleteProductInternal(productId, channelId);
             const variants = await this.productVariantService.getVariantsByProductId(ctx, productId);
             await this.deleteVariantsInternal(variants.map(v => v.id), channelId);
@@ -150,9 +165,11 @@ export class ElasticsearchIndexerController {
         variantIds,
     }: UpdateVariantMessage['data']): Observable<UpdateVariantMessage['response']> {
         const ctx = RequestContext.fromObject(rawContext);
-        return defer(async () => {
-            await this.updateVariantsInternal(ctx, variantIds, ctx.channelId);
-            return true;
+        return asyncObservable(async () => {
+            return this.asyncQueue.push(async () => {
+                await this.updateVariantsInternal(ctx, variantIds, ctx.channelId);
+                return true;
+            });
         });
     }
 
@@ -162,7 +179,7 @@ export class ElasticsearchIndexerController {
         variantIds,
     }: DeleteVariantMessage['data']): Observable<DeleteVariantMessage['response']> {
         const ctx = RequestContext.fromObject(rawContext);
-        return defer(async () => {
+        return asyncObservable(async () => {
             await this.deleteVariantsInternal(variantIds, ctx.channelId);
             return true;
         });
@@ -176,20 +193,18 @@ export class ElasticsearchIndexerController {
         const ctx = RequestContext.fromObject(rawContext);
         const { batchSize } = this.options;
 
-        return new Observable(observer => {
-            (async () => {
+        return asyncObservable(async observer => {
+            return this.asyncQueue.push(async () => {
                 const timeStart = Date.now();
-
                 if (ids.length) {
                     const batches = Math.ceil(ids.length / batchSize);
-                    Logger.verbose(`Updating ${ids.length} variants...`);
+                    Logger.verbose(`Updating ${ids.length} variants...`, loggerCtx);
 
                     let variantsInProduct: ProductVariant[] = [];
 
                     for (let i = 0; i < batches; i++) {
                         const begin = i * batchSize;
                         const end = begin + batchSize;
-                        Logger.verbose(`Updating ids from index ${begin} to ${end}`);
                         const batchIds = ids.slice(begin, end);
                         const variants = await this.getVariantsByIds(ctx, batchIds);
                         variantsInProduct = await this.processVariantBatch(
@@ -215,27 +230,37 @@ export class ElasticsearchIndexerController {
                         });
                     }
                 }
-                Logger.verbose(`Completed reindexing!`);
-                observer.next({
+                Logger.verbose(`Completed updating variants`, loggerCtx);
+                return {
                     total: ids.length,
                     completed: ids.length,
                     duration: +new Date() - timeStart,
-                });
-                observer.complete();
-            })();
+                };
+            });
         });
     }
 
     @MessagePattern(ReindexMessage.pattern)
-    reindex({ ctx: rawContext }: ReindexMessage['data']): Observable<ReindexMessage['response']> {
+    reindex({
+        ctx: rawContext,
+        dropIndices,
+    }: ReindexMessage['data']): Observable<ReindexMessage['response']> {
         const ctx = RequestContext.fromObject(rawContext);
         const { batchSize } = this.options;
 
-        return new Observable(observer => {
-            (async () => {
+        return asyncObservable(async observer => {
+            return this.asyncQueue.push(async () => {
                 const timeStart = Date.now();
-                const qb = this.getSearchIndexQueryBuilder();
-                const count = await qb.where('variants__product.deletedAt IS NULL').getCount();
+
+                if (dropIndices) {
+                    await deleteIndices(this.client, this.options.indexPrefix);
+                    await createIndices(this.client, this.options.indexPrefix);
+                } else {
+                    await deleteByChannel(this.client, this.options.indexPrefix, ctx.channelId);
+                }
+
+                const qb = this.getSearchIndexQueryBuilder(ctx.channelId);
+                const count = await qb.andWhere('variants__product.deletedAt IS NULL').getCount();
                 Logger.verbose(`Reindexing ${count} ProductVariants`, loggerCtx);
 
                 const batches = Math.ceil(count / batchSize);
@@ -270,14 +295,13 @@ export class ElasticsearchIndexerController {
                         duration: +new Date() - timeStart,
                     });
                 }
-                Logger.verbose(`Completed reindexing!`);
-                observer.next({
+                Logger.verbose(`Completed reindexing!`, loggerCtx);
+                return {
                     total: count,
                     completed: count,
                     duration: +new Date() - timeStart,
-                });
-                observer.complete();
-            })();
+                };
+            });
         });
     }
 
@@ -427,11 +451,11 @@ export class ElasticsearchIndexerController {
             return body;
         } catch (e) {
             Logger.error(`Error when attempting to run bulk operations [${e.toString()}]`, loggerCtx);
-            Logger.error('Error details: ' + JSON.stringify(e.body.error, null, 2), loggerCtx);
+            Logger.error('Error details: ' + JSON.stringify(e.body && e.body.error, null, 2), loggerCtx);
         }
     }
 
-    private getSearchIndexQueryBuilder() {
+    private getSearchIndexQueryBuilder(channelId: ID) {
         const qb = this.connection.getRepository(ProductVariant).createQueryBuilder('variants');
         FindOptionsUtils.applyFindManyOptionsOrConditionsToQueryBuilder(qb, {
             relations: variantRelations,
@@ -440,6 +464,9 @@ export class ElasticsearchIndexerController {
             },
         });
         FindOptionsUtils.joinEagerRelations(qb, qb.alias, this.connection.getMetadata(ProductVariant));
+        qb.leftJoin('variants.product', '__product')
+            .leftJoin('__product.channels', '__channel')
+            .where('__channel.id = :channelId', { channelId });
         return qb;
     }
 
@@ -451,7 +478,7 @@ export class ElasticsearchIndexerController {
         const { batchSize } = this.options;
         const i = Number.parseInt(batchNumber.toString(), 10);
         const variants = await qb
-            .where('variants__product.deletedAt IS NULL')
+            .andWhere('variants__product.deletedAt IS NULL')
             .take(batchSize)
             .skip(i * batchSize)
             .getMany();

+ 69 - 0
packages/elasticsearch-plugin/src/indexing-utils.ts

@@ -0,0 +1,69 @@
+import { Client } from '@elastic/elasticsearch';
+import { ID, Logger } from '@vendure/core';
+
+import { loggerCtx, PRODUCT_INDEX_NAME, VARIANT_INDEX_NAME } from './constants';
+
+export async function createIndices(client: Client, prefix: string) {
+    try {
+        const index = prefix + VARIANT_INDEX_NAME;
+        await client.indices.create({ index });
+        Logger.verbose(`Created index "${index}"`, loggerCtx);
+    } catch (e) {
+        Logger.error(JSON.stringify(e, null, 2), loggerCtx);
+    }
+    try {
+        const index = prefix + PRODUCT_INDEX_NAME;
+        await client.indices.create({ index });
+        Logger.verbose(`Created index "${index}"`, loggerCtx);
+    } catch (e) {
+        Logger.error(JSON.stringify(e, null, 2), loggerCtx);
+    }
+}
+
+export async function deleteIndices(client: Client, prefix: string) {
+    try {
+        const index = prefix + VARIANT_INDEX_NAME;
+        await client.indices.delete({ index });
+        Logger.verbose(`Deleted index "${index}"`, loggerCtx);
+    } catch (e) {
+        Logger.error(e, loggerCtx);
+    }
+    try {
+        const index = prefix + PRODUCT_INDEX_NAME;
+        await client.indices.delete({ index });
+        Logger.verbose(`Deleted index "${index}"`, loggerCtx);
+    } catch (e) {
+        Logger.error(e, loggerCtx);
+    }
+}
+
+export async function deleteByChannel(client: Client, prefix: string, channelId: ID) {
+    try {
+        const index = prefix + VARIANT_INDEX_NAME;
+        await client.deleteByQuery({
+            index,
+            body: {
+                query: {
+                    match: { channelId },
+                },
+            },
+        });
+        Logger.verbose(`Deleted index "${index} for channel "${channelId}"`, loggerCtx);
+    } catch (e) {
+        Logger.error(e, loggerCtx);
+    }
+    try {
+        const index = prefix + PRODUCT_INDEX_NAME;
+        await client.deleteByQuery({
+            index,
+            body: {
+                query: {
+                    match: { channelId },
+                },
+            },
+        });
+        Logger.verbose(`Deleted index "${index}" for channel "${channelId}"`, loggerCtx);
+    } catch (e) {
+        Logger.error(e, loggerCtx);
+    }
+}

+ 2 - 17
packages/elasticsearch-plugin/src/plugin.ts

@@ -1,4 +1,3 @@
-import { Client } from '@elastic/elasticsearch';
 import {
     CollectionModificationEvent,
     DeepRequired,
@@ -7,12 +6,9 @@ import {
     idsAreEqual,
     Logger,
     OnVendureBootstrap,
-    OnVendureClose,
     PluginCommonModule,
-    Product,
     ProductChannelEvent,
     ProductEvent,
-    ProductVariant,
     ProductVariantEvent,
     TaxRateModificationEvent,
     Type,
@@ -20,7 +16,7 @@ import {
 } from '@vendure/core';
 import { buffer, debounceTime, filter, map } from 'rxjs/operators';
 
-import { ELASTIC_SEARCH_CLIENT, ELASTIC_SEARCH_OPTIONS, loggerCtx } from './constants';
+import { ELASTIC_SEARCH_OPTIONS, loggerCtx } from './constants';
 import { CustomMappingsResolver } from './custom-mappings.resolver';
 import { ElasticsearchIndexService } from './elasticsearch-index.service';
 import { AdminElasticSearchResolver, ShopElasticSearchResolver } from './elasticsearch-resolver';
@@ -194,7 +190,6 @@ import { ElasticsearchOptions, mergeWithDefaults } from './options';
         ElasticsearchIndexService,
         ElasticsearchService,
         { provide: ELASTIC_SEARCH_OPTIONS, useFactory: () => ElasticsearchPlugin.options },
-        { provide: ELASTIC_SEARCH_CLIENT, useFactory: () => ElasticsearchPlugin.client },
     ],
     adminApiExtensions: { resolvers: [AdminElasticSearchResolver] },
     shopApiExtensions: {
@@ -211,9 +206,8 @@ import { ElasticsearchOptions, mergeWithDefaults } from './options';
     },
     workers: [ElasticsearchIndexerController],
 })
-export class ElasticsearchPlugin implements OnVendureBootstrap, OnVendureClose {
+export class ElasticsearchPlugin implements OnVendureBootstrap {
     private static options: DeepRequired<ElasticsearchOptions>;
-    private static client: Client;
 
     /** @internal */
     constructor(
@@ -226,11 +220,7 @@ export class ElasticsearchPlugin implements OnVendureBootstrap, OnVendureClose {
      * Set the plugin options.
      */
     static init(options: ElasticsearchOptions): Type<ElasticsearchPlugin> {
-        const { host, port } = options;
         this.options = mergeWithDefaults(options);
-        this.client = new Client({
-            node: `${host}:${port}`,
-        });
         return ElasticsearchPlugin;
     }
 
@@ -298,9 +288,4 @@ export class ElasticsearchPlugin implements OnVendureBootstrap, OnVendureClose {
             }
         });
     }
-
-    /** @internal */
-    onVendureClose() {
-        return ElasticsearchPlugin.client.close();
-    }
 }

+ 6 - 1
packages/elasticsearch-plugin/src/types.ts

@@ -135,6 +135,11 @@ export interface ReindexMessageResponse {
     duration: number;
 }
 
+export type ReindexMessageData = {
+    ctx: RequestContext;
+    dropIndices: boolean;
+};
+
 export type UpdateProductMessageData = {
     ctx: RequestContext;
     productId: ID;
@@ -156,7 +161,7 @@ export interface ProductChannelMessageData {
     channelId: ID;
 }
 
-export class ReindexMessage extends WorkerMessage<{ ctx: RequestContext }, ReindexMessageResponse> {
+export class ReindexMessage extends WorkerMessage<ReindexMessageData, ReindexMessageResponse> {
     static readonly pattern = 'Reindex';
 }
 export class UpdateVariantMessage extends WorkerMessage<UpdateVariantMessageData, boolean> {

+ 1 - 1
packages/testing/src/test-server.ts

@@ -112,10 +112,10 @@ export class TestServer {
             logging: false,
             ...options,
         });
-        await app.close();
         if (worker) {
             await worker.close();
         }
+        await app.close();
 
         (testingConfig.dbConnectionOptions as Mutable<SqljsConnectionOptions>).autoSave = false;
     }