Browse Source

feat(elasticsearch-plugin): Update to use new job queue

Michael Bromley 5 years ago
parent
commit
42b1d2875a

+ 4 - 0
packages/elasticsearch-plugin/e2e/elasticsearch-plugin.e2e-spec.ts

@@ -515,6 +515,8 @@ describe('Elasticsearch plugin', () => {
                     },
                 );
                 await awaitRunningJobs(adminClient);
+                // add an additional check for the collection filters to update
+                await awaitRunningJobs(adminClient);
                 const result = await doAdminSearchQuery({ collectionId: 'T_2', groupByProduct: true });
 
                 expect(result.search.items.map((i) => i.productName)).toEqual([
@@ -561,6 +563,8 @@ describe('Elasticsearch plugin', () => {
                     },
                 });
                 await awaitRunningJobs(adminClient);
+                // add an additional check for the collection filters to update
+                await awaitRunningJobs(adminClient);
                 const result = await doAdminSearchQuery({
                     collectionId: createCollection.id,
                     groupByProduct: true,

+ 81 - 83
packages/elasticsearch-plugin/src/elasticsearch-index.service.ts

@@ -3,8 +3,8 @@ import {
     Asset,
     ID,
     Job,
-    JobReporter,
-    JobService,
+    JobQueue,
+    JobQueueService,
     Logger,
     Product,
     ProductVariant,
@@ -21,125 +21,127 @@ import {
     ReindexMessage,
     RemoveProductFromChannelMessage,
     UpdateAssetMessage,
+    UpdateIndexQueueJobData,
     UpdateProductMessage,
     UpdateVariantMessage,
     UpdateVariantsByIdMessage,
 } from './types';
 
+let updateIndexQueue: JobQueue<UpdateIndexQueueJobData> | undefined;
+
 @Injectable()
 export class ElasticsearchIndexService {
-    constructor(private workerService: WorkerService, private jobService: JobService) {}
-
-    reindex(ctx: RequestContext, dropIndices: boolean): Job {
-        return this.jobService.createJob({
-            name: 'reindex',
-            singleInstance: true,
-            work: async (reporter) => {
-                Logger.verbose(`sending reindex message`);
-                this.workerService
-                    .send(new ReindexMessage({ ctx: ctx.serialize(), dropIndices }))
-                    .subscribe(this.createObserver(reporter));
+    constructor(private workerService: WorkerService, private jobService: JobQueueService) {}
+
+    initJobQueue() {
+        updateIndexQueue = this.jobService.createQueue({
+            name: 'update-search-index',
+            concurrency: 1,
+            process: (job) => {
+                const data = job.data;
+                switch (data.type) {
+                    case 'reindex':
+                        Logger.verbose(`sending ReindexMessage`);
+                        this.sendMessageWithProgress(job, new ReindexMessage(data));
+                        break;
+                    case 'update-product':
+                        this.sendMessage(job, new UpdateProductMessage(data));
+                        break;
+                    case 'update-variants':
+                        this.sendMessage(job, new UpdateVariantMessage(data));
+                        break;
+                    case 'delete-product':
+                        this.sendMessage(job, new DeleteProductMessage(data));
+                        break;
+                    case 'delete-variant':
+                        this.sendMessage(job, new DeleteVariantMessage(data));
+                        break;
+                    case 'update-variants-by-id':
+                        this.sendMessageWithProgress(job, new UpdateVariantsByIdMessage(data));
+                        break;
+                    case 'update-asset':
+                        this.sendMessage(job, new UpdateAssetMessage(data));
+                        break;
+                    case 'assign-product-to-channel':
+                        this.sendMessage(job, new AssignProductToChannelMessage(data));
+                        break;
+                    case 'remove-product-from-channel':
+                        this.sendMessage(job, new RemoveProductFromChannelMessage(data));
+                        break;
+                }
             },
         });
     }
 
+    reindex(ctx: RequestContext, dropIndices: boolean) {
+        return this.addJobToQueue({ type: 'reindex', ctx: ctx.serialize(), dropIndices });
+    }
+
     updateProduct(ctx: RequestContext, product: Product) {
-        const data = { ctx: ctx.serialize(), productId: product.id };
-        return this.createShortWorkerJob(new UpdateProductMessage(data), {
-            entity: 'Product',
-            id: product.id,
-        });
+        this.addJobToQueue({ type: 'update-product', ctx: ctx.serialize(), productId: product.id });
     }
 
     updateVariants(ctx: RequestContext, variants: ProductVariant[]) {
         const variantIds = variants.map((v) => v.id);
-        const data = { ctx: ctx.serialize(), variantIds };
-        return this.createShortWorkerJob(new UpdateVariantMessage(data), {
-            entity: 'ProductVariant',
-            ids: variantIds,
-        });
+        this.addJobToQueue({ type: 'update-variants', ctx: ctx.serialize(), variantIds });
     }
 
     deleteProduct(ctx: RequestContext, product: Product) {
-        const data = { ctx: ctx.serialize(), productId: product.id };
-        return this.createShortWorkerJob(new DeleteProductMessage(data), {
-            entity: 'Product',
-            id: product.id,
-        });
+        this.addJobToQueue({ type: 'delete-product', ctx: ctx.serialize(), productId: product.id });
     }
 
     deleteVariant(ctx: RequestContext, variants: ProductVariant[]) {
         const variantIds = variants.map((v) => v.id);
-        const data = { ctx: ctx.serialize(), variantIds };
-        return this.createShortWorkerJob(new DeleteVariantMessage(data), {
-            entity: 'ProductVariant',
-            id: variantIds,
-        });
+        this.addJobToQueue({ type: 'delete-variant', ctx: ctx.serialize(), variantIds });
     }
 
     assignProductToChannel(ctx: RequestContext, product: Product, channelId: ID) {
-        const data = { ctx: ctx.serialize(), productId: product.id, channelId };
-        return this.createShortWorkerJob(new AssignProductToChannelMessage(data), {
-            entity: 'Product',
-            id: product.id,
+        this.addJobToQueue({
+            type: 'assign-product-to-channel',
+            ctx: ctx.serialize(),
+            productId: product.id,
+            channelId,
         });
     }
 
     removeProductFromChannel(ctx: RequestContext, product: Product, channelId: ID) {
-        const data = { ctx: ctx.serialize(), productId: product.id, channelId };
-        return this.createShortWorkerJob(new RemoveProductFromChannelMessage(data), {
-            entity: 'Product',
-            id: product.id,
+        this.addJobToQueue({
+            type: 'remove-product-from-channel',
+            ctx: ctx.serialize(),
+            productId: product.id,
+            channelId,
         });
     }
 
     updateVariantsById(ctx: RequestContext, ids: ID[]) {
-        return this.jobService.createJob({
-            name: 'update-variants',
-            metadata: {
-                variantIds: ids,
-            },
-            work: (reporter) => {
-                Logger.verbose(`sending UpdateVariantsByIdMessage`);
-                this.workerService
-                    .send(new UpdateVariantsByIdMessage({ ctx: ctx.serialize(), ids }))
-                    .subscribe(this.createObserver(reporter));
-            },
-        });
+        this.addJobToQueue({ type: 'update-variants-by-id', ctx: ctx.serialize(), ids });
     }
 
     updateAsset(ctx: RequestContext, asset: Asset) {
-        const data = { ctx: ctx.serialize(), asset };
-        return this.createShortWorkerJob(new UpdateAssetMessage(data), {
-            entity: 'Asset',
-            id: asset.id,
-        });
+        this.addJobToQueue({ type: 'update-asset', ctx: ctx.serialize(), asset: asset as any });
     }
 
-    /**
-     * Creates a short-running job that does not expect progress updates.
-     */
-    private createShortWorkerJob<T extends WorkerMessage<any, any>>(message: T, metadata: any) {
-        return this.jobService.createJob({
-            name: 'update-index',
-            metadata,
-            work: (reporter) => {
-                this.workerService.send(message).subscribe({
-                    complete: () => reporter.complete(true),
-                    error: (err) => {
-                        Logger.error(err);
-                        reporter.complete(false);
-                    },
-                });
+    private addJobToQueue(data: UpdateIndexQueueJobData) {
+        if (updateIndexQueue) {
+            return updateIndexQueue.add(data);
+        }
+    }
+
+    private sendMessage(job: Job<any>, message: WorkerMessage<any, any>) {
+        this.workerService.send(message).subscribe({
+            complete: () => job.complete(true),
+            error: (err) => {
+                Logger.error(err);
+                job.fail(err);
             },
         });
     }
 
-    private createObserver(reporter: JobReporter) {
+    private sendMessageWithProgress(job: Job<any>, message: WorkerMessage<any, ReindexMessageResponse>) {
         let total: number | undefined;
         let duration = 0;
         let completed = 0;
-        return {
+        this.workerService.send(message).subscribe({
             next: (response: ReindexMessageResponse) => {
                 if (!total) {
                     total = response.total;
@@ -147,10 +149,10 @@ export class ElasticsearchIndexService {
                 duration = response.duration;
                 completed = response.completed;
                 const progress = Math.ceil((completed / total) * 100);
-                reporter.setProgress(progress);
+                job.setProgress(progress);
             },
             complete: () => {
-                reporter.complete({
+                job.complete({
                     success: true,
                     indexedItemCount: total,
                     timeTaken: duration,
@@ -158,12 +160,8 @@ export class ElasticsearchIndexService {
             },
             error: (err: any) => {
                 Logger.error(JSON.stringify(err));
-                reporter.complete({
-                    success: false,
-                    indexedItemCount: 0,
-                    timeTaken: 0,
-                });
+                job.fail();
             },
-        };
+        });
     }
 }

+ 4 - 4
packages/elasticsearch-plugin/src/elasticsearch-resolver.ts

@@ -1,6 +1,6 @@
 import { Args, Mutation, Parent, Query, ResolveField, Resolver } from '@nestjs/graphql';
 import {
-    JobInfo,
+    Job as GraphQLJob,
     Permission,
     QuerySearchArgs,
     SearchInput,
@@ -34,7 +34,7 @@ export class ShopElasticSearchResolver implements Omit<SearchResolver, 'reindex'
         @Parent() parent: { input: ElasticSearchInput },
     ): Promise<Array<{ facetValue: FacetValue; count: number }>> {
         const facetValues = await this.elasticsearchService.facetValues(ctx, parent.input, true);
-        return facetValues.filter(i => !i.facetValue.facet.isPrivate);
+        return facetValues.filter((i) => !i.facetValue.facet.isPrivate);
     }
 
     @ResolveField()
@@ -72,7 +72,7 @@ export class AdminElasticSearchResolver implements SearchResolver {
 
     @Mutation()
     @Allow(Permission.UpdateCatalog)
-    async reindex(@Ctx() ctx: RequestContext): Promise<JobInfo> {
-        return this.elasticsearchService.reindex(ctx, false);
+    async reindex(@Ctx() ctx: RequestContext): Promise<GraphQLJob> {
+        return (this.elasticsearchService.reindex(ctx, false) as unknown) as GraphQLJob;
     }
 }

+ 19 - 15
packages/elasticsearch-plugin/src/elasticsearch.service.ts

@@ -1,11 +1,12 @@
 import { Client } from '@elastic/elasticsearch';
 import { Inject, Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
-import { JobInfo, SearchResult, SearchResultAsset } from '@vendure/common/lib/generated-types';
+import { SearchResult, SearchResultAsset } from '@vendure/common/lib/generated-types';
 import {
     DeepRequired,
     FacetValue,
     FacetValueService,
     InternalServerError,
+    Job,
     Logger,
     RequestContext,
     SearchService,
@@ -104,7 +105,7 @@ export class ElasticsearchService implements OnModuleInit, OnModuleDestroy {
                 body: elasticSearchBody,
             });
             return {
-                items: body.hits.hits.map(hit => this.mapProductToSearchResult(hit)),
+                items: body.hits.hits.map((hit) => this.mapProductToSearchResult(hit)),
                 totalItems: body.hits.total.value,
             };
         } else {
@@ -114,7 +115,7 @@ export class ElasticsearchService implements OnModuleInit, OnModuleDestroy {
                 body: elasticSearchBody,
             });
             return {
-                items: body.hits.hits.map(hit => this.mapVariantToSearchResult(hit)),
+                items: body.hits.hits.map((hit) => this.mapVariantToSearchResult(hit)),
                 totalItems: body.hits.total.value,
             };
         }
@@ -153,9 +154,12 @@ export class ElasticsearchService implements OnModuleInit, OnModuleDestroy {
 
         const buckets = body.aggregations ? body.aggregations.facetValue.buckets : [];
 
-        const facetValues = await this.facetValueService.findByIds(buckets.map(b => b.key), ctx.languageCode);
+        const facetValues = await this.facetValueService.findByIds(
+            buckets.map((b) => b.key),
+            ctx.languageCode,
+        );
         return facetValues.map((facetValue, index) => {
-            const bucket = buckets.find(b => b.key.toString() === facetValue.id.toString());
+            const bucket = buckets.find((b) => b.key.toString() === facetValue.id.toString());
             return {
                 facetValue,
                 count: bucket ? bucket.doc_count : 0,
@@ -229,28 +233,28 @@ export class ElasticsearchService implements OnModuleInit, OnModuleDestroy {
                 min: aggregations.minPriceWithTax.value || 0,
                 max: aggregations.maxPriceWithTax.value || 0,
             },
-            buckets: aggregations.prices.buckets.map(mapPriceBuckets).filter(x => 0 < x.count),
-            bucketsWithTax: aggregations.prices.buckets.map(mapPriceBuckets).filter(x => 0 < x.count),
+            buckets: aggregations.prices.buckets.map(mapPriceBuckets).filter((x) => 0 < x.count),
+            bucketsWithTax: aggregations.prices.buckets.map(mapPriceBuckets).filter((x) => 0 < x.count),
         };
     }
 
     /**
      * Rebuilds the full search index.
      */
-    async reindex(ctx: RequestContext, dropIndices = true): Promise<JobInfo> {
+    async reindex(ctx: RequestContext, dropIndices = true): Promise<Job> {
         const { indexPrefix } = this.options;
-        const job = this.elasticsearchIndexService.reindex(ctx, dropIndices);
-        job.start();
-        return job;
+        const job = await this.elasticsearchIndexService.reindex(ctx, dropIndices);
+        // tslint:disable-next-line:no-non-null-assertion
+        return job!;
     }
 
     /**
      * Reindexes all in current Channel without dropping indices.
      */
-    async updateAll(ctx: RequestContext): Promise<JobInfo> {
-        const job = this.elasticsearchIndexService.reindex(ctx, false);
-        job.start();
-        return job;
+    async updateAll(ctx: RequestContext): Promise<Job> {
+        const job = await this.elasticsearchIndexService.reindex(ctx, false);
+        // tslint:disable-next-line:no-non-null-assertion
+        return job!;
     }
 
     private mapVariantToSearchResult(hit: SearchHit<VariantIndexItem>): SearchResult {

+ 0 - 2
packages/elasticsearch-plugin/src/indexer.controller.ts

@@ -9,7 +9,6 @@ import {
     AsyncQueue,
     FacetValue,
     ID,
-    JobService,
     Logger,
     Product,
     ProductVariant,
@@ -76,7 +75,6 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
         @InjectConnection() private connection: Connection,
         @Inject(ELASTIC_SEARCH_OPTIONS) private options: Required<ElasticsearchOptions>,
         private productVariantService: ProductVariantService,
-        private jobService: JobService,
     ) {}
 
     onModuleInit(): any {

+ 26 - 21
packages/elasticsearch-plugin/src/plugin.ts

@@ -240,36 +240,41 @@ export class ElasticsearchPlugin implements OnVendureBootstrap {
         Logger.info(`Sucessfully connected to Elasticsearch instance at "${host}:${port}"`, loggerCtx);
 
         await this.elasticsearchService.createIndicesIfNotExists();
+        this.elasticsearchIndexService.initJobQueue();
 
-        this.eventBus.ofType(ProductEvent).subscribe(event => {
+        this.eventBus.ofType(ProductEvent).subscribe((event) => {
             if (event.type === 'deleted') {
-                return this.elasticsearchIndexService.deleteProduct(event.ctx, event.product).start();
+                return this.elasticsearchIndexService.deleteProduct(event.ctx, event.product);
             } else {
-                return this.elasticsearchIndexService.updateProduct(event.ctx, event.product).start();
+                return this.elasticsearchIndexService.updateProduct(event.ctx, event.product);
             }
         });
-        this.eventBus.ofType(ProductVariantEvent).subscribe(event => {
+        this.eventBus.ofType(ProductVariantEvent).subscribe((event) => {
             if (event.type === 'deleted') {
-                return this.elasticsearchIndexService.deleteVariant(event.ctx, event.variants).start();
+                return this.elasticsearchIndexService.deleteVariant(event.ctx, event.variants);
             } else {
-                return this.elasticsearchIndexService.updateVariants(event.ctx, event.variants).start();
+                return this.elasticsearchIndexService.updateVariants(event.ctx, event.variants);
             }
         });
-        this.eventBus.ofType(AssetEvent).subscribe(event => {
+        this.eventBus.ofType(AssetEvent).subscribe((event) => {
             if (event.type === 'updated') {
-                return this.elasticsearchIndexService.updateAsset(event.ctx, event.asset).start();
+                return this.elasticsearchIndexService.updateAsset(event.ctx, event.asset);
             }
         });
 
-        this.eventBus.ofType(ProductChannelEvent).subscribe(event => {
+        this.eventBus.ofType(ProductChannelEvent).subscribe((event) => {
             if (event.type === 'assigned') {
-                return this.elasticsearchIndexService
-                    .assignProductToChannel(event.ctx, event.product, event.channelId)
-                    .start();
+                return this.elasticsearchIndexService.assignProductToChannel(
+                    event.ctx,
+                    event.product,
+                    event.channelId,
+                );
             } else {
-                return this.elasticsearchIndexService
-                    .removeProductFromChannel(event.ctx, event.product, event.channelId)
-                    .start();
+                return this.elasticsearchIndexService.removeProductFromChannel(
+                    event.ctx,
+                    event.product,
+                    event.channelId,
+                );
             }
         });
 
@@ -278,18 +283,18 @@ export class ElasticsearchPlugin implements OnVendureBootstrap {
         collectionModification$
             .pipe(
                 buffer(closingNotifier$),
-                filter(events => 0 < events.length),
-                map(events => ({
+                filter((events) => 0 < events.length),
+                map((events) => ({
                     ctx: events[0].ctx,
                     ids: events.reduce((ids, e) => [...ids, ...e.productVariantIds], [] as ID[]),
                 })),
-                filter(e => 0 < e.ids.length),
+                filter((e) => 0 < e.ids.length),
             )
-            .subscribe(events => {
-                return this.elasticsearchIndexService.updateVariantsById(events.ctx, events.ids).start();
+            .subscribe((events) => {
+                return this.elasticsearchIndexService.updateVariantsById(events.ctx, events.ids);
             });
 
-        this.eventBus.ofType(TaxRateModificationEvent).subscribe(event => {
+        this.eventBus.ofType(TaxRateModificationEvent).subscribe((event) => {
             const defaultTaxZone = event.ctx.channel.defaultTaxZone;
             if (defaultTaxZone && idsAreEqual(defaultTaxZone.id, event.taxRate.zone.id)) {
                 return this.elasticsearchService.updateAll(event.ctx);

+ 25 - 1
packages/elasticsearch-plugin/src/types.ts

@@ -9,6 +9,8 @@ import {
 import { ID } from '@vendure/common/lib/shared-types';
 import { Asset, SerializedRequestContext, WorkerMessage } from '@vendure/core';
 
+import { JsonCompatible } from '../../common/src/shared-types';
+
 export type ElasticSearchInput = SearchInput & {
     priceRange?: PriceRange;
     priceRangeWithTax?: PriceRange;
@@ -175,7 +177,7 @@ export interface ProductChannelMessageData {
 }
 export interface UpdateAssetMessageData {
     ctx: SerializedRequestContext;
-    asset: Asset;
+    asset: JsonCompatible<Required<Asset>>;
 }
 
 export class ReindexMessage extends WorkerMessage<ReindexMessageData, ReindexMessageResponse> {
@@ -215,6 +217,28 @@ type CustomMappingDefinition<Args extends any[], T extends string, R> = {
     valueFn: (...args: Args) => R;
 };
 
+type NamedJobData<Type extends string, MessageData> = { type: Type } & MessageData;
+
+export type ReindexJobData = NamedJobData<'reindex', ReindexMessageData>;
+type UpdateProductJobData = NamedJobData<'update-product', UpdateProductMessageData>;
+type UpdateVariantsJobData = NamedJobData<'update-variants', UpdateVariantMessageData>;
+type DeleteProductJobData = NamedJobData<'delete-product', UpdateProductMessageData>;
+type DeleteVariantJobData = NamedJobData<'delete-variant', UpdateVariantMessageData>;
+type UpdateVariantsByIdJobData = NamedJobData<'update-variants-by-id', UpdateVariantsByIdMessageData>;
+type UpdateAssetJobData = NamedJobData<'update-asset', UpdateAssetMessageData>;
+type AssignProductToChannelJobData = NamedJobData<'assign-product-to-channel', ProductChannelMessageData>;
+type RemoveProductFromChannelJobData = NamedJobData<'remove-product-from-channel', ProductChannelMessageData>;
+export type UpdateIndexQueueJobData =
+    | ReindexJobData
+    | UpdateProductJobData
+    | UpdateVariantsJobData
+    | DeleteProductJobData
+    | DeleteVariantJobData
+    | UpdateVariantsByIdJobData
+    | UpdateAssetJobData
+    | AssignProductToChannelJobData
+    | RemoveProductFromChannelJobData;
+
 type CustomStringMapping<Args extends any[]> = CustomMappingDefinition<Args, 'String!', string>;
 type CustomStringMappingNullable<Args extends any[]> = CustomMappingDefinition<Args, 'String', Maybe<string>>;
 type CustomIntMapping<Args extends any[]> = CustomMappingDefinition<Args, 'Int!', number>;