Browse Source

refactor(core): Make search job buffer usable in external search plugins

Relates to #1137
Michael Bromley 4 years ago
parent
commit
becf132179

+ 1 - 0
packages/core/src/plugin/default-search-plugin/constants.ts

@@ -1 +1,2 @@
 export const PLUGIN_INIT_OPTIONS = Symbol('PLUGIN_INIT_OPTIONS');
+export const BUFFER_SEARCH_INDEX_UPDATES = Symbol('BUFFER_SEARCH_INDEX_UPDATES');

+ 7 - 1
packages/core/src/plugin/default-search-plugin/default-search-plugin.ts

@@ -17,7 +17,7 @@ import { JobQueueService } from '../../job-queue/job-queue.service';
 import { PluginCommonModule } from '../plugin-common.module';
 import { VendurePlugin } from '../vendure-plugin';
 
-import { PLUGIN_INIT_OPTIONS } from './constants';
+import { BUFFER_SEARCH_INDEX_UPDATES, PLUGIN_INIT_OPTIONS } from './constants';
 import { AdminFulltextSearchResolver, ShopFulltextSearchResolver } from './fulltext-search.resolver';
 import { FulltextSearchService } from './fulltext-search.service';
 import { IndexerController } from './indexer/indexer.controller';
@@ -70,6 +70,10 @@ export interface DefaultSearchReindexResponse extends SearchReindexResponse {
         IndexerController,
         SearchJobBufferService,
         { provide: PLUGIN_INIT_OPTIONS, useFactory: () => DefaultSearchPlugin.options },
+        {
+            provide: BUFFER_SEARCH_INDEX_UPDATES,
+            useFactory: () => DefaultSearchPlugin.options.bufferUpdates === true,
+        },
     ],
     adminApiExtensions: { resolvers: [AdminFulltextSearchResolver] },
     shopApiExtensions: { resolvers: [ShopFulltextSearchResolver] },
@@ -145,6 +149,7 @@ export class DefaultSearchPlugin implements OnApplicationBootstrap {
             }
         });
 
+        // TODO: Remove this buffering logic because because we have dedicated buffering based on #1137
         const collectionModification$ = this.eventBus.ofType(CollectionModificationEvent);
         const closingNotifier$ = collectionModification$.pipe(debounceTime(50));
         collectionModification$
@@ -166,6 +171,7 @@ export class DefaultSearchPlugin implements OnApplicationBootstrap {
             // The delay prevents a "TransactionNotStartedError" (in SQLite/sqljs) by allowing any existing
             // transactions to complete before a new job is added to the queue (assuming the SQL-based
             // JobQueueStrategy).
+            // TODO: should be able to remove owing to f0fd6625
             .pipe(delay(1))
             .subscribe(event => {
                 const defaultTaxZone = event.ctx.channel.defaultTaxZone;

+ 5 - 0
packages/core/src/plugin/default-search-plugin/index.ts

@@ -0,0 +1,5 @@
+export * from './constants';
+export * from './default-search-plugin';
+export * from './search-job-buffer/collection-job-buffer';
+export * from './search-job-buffer/search-index-job-buffer';
+export * from './search-job-buffer/search-job-buffer.service';

+ 31 - 7
packages/core/src/plugin/default-search-plugin/search-job-buffer/search-index-job-buffer.ts

@@ -2,30 +2,42 @@ import { ID } from '@vendure/common/lib/shared-types';
 import { unique } from '@vendure/common/lib/unique';
 
 import { Job, JobBuffer } from '../../../job-queue/index';
-import { UpdateIndexQueueJobData, UpdateVariantsByIdJobData, UpdateVariantsJobData } from '../types';
+import {
+    UpdateIndexQueueJobData,
+    UpdateProductJobData,
+    UpdateVariantsByIdJobData,
+    UpdateVariantsJobData,
+} from '../types';
 
 export class SearchIndexJobBuffer implements JobBuffer<UpdateIndexQueueJobData> {
     readonly id = 'search-plugin-update-search-index';
 
     collect(job: Job<UpdateIndexQueueJobData>): boolean | Promise<boolean> {
-        return job.queueName === 'update-search-index';
+        return (
+            job.queueName === 'update-search-index' &&
+            ['update-product', 'update-variants', 'update-variants-by-id'].includes(job.data.type)
+        );
     }
 
     reduce(collectedJobs: Array<Job<UpdateIndexQueueJobData>>): Array<Job<any>> {
-        const variantsByIdJobs = this.removeBy<Job<UpdateVariantsByIdJobData | UpdateVariantsJobData>>(
+        const variantsJobs = this.removeBy<Job<UpdateVariantsByIdJobData | UpdateVariantsJobData>>(
             collectedJobs,
             item => item.data.type === 'update-variants-by-id' || item.data.type === 'update-variants',
         );
+        const productsJobs = this.removeBy<Job<UpdateProductJobData>>(
+            collectedJobs,
+            item => item.data.type === 'update-product',
+        );
 
         const jobsToAdd = [...collectedJobs];
 
-        if (variantsByIdJobs.length) {
-            const variantIdsToUpdate = variantsByIdJobs.reduce((result, job) => {
+        if (variantsJobs.length) {
+            const variantIdsToUpdate = variantsJobs.reduce((result, job) => {
                 const ids = job.data.type === 'update-variants-by-id' ? job.data.ids : job.data.variantIds;
                 return [...result, ...ids];
             }, [] as ID[]);
 
-            const referenceJob = variantsByIdJobs[0];
+            const referenceJob = variantsJobs[0];
             const batchedVariantJob = new Job<UpdateVariantsByIdJobData>({
                 ...referenceJob,
                 id: undefined,
@@ -36,7 +48,19 @@ export class SearchIndexJobBuffer implements JobBuffer<UpdateIndexQueueJobData>
                 },
             });
 
-            jobsToAdd.push(batchedVariantJob as any);
+            jobsToAdd.push(batchedVariantJob as Job);
+        }
+        if (productsJobs.length) {
+            const seenIds = new Set<ID>();
+            const uniqueProductJobs: Array<Job<UpdateProductJobData>> = [];
+            for (const job of productsJobs) {
+                if (seenIds.has(job.data.productId)) {
+                    continue;
+                }
+                uniqueProductJobs.push(job);
+                seenIds.add(job.data.productId);
+            }
+            jobsToAdd.push(...(uniqueProductJobs as Job[]));
         }
 
         return jobsToAdd;

+ 5 - 6
packages/core/src/plugin/default-search-plugin/search-job-buffer/search-job-buffer.service.ts

@@ -5,8 +5,7 @@ import { ConfigService } from '../../../config/config.service';
 import { isInspectableJobQueueStrategy } from '../../../config/job-queue/inspectable-job-queue-strategy';
 import { JobQueueService } from '../../../job-queue/job-queue.service';
 import { SubscribableJob } from '../../../job-queue/subscribable-job';
-import { PLUGIN_INIT_OPTIONS } from '../constants';
-import { DefaultSearchPluginInitOptions } from '../types';
+import { BUFFER_SEARCH_INDEX_UPDATES } from '../constants';
 
 import { CollectionJobBuffer } from './collection-job-buffer';
 import { SearchIndexJobBuffer } from './search-index-job-buffer';
@@ -19,18 +18,18 @@ export class SearchJobBufferService implements OnApplicationBootstrap {
     constructor(
         private jobQueueService: JobQueueService,
         private configService: ConfigService,
-        @Inject(PLUGIN_INIT_OPTIONS) private options: DefaultSearchPluginInitOptions,
+        @Inject(BUFFER_SEARCH_INDEX_UPDATES) private bufferUpdates: boolean,
     ) {}
 
     onApplicationBootstrap(): any {
-        if (this.options.bufferUpdates === true) {
+        if (this.bufferUpdates === true) {
             this.jobQueueService.addBuffer(this.searchIndexJobBuffer);
             this.jobQueueService.addBuffer(this.collectionJobBuffer);
         }
     }
 
     async getPendingSearchUpdates(): Promise<number> {
-        if (!this.options.bufferUpdates) {
+        if (!this.bufferUpdates) {
             return 0;
         }
         const bufferSizes = await this.jobQueueService.bufferSize(
@@ -43,7 +42,7 @@ export class SearchJobBufferService implements OnApplicationBootstrap {
     }
 
     async runPendingSearchUpdates(): Promise<void> {
-        if (!this.options.bufferUpdates) {
+        if (!this.bufferUpdates) {
             return;
         }
         const { jobQueueStrategy } = this.configService.jobQueueOptions;

+ 1 - 0
packages/core/src/plugin/default-search-plugin/types.ts

@@ -4,6 +4,7 @@ import { SerializedRequestContext } from '../../api/common/request-context';
 import { Asset } from '../../entity/asset/asset.entity';
 
 export interface DefaultSearchPluginInitOptions {
+    // TODO: docs
     bufferUpdates?: boolean;
 }
 

+ 1 - 1
packages/core/src/plugin/index.ts

@@ -1,4 +1,4 @@
-export * from './default-search-plugin/default-search-plugin';
+export * from './default-search-plugin/index';
 export * from './default-job-queue-plugin/default-job-queue-plugin';
 export * from './default-job-queue-plugin/job-record-buffer.entity';
 export * from './default-job-queue-plugin/sql-job-buffer-storage-strategy';