Просмотр исходного кода

refactor(elasticsearch-plugin): Use WorkerService

Michael Bromley 6 лет назад
Родитель
Сommit
7df2b9cde6

+ 0 - 5
packages/elasticsearch-plugin/src/constants.ts

@@ -5,8 +5,3 @@ export const VARIANT_INDEX_TYPE = 'variant-index-item';
 export const PRODUCT_INDEX_NAME = 'products';
 export const PRODUCT_INDEX_TYPE = 'product-index-item';
 export const loggerCtx = 'ElasticsearchPlugin';
-export enum Message {
-    Reindex = 'Reindex',
-    UpdateVariantsById = 'UpdateVariantsById',
-    UpdateProductOrVariant = 'UpdateProductOrVariant',
-}

+ 69 - 91
packages/elasticsearch-plugin/src/elasticsearch-index.service.ts

@@ -1,15 +1,34 @@
 import { Inject, Injectable, OnModuleDestroy } from '@nestjs/common';
 import { ClientProxy } from '@nestjs/microservices';
-import { ID, Job, JobService, Logger, Product, ProductVariant, RequestContext, VENDURE_WORKER_CLIENT } from '@vendure/core';
+import {
+    ID,
+    Job,
+    JobReporter,
+    JobService,
+    Logger,
+    Product,
+    ProductVariant,
+    RequestContext,
+    WorkerService,
+} from '@vendure/core';
 
-import { Message } from './constants';
 import { ReindexMessageResponse } from './indexer.controller';
+import { ReindexMessage, UpdateProductOrVariantMessage, UpdateVariantsByIdMessage } from './types';
 
 @Injectable()
-export class ElasticsearchIndexService implements OnModuleDestroy {
+export class ElasticsearchIndexService {
+    constructor(private workerService: WorkerService, private jobService: JobService) {}
 
-    constructor(@Inject(VENDURE_WORKER_CLIENT) private readonly client: ClientProxy,
-                private jobService: JobService) {}
+    reindex(ctx: RequestContext): Job {
+        return this.jobService.createJob({
+            name: 'reindex',
+            singleInstance: true,
+            work: async reporter => {
+                Logger.verbose(`sending reindex message`);
+                this.workerService.send(new ReindexMessage({ ctx })).subscribe(this.createObserver(reporter));
+            },
+        });
+    }
 
     /**
      * Updates the search index only for the affected entities.
@@ -17,16 +36,18 @@ export class ElasticsearchIndexService implements OnModuleDestroy {
     updateProductOrVariant(ctx: RequestContext, updatedEntity: Product | ProductVariant) {
         return this.jobService.createJob({
             name: 'update-index',
-            work: async () => {
-                if (updatedEntity instanceof Product) {
-                    return this.client.send(Message.UpdateProductOrVariant, { ctx, productId: updatedEntity.id })
-                        .toPromise()
-                        .catch(err => Logger.error(err));
-                } else {
-                    return this.client.send(Message.UpdateProductOrVariant, { ctx, variantId: updatedEntity.id })
-                        .toPromise()
-                        .catch(err => Logger.error(err));
-                }
+            work: reporter => {
+                const data =
+                    updatedEntity instanceof Product
+                        ? { ctx, productId: updatedEntity.id }
+                        : { ctx, variantId: updatedEntity.id };
+                this.workerService.send(new UpdateProductOrVariantMessage(data)).subscribe({
+                    complete: () => reporter.complete(true),
+                    error: err => {
+                        Logger.error(err);
+                        reporter.complete(false);
+                    },
+                });
             },
         });
     }
@@ -34,87 +55,44 @@ export class ElasticsearchIndexService implements OnModuleDestroy {
     updateVariantsById(ctx: RequestContext, ids: ID[]) {
         return this.jobService.createJob({
             name: 'update-index',
-            work: async reporter => {
-                return new Promise((resolve, reject) => {
-                    Logger.verbose(`sending reindex message`);
-                    let total: number | undefined;
-                    let duration = 0;
-                    let completed = 0;
-                    this.client.send<ReindexMessageResponse>(Message.UpdateVariantsById, { ctx, ids })
-                        .subscribe({
-                            next: response => {
-                                if (!total) {
-                                    total = response.total;
-                                }
-                                duration = response.duration;
-                                completed = response.completed;
-                                const progress = Math.ceil((completed / total) * 100);
-                                reporter.setProgress(progress);
-                            },
-                            complete: () => {
-                                resolve({
-                                    success: true,
-                                    indexedItemCount: total,
-                                    timeTaken: duration,
-                                });
-                            },
-                            error: (err) => {
-                                Logger.error(JSON.stringify(err));
-                                resolve({
-                                    success: false,
-                                    indexedItemCount: 0,
-                                    timeTaken: 0,
-                                });
-                            },
-                        });
-                });
+            work: reporter => {
+                Logger.verbose(`sending reindex message`);
+                this.workerService
+                    .send(new UpdateVariantsByIdMessage({ ctx, ids }))
+                    .subscribe(this.createObserver(reporter));
             },
         });
     }
 
-    reindex(ctx: RequestContext): Job {
-        return this.jobService.createJob({
-            name: 'reindex',
-            singleInstance: true,
-            work: async reporter => {
-                return new Promise((resolve, reject) => {
-                    Logger.verbose(`sending reindex message`);
-                    let total: number | undefined;
-                    let duration = 0;
-                    let completed = 0;
-                    this.client.send<ReindexMessageResponse>(Message.Reindex, { ctx })
-                        .subscribe({
-                            next: response => {
-                                if (!total) {
-                                    total = response.total;
-                                }
-                                duration = response.duration;
-                                completed = response.completed;
-                                const progress = Math.ceil((completed / total) * 100);
-                                reporter.setProgress(progress);
-                            },
-                            complete: () => {
-                                resolve({
-                                    success: true,
-                                    indexedItemCount: total,
-                                    timeTaken: duration,
-                                });
-                            },
-                            error: (err) => {
-                                Logger.error(JSON.stringify(err));
-                                resolve({
-                                    success: false,
-                                    indexedItemCount: 0,
-                                    timeTaken: 0,
-                                });
-                            },
-                        });
+    private createObserver(reporter: JobReporter) {
+        let total: number | undefined;
+        let duration = 0;
+        let completed = 0;
+        return {
+            next: (response: ReindexMessageResponse) => {
+                if (!total) {
+                    total = response.total;
+                }
+                duration = response.duration;
+                completed = response.completed;
+                const progress = Math.ceil((completed / total) * 100);
+                reporter.setProgress(progress);
+            },
+            complete: () => {
+                reporter.complete({
+                    success: true,
+                    indexedItemCount: total,
+                    timeTaken: duration,
                 });
             },
-        });
-    }
-
-    onModuleDestroy(): any {
-        this.client.close();
+            error: (err: any) => {
+                Logger.error(JSON.stringify(err));
+                reporter.complete({
+                    success: false,
+                    indexedItemCount: 0,
+                    timeTaken: 0,
+                });
+            },
+        };
     }
 }

+ 1 - 1
packages/elasticsearch-plugin/src/elasticsearch.service.ts

@@ -1,6 +1,6 @@
 import { Client } from '@elastic/elasticsearch';
 import { Inject, Injectable } from '@nestjs/common';
-import { JobInfo, SearchInput, SearchResponse, SearchResult } from '@vendure/common/lib/generated-types';
+import { JobInfo, SearchResult } from '@vendure/common/lib/generated-types';
 import {
     DeepRequired,
     FacetValue,

+ 9 - 15
packages/elasticsearch-plugin/src/indexer.controller.ts

@@ -22,7 +22,6 @@ import {
     ELASTIC_SEARCH_CLIENT,
     ELASTIC_SEARCH_OPTIONS,
     loggerCtx,
-    Message,
     PRODUCT_INDEX_NAME,
     PRODUCT_INDEX_TYPE,
     VARIANT_INDEX_NAME,
@@ -34,6 +33,9 @@ import {
     BulkOperationDoc,
     BulkResponseBody,
     ProductIndexItem,
+    ReindexMessage,
+    UpdateProductOrVariantMessage,
+    UpdateVariantsByIdMessage,
     VariantIndexItem,
 } from './types';
 
@@ -68,18 +70,13 @@ export class ElasticsearchIndexerController {
     /**
      * Updates the search index only for the affected entities.
      */
-    @MessagePattern(Message.UpdateProductOrVariant)
+    @MessagePattern(UpdateProductOrVariantMessage.pattern)
     updateProductOrVariant({
         ctx: rawContext,
         productId,
         variantId,
-    }: {
-        ctx: any;
-        productId?: ID;
-        variantId?: ID;
-    }): Observable<boolean> {
+    }: UpdateProductOrVariantMessage['data']): Observable<boolean> {
         const ctx = RequestContext.fromObject(rawContext);
-
         return defer(async () => {
             if (productId) {
                 await this.updateProduct(ctx, productId);
@@ -90,14 +87,11 @@ export class ElasticsearchIndexerController {
         });
     }
 
-    @MessagePattern(Message.UpdateVariantsById)
+    @MessagePattern(UpdateVariantsByIdMessage.pattern)
     updateVariantsById({
         ctx: rawContext,
         ids,
-    }: {
-        ctx: any;
-        ids: ID[];
-    }): Observable<ReindexMessageResponse> {
+    }: UpdateVariantsByIdMessage['data']): Observable<UpdateVariantsByIdMessage['response']> {
         const ctx = RequestContext.fromObject(rawContext);
         const { batchSize } = this.options;
 
@@ -164,8 +158,8 @@ export class ElasticsearchIndexerController {
         });
     }
 
-    @MessagePattern(Message.Reindex)
-    reindex({ ctx: rawContext }: { ctx: any }): Observable<ReindexMessageResponse> {
+    @MessagePattern(ReindexMessage.pattern)
+    reindex({ ctx: rawContext }: ReindexMessage['data']): Observable<ReindexMessage['response']> {
         const ctx = RequestContext.fromObject(rawContext);
         const { batchSize } = this.options;
 

+ 31 - 0
packages/elasticsearch-plugin/src/types.ts

@@ -6,6 +6,7 @@ import {
     SearchResult,
 } from '@vendure/common/lib/generated-types';
 import { ID } from '@vendure/common/lib/shared-types';
+import { RequestContext, WorkerMessage } from '@vendure/core';
 
 export type ElasticSearchInput = SearchInput & {
     priceRange?: PriceRange;
@@ -122,3 +123,33 @@ export type BulkResponseBody = {
     errors: boolean;
     items: BulkResponseResult[];
 };
+
+export interface ReindexMessageResponse {
+    total: number;
+    completed: number;
+    duration: number;
+}
+
+export type UpdateProductOrVariantMessageData = {
+    ctx: RequestContext;
+    productId?: ID;
+    variantId?: ID;
+};
+
+export interface UpdateVariantsByIdMessageData {
+    ctx: RequestContext;
+    ids: ID[];
+}
+
+export class ReindexMessage extends WorkerMessage<{ ctx: RequestContext }, ReindexMessageResponse> {
+    static readonly pattern = 'Reindex';
+}
+export class UpdateProductOrVariantMessage extends WorkerMessage<UpdateProductOrVariantMessageData, boolean> {
+    static readonly pattern = 'UpdateProductOrVariant';
+}
+export class UpdateVariantsByIdMessage extends WorkerMessage<
+    UpdateVariantsByIdMessageData,
+    ReindexMessageResponse
+> {
+    static readonly pattern = 'UpdateVariantsById';
+}