Browse Source

refactor(core) Remove worker from default search plugin

Fred Cox 5 years ago
parent
commit
ef26487b80

+ 1 - 4
packages/core/src/plugin/default-search-plugin/default-search-plugin.ts

@@ -57,11 +57,10 @@ export interface DefaultSearchReindexResponse extends SearchReindexResponse {
  */
 @VendurePlugin({
     imports: [PluginCommonModule],
-    providers: [FulltextSearchService, SearchIndexService],
+    providers: [FulltextSearchService, SearchIndexService, IndexerController],
     adminApiExtensions: { resolvers: [AdminFulltextSearchResolver] },
     shopApiExtensions: { resolvers: [ShopFulltextSearchResolver] },
     entities: [SearchIndexItem],
-    workers: [IndexerController],
 })
 export class DefaultSearchPlugin implements OnApplicationBootstrap {
     /** @internal */
@@ -69,8 +68,6 @@ export class DefaultSearchPlugin implements OnApplicationBootstrap {
 
     /** @internal */
     async onApplicationBootstrap() {
-        this.searchIndexService.initJobQueue();
-
         this.eventBus.ofType(ProductEvent).subscribe(event => {
             if (event.type === 'deleted') {
                 return this.searchIndexService.deleteProduct(event.ctx, event.product);

+ 59 - 104
packages/core/src/plugin/default-search-plugin/indexer/indexer.controller.ts

@@ -1,5 +1,4 @@
-import { Controller } from '@nestjs/common';
-import { MessagePattern } from '@nestjs/microservices';
+import { Injectable } from '@nestjs/common';
 import { LanguageCode } from '@vendure/common/lib/generated-types';
 import { ID } from '@vendure/common/lib/shared-types';
 import { unique } from '@vendure/common/lib/unique';
@@ -20,18 +19,14 @@ import { TransactionalConnection } from '../../../service/transaction/transactio
 import { asyncObservable } from '../../../worker/async-observable';
 import { SearchIndexItem } from '../search-index-item.entity';
 import {
-    AssignProductToChannelMessage,
-    AssignVariantToChannelMessage,
-    DeleteAssetMessage,
-    DeleteProductMessage,
-    DeleteVariantMessage,
-    ReindexMessage,
-    RemoveProductFromChannelMessage,
-    RemoveVariantFromChannelMessage,
-    UpdateAssetMessage,
-    UpdateProductMessage,
-    UpdateVariantMessage,
-    UpdateVariantsByIdMessage,
+    ProductChannelMessageData,
+    ReindexMessageData,
+    ReindexMessageResponse,
+    UpdateAssetMessageData,
+    UpdateProductMessageData,
+    UpdateVariantMessageData,
+    UpdateVariantsByIdMessageData,
+    VariantChannelMessageData,
 } from '../types';
 
 export const BATCH_SIZE = 1000;
@@ -52,7 +47,7 @@ export const variantRelations = [
 
 export const workerLoggerCtx = 'DefaultSearchPlugin Worker';
 
-@Controller()
+@Injectable()
 export class IndexerController {
     private queue = new AsyncQueue('search-index');
 
@@ -62,8 +57,7 @@ export class IndexerController {
         private configService: ConfigService,
     ) {}
 
-    @MessagePattern(ReindexMessage.pattern)
-    reindex({ ctx: rawContext }: ReindexMessage['data']): Observable<ReindexMessage['response']> {
+    reindex({ ctx: rawContext }: ReindexMessageData): Observable<ReindexMessageResponse> {
         const ctx = RequestContext.deserialize(rawContext);
         return asyncObservable(async observer => {
             const timeStart = Date.now();
@@ -101,11 +95,10 @@ export class IndexerController {
         });
     }
 
-    @MessagePattern(UpdateVariantsByIdMessage.pattern)
     updateVariantsById({
         ctx: rawContext,
         ids,
-    }: UpdateVariantsByIdMessage['data']): Observable<UpdateVariantsByIdMessage['response']> {
+    }: UpdateVariantsByIdMessageData): Observable<ReindexMessageResponse> {
         const ctx = RequestContext.deserialize(rawContext);
 
         return asyncObservable(async observer => {
@@ -140,117 +133,79 @@ export class IndexerController {
         });
     }
 
-    @MessagePattern(UpdateProductMessage.pattern)
-    updateProduct(data: UpdateProductMessage['data']): Observable<UpdateProductMessage['response']> {
+    async updateProduct(data: UpdateProductMessageData): Promise<boolean> {
         const ctx = RequestContext.deserialize(data.ctx);
-        return asyncObservable(async () => {
-            return this.updateProductInChannel(ctx, data.productId, ctx.channelId);
-        });
+        return this.updateProductInChannel(ctx, data.productId, ctx.channelId);
     }
 
-    @MessagePattern(UpdateVariantMessage.pattern)
-    updateVariants(data: UpdateVariantMessage['data']): Observable<UpdateVariantMessage['response']> {
+    async updateVariants(data: UpdateVariantMessageData): Promise<boolean> {
         const ctx = RequestContext.deserialize(data.ctx);
-        return asyncObservable(async () => {
-            return this.updateVariantsInChannel(ctx, data.variantIds, ctx.channelId);
-        });
+        return this.updateVariantsInChannel(ctx, data.variantIds, ctx.channelId);
     }
 
-    @MessagePattern(DeleteProductMessage.pattern)
-    deleteProduct(data: DeleteProductMessage['data']): Observable<DeleteProductMessage['response']> {
+    async deleteProduct(data: UpdateProductMessageData): Promise<boolean> {
         const ctx = RequestContext.deserialize(data.ctx);
-        return asyncObservable(async () => {
-            return this.deleteProductInChannel(ctx, data.productId, ctx.channelId);
-        });
+        return this.deleteProductInChannel(ctx, data.productId, ctx.channelId);
     }
 
-    @MessagePattern(DeleteVariantMessage.pattern)
-    deleteVariant(data: DeleteVariantMessage['data']): Observable<DeleteVariantMessage['response']> {
+    async deleteVariant(data: UpdateVariantMessageData): Promise<boolean> {
         const ctx = RequestContext.deserialize(data.ctx);
-        return asyncObservable(async () => {
-            const variants = await this.connection.getRepository(ProductVariant).findByIds(data.variantIds);
-            if (variants.length) {
-                await this.removeSearchIndexItems(
-                    ctx.languageCode,
-                    ctx.channelId,
-                    variants.map(v => v.id),
-                );
-            }
-            return true;
-        });
+        const variants = await this.connection.getRepository(ProductVariant).findByIds(data.variantIds);
+        if (variants.length) {
+            await this.removeSearchIndexItems(
+                ctx.languageCode,
+                ctx.channelId,
+                variants.map(v => v.id),
+            );
+        }
+        return true;
     }
 
-    @MessagePattern(AssignProductToChannelMessage.pattern)
-    assignProductToChannel(
-        data: AssignProductToChannelMessage['data'],
-    ): Observable<AssignProductToChannelMessage['response']> {
+    async assignProductToChannel(data: ProductChannelMessageData): Promise<boolean> {
         const ctx = RequestContext.deserialize(data.ctx);
-        return asyncObservable(async () => {
-            return this.updateProductInChannel(ctx, data.productId, data.channelId);
-        });
+        return this.updateProductInChannel(ctx, data.productId, data.channelId);
     }
 
-    @MessagePattern(RemoveProductFromChannelMessage.pattern)
-    removeProductFromChannel(
-        data: RemoveProductFromChannelMessage['data'],
-    ): Observable<RemoveProductFromChannelMessage['response']> {
+    async removeProductFromChannel(data: ProductChannelMessageData): Promise<boolean> {
         const ctx = RequestContext.deserialize(data.ctx);
-        return asyncObservable(async () => {
-            return this.deleteProductInChannel(ctx, data.productId, data.channelId);
-        });
+        return this.deleteProductInChannel(ctx, data.productId, data.channelId);
     }
 
-    @MessagePattern(AssignVariantToChannelMessage.pattern)
-    assignVariantToChannel(
-        data: AssignVariantToChannelMessage['data'],
-    ): Observable<AssignProductToChannelMessage['response']> {
+    async assignVariantToChannel(data: VariantChannelMessageData): Promise<boolean> {
         const ctx = RequestContext.deserialize(data.ctx);
-        return asyncObservable(async () => {
-            return this.updateVariantsInChannel(ctx, [data.productVariantId], data.channelId);
-        });
+        return this.updateVariantsInChannel(ctx, [data.productVariantId], data.channelId);
     }
 
-    @MessagePattern(RemoveVariantFromChannelMessage.pattern)
-    removeVariantFromChannel(
-        data: RemoveVariantFromChannelMessage['data'],
-    ): Observable<RemoveProductFromChannelMessage['response']> {
+    async removeVariantFromChannel(data: VariantChannelMessageData): Promise<boolean> {
         const ctx = RequestContext.deserialize(data.ctx);
-        return asyncObservable(async () => {
-            await this.removeSearchIndexItems(ctx.languageCode, data.channelId, [data.productVariantId]);
-            return true;
-        });
+        await this.removeSearchIndexItems(ctx.languageCode, data.channelId, [data.productVariantId]);
+        return true;
     }
 
-    @MessagePattern(UpdateAssetMessage.pattern)
-    updateAsset(data: UpdateAssetMessage['data']): Observable<UpdateAssetMessage['response']> {
-        return asyncObservable(async () => {
-            const id = data.asset.id;
-            function getFocalPoint(point?: { x: number; y: number }) {
-                return point && point.x && point.y ? point : null;
-            }
-            const focalPoint = getFocalPoint(data.asset.focalPoint);
-            await this.connection
-                .getRepository(SearchIndexItem)
-                .update({ productAssetId: id }, { productPreviewFocalPoint: focalPoint });
-            await this.connection
-                .getRepository(SearchIndexItem)
-                .update({ productVariantAssetId: id }, { productVariantPreviewFocalPoint: focalPoint });
-            return true;
-        });
+    async updateAsset(data: UpdateAssetMessageData): Promise<boolean> {
+        const id = data.asset.id;
+        function getFocalPoint(point?: { x: number; y: number }) {
+            return point && point.x && point.y ? point : null;
+        }
+        const focalPoint = getFocalPoint(data.asset.focalPoint);
+        await this.connection
+            .getRepository(SearchIndexItem)
+            .update({ productAssetId: id }, { productPreviewFocalPoint: focalPoint });
+        await this.connection
+            .getRepository(SearchIndexItem)
+            .update({ productVariantAssetId: id }, { productVariantPreviewFocalPoint: focalPoint });
+        return true;
     }
 
-    @MessagePattern(DeleteAssetMessage.pattern)
-    deleteAsset(data: DeleteAssetMessage['data']): Observable<DeleteAssetMessage['response']> {
-        return asyncObservable(async () => {
-            const id = data.asset.id;
-            await this.connection
-                .getRepository(SearchIndexItem)
-                .update({ productAssetId: id }, { productAssetId: null });
-            await this.connection
-                .getRepository(SearchIndexItem)
-                .update({ productVariantAssetId: id }, { productVariantAssetId: null });
-            return true;
-        });
+    async deleteAsset(data: UpdateAssetMessageData): Promise<boolean> {
+        const id = data.asset.id;
+        await this.connection
+            .getRepository(SearchIndexItem)
+            .update({ productAssetId: id }, { productAssetId: null });
+        await this.connection
+            .getRepository(SearchIndexItem)
+            .update({ productVariantAssetId: id }, { productVariantAssetId: null });
+        return true;
     }
 
     private async updateProductInChannel(

+ 40 - 69
packages/core/src/plugin/default-search-plugin/indexer/search-index.service.ts

@@ -1,6 +1,7 @@
-import { Injectable } from '@nestjs/common';
+import { Injectable, OnApplicationBootstrap } from '@nestjs/common';
 import { ID } from '@vendure/common/lib/shared-types';
 import { assertNever } from '@vendure/common/lib/shared-utils';
+import { Observable } from 'rxjs';
 
 import { RequestContext } from '../../../api/common/request-context';
 import { Logger } from '../../../config/logger/vendure-logger';
@@ -10,65 +11,50 @@ import { Product } from '../../../entity/product/product.entity';
 import { Job } from '../../../job-queue/job';
 import { JobQueue } from '../../../job-queue/job-queue';
 import { JobQueueService } from '../../../job-queue/job-queue.service';
-import { WorkerMessage } from '../../../worker/types';
-import { WorkerService } from '../../../worker/worker.service';
-import {
-    AssignProductToChannelMessage,
-    AssignVariantToChannelMessage,
-    DeleteAssetMessage,
-    DeleteProductMessage,
-    DeleteVariantMessage,
-    ReindexMessage,
-    ReindexMessageResponse,
-    RemoveProductFromChannelMessage,
-    RemoveVariantFromChannelMessage,
-    UpdateAssetMessage,
-    UpdateIndexQueueJobData,
-    UpdateProductMessage,
-    UpdateVariantMessage,
-    UpdateVariantsByIdMessage,
-} from '../types';
-
-let updateIndexQueue: JobQueue<UpdateIndexQueueJobData> | undefined;
+import { ReindexMessageResponse, UpdateIndexQueueJobData } from '../types';
+
+import { IndexerController } from './indexer.controller';
 
 /**
  * This service is responsible for messaging the {@link IndexerController} with search index updates.
  */
 @Injectable()
-export class SearchIndexService {
-    constructor(private workerService: WorkerService, private jobService: JobQueueService) {}
+export class SearchIndexService implements OnApplicationBootstrap {
+    private updateIndexQueue: JobQueue<UpdateIndexQueueJobData>;
+
+    constructor(private jobService: JobQueueService, private indexerController: IndexerController) {}
 
-    initJobQueue() {
-        updateIndexQueue = this.jobService.createQueue({
+    onApplicationBootstrap() {
+        this.updateIndexQueue = this.jobService.createQueue({
             name: 'update-search-index',
             process: job => {
                 const data = job.data;
                 switch (data.type) {
                     case 'reindex':
                         Logger.verbose(`sending ReindexMessage`);
-                        return this.sendMessageWithProgress(job, new ReindexMessage(data));
+                        return this.jobWithProgress(job, this.indexerController.reindex(data));
                     case 'update-product':
-                        return this.sendMessage(job, new UpdateProductMessage(data));
+                        return this.indexerController.updateProduct(data);
                     case 'update-variants':
-                        return this.sendMessage(job, new UpdateVariantMessage(data));
+                        return this.indexerController.updateVariants(data);
                     case 'delete-product':
-                        return this.sendMessage(job, new DeleteProductMessage(data));
+                        return this.indexerController.deleteProduct(data);
                     case 'delete-variant':
-                        return this.sendMessage(job, new DeleteVariantMessage(data));
+                        return this.indexerController.deleteVariant(data);
                     case 'update-variants-by-id':
-                        return this.sendMessageWithProgress(job, new UpdateVariantsByIdMessage(data));
+                        return this.jobWithProgress(job, this.indexerController.updateVariantsById(data));
                     case 'update-asset':
-                        return this.sendMessage(job, new UpdateAssetMessage(data));
+                        return this.indexerController.updateAsset(data);
                     case 'delete-asset':
-                        return this.sendMessage(job, new DeleteAssetMessage(data));
+                        return this.indexerController.deleteAsset(data);
                     case 'assign-product-to-channel':
-                        return this.sendMessage(job, new AssignProductToChannelMessage(data));
+                        return this.indexerController.assignProductToChannel(data);
                     case 'remove-product-from-channel':
-                        return this.sendMessage(job, new RemoveProductFromChannelMessage(data));
+                        return this.indexerController.removeProductFromChannel(data);
                     case 'assign-variant-to-channel':
-                        return this.sendMessage(job, new AssignVariantToChannelMessage(data));
+                        return this.indexerController.assignVariantToChannel(data);
                     case 'remove-variant-from-channel':
-                        return this.sendMessage(job, new RemoveVariantFromChannelMessage(data));
+                        return this.indexerController.removeVariantFromChannel(data);
                     default:
                         assertNever(data);
                         return Promise.resolve();
@@ -78,41 +64,41 @@ export class SearchIndexService {
     }
 
     reindex(ctx: RequestContext) {
-        return this.addJobToQueue({ type: 'reindex', ctx: ctx.serialize() });
+        return this.updateIndexQueue.add({ type: 'reindex', ctx: ctx.serialize() });
     }
 
     updateProduct(ctx: RequestContext, product: Product) {
-        this.addJobToQueue({ type: 'update-product', ctx: ctx.serialize(), productId: product.id });
+        this.updateIndexQueue.add({ type: 'update-product', ctx: ctx.serialize(), productId: product.id });
     }
 
     updateVariants(ctx: RequestContext, variants: ProductVariant[]) {
         const variantIds = variants.map(v => v.id);
-        this.addJobToQueue({ type: 'update-variants', ctx: ctx.serialize(), variantIds });
+        this.updateIndexQueue.add({ type: 'update-variants', ctx: ctx.serialize(), variantIds });
     }
 
     deleteProduct(ctx: RequestContext, product: Product) {
-        this.addJobToQueue({ type: 'delete-product', ctx: ctx.serialize(), productId: product.id });
+        this.updateIndexQueue.add({ type: 'delete-product', ctx: ctx.serialize(), productId: product.id });
     }
 
     deleteVariant(ctx: RequestContext, variants: ProductVariant[]) {
         const variantIds = variants.map(v => v.id);
-        this.addJobToQueue({ type: 'delete-variant', ctx: ctx.serialize(), variantIds });
+        this.updateIndexQueue.add({ type: 'delete-variant', ctx: ctx.serialize(), variantIds });
     }
 
     updateVariantsById(ctx: RequestContext, ids: ID[]) {
-        this.addJobToQueue({ type: 'update-variants-by-id', ctx: ctx.serialize(), ids });
+        this.updateIndexQueue.add({ type: 'update-variants-by-id', ctx: ctx.serialize(), ids });
     }
 
     updateAsset(ctx: RequestContext, asset: Asset) {
-        this.addJobToQueue({ type: 'update-asset', ctx: ctx.serialize(), asset: asset as any });
+        this.updateIndexQueue.add({ type: 'update-asset', ctx: ctx.serialize(), asset: asset as any });
     }
 
     deleteAsset(ctx: RequestContext, asset: Asset) {
-        this.addJobToQueue({ type: 'delete-asset', ctx: ctx.serialize(), asset: asset as any });
+        this.updateIndexQueue.add({ type: 'delete-asset', ctx: ctx.serialize(), asset: asset as any });
     }
 
     assignProductToChannel(ctx: RequestContext, productId: ID, channelId: ID) {
-        this.addJobToQueue({
+        this.updateIndexQueue.add({
             type: 'assign-product-to-channel',
             ctx: ctx.serialize(),
             productId,
@@ -121,7 +107,7 @@ export class SearchIndexService {
     }
 
     removeProductFromChannel(ctx: RequestContext, productId: ID, channelId: ID) {
-        this.addJobToQueue({
+        this.updateIndexQueue.add({
             type: 'remove-product-from-channel',
             ctx: ctx.serialize(),
             productId,
@@ -130,7 +116,7 @@ export class SearchIndexService {
     }
 
     assignVariantToChannel(ctx: RequestContext, productVariantId: ID, channelId: ID) {
-        this.addJobToQueue({
+        this.updateIndexQueue.add({
             type: 'assign-variant-to-channel',
             ctx: ctx.serialize(),
             productVariantId,
@@ -139,7 +125,7 @@ export class SearchIndexService {
     }
 
     removeVariantFromChannel(ctx: RequestContext, productVariantId: ID, channelId: ID) {
-        this.addJobToQueue({
+        this.updateIndexQueue.add({
             type: 'remove-variant-from-channel',
             ctx: ctx.serialize(),
             productVariantId,
@@ -147,30 +133,15 @@ export class SearchIndexService {
         });
     }
 
-    private addJobToQueue(data: UpdateIndexQueueJobData) {
-        if (updateIndexQueue) {
-            return updateIndexQueue.add(data);
-        }
-    }
-
-    private sendMessage(job: Job<any>, message: WorkerMessage<any, any>): Promise<any> {
-        return new Promise((resolve, reject) => {
-            this.workerService.send(message).subscribe({
-                complete: () => resolve(),
-                error: err => {
-                    Logger.error(err);
-                    reject(err);
-                },
-            });
-        });
-    }
-
-    private sendMessageWithProgress(job: Job<any>, message: ReindexMessage | UpdateVariantsByIdMessage): Promise<any> {
+    private jobWithProgress(
+        job: Job<UpdateIndexQueueJobData>,
+        ob: Observable<ReindexMessageResponse>,
+    ): Promise<any> {
         return new Promise((resolve, reject) => {
             let total: number | undefined;
             let duration = 0;
             let completed = 0;
-            this.workerService.send(message).subscribe({
+            ob.subscribe({
                 next: (response: ReindexMessageResponse) => {
                     if (!total) {
                         total = response.total;

+ 0 - 41
packages/core/src/plugin/default-search-plugin/types.ts

@@ -2,7 +2,6 @@ import { ID, JsonCompatible } from '@vendure/common/lib/shared-types';
 
 import { SerializedRequestContext } from '../../api/common/request-context';
 import { Asset } from '../../entity/asset/asset.entity';
-import { WorkerMessage } from '../../worker/types';
 
 export type ReindexMessageResponse = {
     total: number;
@@ -46,46 +45,6 @@ export type VariantChannelMessageData = {
     channelId: ID;
 };
 
-export class ReindexMessage extends WorkerMessage<ReindexMessageData, ReindexMessageResponse> {
-    static readonly pattern = 'Reindex';
-}
-export class UpdateVariantMessage extends WorkerMessage<UpdateVariantMessageData, boolean> {
-    static readonly pattern = 'UpdateProduct';
-}
-export class UpdateProductMessage extends WorkerMessage<UpdateProductMessageData, boolean> {
-    static readonly pattern = 'UpdateVariant';
-}
-export class DeleteVariantMessage extends WorkerMessage<UpdateVariantMessageData, boolean> {
-    static readonly pattern = 'DeleteProduct';
-}
-export class DeleteProductMessage extends WorkerMessage<UpdateProductMessageData, boolean> {
-    static readonly pattern = 'DeleteVariant';
-}
-export class UpdateVariantsByIdMessage extends WorkerMessage<
-    UpdateVariantsByIdMessageData,
-    ReindexMessageResponse
-> {
-    static readonly pattern = 'UpdateVariantsById';
-}
-export class AssignProductToChannelMessage extends WorkerMessage<ProductChannelMessageData, boolean> {
-    static readonly pattern = 'AssignProductToChannel';
-}
-export class RemoveProductFromChannelMessage extends WorkerMessage<ProductChannelMessageData, boolean> {
-    static readonly pattern = 'RemoveProductFromChannel';
-}
-export class AssignVariantToChannelMessage extends WorkerMessage<VariantChannelMessageData, boolean> {
-    static readonly pattern = 'AssignVariantToChannel';
-}
-export class RemoveVariantFromChannelMessage extends WorkerMessage<VariantChannelMessageData, boolean> {
-    static readonly pattern = 'RemoveVariantFromChannel';
-}
-export class UpdateAssetMessage extends WorkerMessage<UpdateAssetMessageData, boolean> {
-    static readonly pattern = 'UpdateAsset';
-}
-export class DeleteAssetMessage extends WorkerMessage<UpdateAssetMessageData, boolean> {
-    static readonly pattern = 'DeleteAsset';
-}
-
 type NamedJobData<Type extends string, MessageData> = { type: Type } & MessageData;
 
 export type ReindexJobData = NamedJobData<'reindex', ReindexMessageData>;