Jelajahi Sumber

feat(elasticsearch-plugin): Support search index job batching

Relates to #1137
Michael Bromley 4 tahun lalu
induk
melakukan
f3fb298d82

+ 7 - 5
packages/dev-server/dev-config.ts

@@ -9,6 +9,7 @@ import {
     LogLevel,
     VendureConfig,
 } from '@vendure/core';
+import { ElasticsearchPlugin } from '@vendure/elasticsearch-plugin';
 import { defaultEmailHandlers, EmailPlugin } from '@vendure/email-plugin';
 import { BullMQJobQueuePlugin } from '@vendure/job-queue-plugin/package/bullmq';
 import path from 'path';
@@ -64,14 +65,15 @@ export const devConfig: VendureConfig = {
             route: 'assets',
             assetUploadDir: path.join(__dirname, 'assets'),
         }),
-        DefaultSearchPlugin.init({ bufferUpdates: true }),
+        // DefaultSearchPlugin.init({ bufferUpdates: true }),
         BullMQJobQueuePlugin.init({}),
         // DefaultJobQueuePlugin,
         // JobQueueTestPlugin.init({ queueCount: 10 }),
-        // ElasticsearchPlugin.init({
-        //     host: 'http://localhost',
-        //     port: 9200,
-        // }),
+        ElasticsearchPlugin.init({
+            host: 'http://localhost',
+            port: 9200,
+            bufferUpdates: true,
+        }),
         EmailPlugin.init({
             devMode: true,
             route: 'mailbox',

+ 30 - 6
packages/elasticsearch-plugin/src/elasticsearch-resolver.ts

@@ -6,14 +6,21 @@ import {
     SearchResponse,
 } from '@vendure/common/lib/generated-types';
 import { Omit } from '@vendure/common/lib/omit';
-import { Allow, Collection, Ctx, FacetValue, RequestContext, SearchResolver } from '@vendure/core';
+import {
+    Allow,
+    Collection,
+    Ctx,
+    FacetValue,
+    RequestContext,
+    SearchJobBufferService,
+    SearchResolver,
+} from '@vendure/core';
 
 import { ElasticsearchService } from './elasticsearch.service';
 import { ElasticSearchInput, SearchPriceData } from './types';
 
 @Resolver('SearchResponse')
-export class ShopElasticSearchResolver
-    implements Omit<SearchResolver, 'facetValues' | 'collections' | 'reindex'> {
+export class ShopElasticSearchResolver implements Pick<SearchResolver, 'search'> {
     constructor(private elasticsearchService: ElasticsearchService) {}
 
     @Query()
@@ -38,8 +45,11 @@ export class ShopElasticSearchResolver
 }
 
 @Resolver('SearchResponse')
-export class AdminElasticSearchResolver implements Omit<SearchResolver, 'facetValues' | 'collections'> {
-    constructor(private elasticsearchService: ElasticsearchService) {}
+export class AdminElasticSearchResolver implements Pick<SearchResolver, 'search' | 'reindex'> {
+    constructor(
+        private elasticsearchService: ElasticsearchService,
+        private searchJobBufferService: SearchJobBufferService,
+    ) {}
 
     @Query()
     @Allow(Permission.ReadCatalog, Permission.ReadProduct)
@@ -56,7 +66,21 @@ export class AdminElasticSearchResolver implements Omit<SearchResolver, 'facetVa
     @Mutation()
     @Allow(Permission.UpdateCatalog, Permission.UpdateProduct)
     async reindex(@Ctx() ctx: RequestContext): Promise<GraphQLJob> {
-        return (this.elasticsearchService.reindex(ctx) as unknown) as GraphQLJob;
+        return this.elasticsearchService.reindex(ctx) as unknown as GraphQLJob;
+    }
+
+    @Query()
+    @Allow(Permission.UpdateCatalog, Permission.UpdateProduct)
+    async pendingSearchIndexUpdates(...args: any[]): Promise<any> {
+        return this.searchJobBufferService.getPendingSearchUpdates();
+    }
+
+    @Mutation()
+    @Allow(Permission.UpdateCatalog, Permission.UpdateProduct)
+    async runPendingSearchIndexUpdates(...args: any[]): Promise<any> {
+        // Intentionally not awaiting this method call
+        this.searchJobBufferService.runPendingSearchUpdates();
+        return { success: true };
     }
 }
 

+ 3 - 0
packages/elasticsearch-plugin/src/options.ts

@@ -205,6 +205,8 @@ export interface ElasticsearchOptions {
     customProductVariantMappings?: {
         [fieldName: string]: CustomMapping<[ProductVariant, LanguageCode]>;
     };
+    // TODO: docs
+    bufferUpdates?: boolean;
 }
 
 /**
@@ -418,6 +420,7 @@ export const defaultOptions: ElasticsearchRuntimeOptions = {
     },
     customProductMappings: {},
     customProductVariantMappings: {},
+    bufferUpdates: false,
 };
 
 export function mergeWithDefaults(userOptions: ElasticsearchOptions): ElasticsearchRuntimeOptions {

+ 9 - 0
packages/elasticsearch-plugin/src/plugin.ts

@@ -2,6 +2,7 @@ import { NodeOptions } from '@elastic/elasticsearch';
 import { OnApplicationBootstrap } from '@nestjs/common';
 import {
     AssetEvent,
+    BUFFER_SEARCH_INDEX_UPDATES,
     CollectionModificationEvent,
     EventBus,
     HealthCheckRegistryService,
@@ -13,6 +14,7 @@ import {
     ProductEvent,
     ProductVariantChannelEvent,
     ProductVariantEvent,
+    SearchJobBufferService,
     TaxRateModificationEvent,
     Type,
     VendurePlugin,
@@ -201,7 +203,12 @@ import { ElasticsearchOptions, ElasticsearchRuntimeOptions, mergeWithDefaults }
         ElasticsearchService,
         ElasticsearchHealthIndicator,
         ElasticsearchIndexerController,
+        SearchJobBufferService,
         { provide: ELASTIC_SEARCH_OPTIONS, useFactory: () => ElasticsearchPlugin.options },
+        {
+            provide: BUFFER_SEARCH_INDEX_UPDATES,
+            useFactory: () => ElasticsearchPlugin.options.bufferUpdates === true,
+        },
     ],
     adminApiExtensions: { resolvers: [AdminElasticSearchResolver, EntityElasticSearchResolver] },
     shopApiExtensions: {
@@ -314,6 +321,7 @@ export class ElasticsearchPlugin implements OnApplicationBootstrap {
             }
         });
 
+        // TODO: Remove this buffering logic because because we have dedicated buffering based on #1137
         const collectionModification$ = this.eventBus.ofType(CollectionModificationEvent);
         const closingNotifier$ = collectionModification$.pipe(debounceTime(50));
         collectionModification$
@@ -335,6 +343,7 @@ export class ElasticsearchPlugin implements OnApplicationBootstrap {
             // The delay prevents a "TransactionNotStartedError" (in SQLite/sqljs) by allowing any existing
             // transactions to complete before a new job is added to the queue (assuming the SQL-based
             // JobQueueStrategy).
+            // TODO: should be able to remove owing to f0fd6625
             .pipe(delay(1))
             .subscribe(event => {
                 const defaultTaxZone = event.ctx.channel.defaultTaxZone;