Browse Source

feat(core): Create buffering logic for DefaultSearchPlugin

Relates to #1137
Michael Bromley 4 years ago
parent
commit
6a47dcf85e

+ 1 - 1
packages/core/src/api/resolvers/admin/job.resolver.ts

@@ -86,7 +86,7 @@ export class JobResolver {
     @Query()
     @Allow(Permission.ReadSettings, Permission.ReadSystem)
     async jobBufferSize(@Args() args: QueryJobBufferSizeArgs) {
-        const bufferSizes = this.jobBuffer.bufferSize(args.processorIds);
+        const bufferSizes = await this.jobBuffer.bufferSize(args.processorIds);
         return Object.entries(bufferSizes).map(([processorId, size]) => ({ processorId, size }));
     }
 

+ 5 - 0
packages/core/src/job-queue/index.ts

@@ -1,4 +1,9 @@
 export * from './injectable-job-queue-strategy';
+export * from './job-buffer/in-memory-job-buffer-storage-strategy';
+export * from './job-buffer/job-buffer';
+export * from './job-buffer/job-buffer-processor';
+export * from './job-buffer/job-buffer-storage-strategy';
+export * from './job-buffer/sql-job-buffer-storage-strategy';
 export * from './job';
 export * from './job-queue';
 export * from './job-queue.service';

+ 14 - 4
packages/core/src/job-queue/job-buffer/job-buffer.ts

@@ -2,6 +2,7 @@ import { Injectable } from '@nestjs/common';
 
 import { InternalServerError } from '../../common/error/errors';
 import { ConfigService } from '../../config/config.service';
+import { Logger } from '../../config/logger/vendure-logger';
 import { Job } from '../job';
 
 import { JobBufferProcessor } from './job-buffer-processor';
@@ -16,7 +17,7 @@ export class JobBuffer {
         this.storageStrategy = configService.jobQueueOptions.jobBufferStorageStrategy;
     }
 
-    addProcessor(processor: JobBufferProcessor) {
+    addProcessor(processor: JobBufferProcessor<any>) {
         const idAlreadyExists = Array.from(this.processors).find(p => p.id === processor.id);
         if (idAlreadyExists) {
             throw new InternalServerError(
@@ -26,7 +27,7 @@ export class JobBuffer {
         this.processors.add(processor);
     }
 
-    removeProcessor(processor: JobBufferProcessor) {
+    removeProcessor(processor: JobBufferProcessor<any>) {
         this.processors.delete(processor);
     }
 
@@ -58,8 +59,17 @@ export class JobBuffer {
         for (const processor of this.processors) {
             const jobsForProcessor = flushResult[processor.id];
             if (jobsForProcessor?.length) {
-                const reducedJobs = await processor.reduce(jobsForProcessor);
-                for (const job of reducedJobs) {
+                let jobsToAdd = jobsForProcessor;
+                try {
+                    jobsToAdd = await processor.reduce(jobsForProcessor);
+                } catch (e) {
+                    Logger.error(
+                        `Error encountered processing jobs in "${processor.id}:\n${e.message}"`,
+                        undefined,
+                        e.stack,
+                    );
+                }
+                for (const job of jobsToAdd) {
                     await jobQueueStrategy.add(job);
                 }
             }

+ 4 - 4
packages/core/src/job-queue/job-buffer/sql-job-buffer-storage-strategy.ts

@@ -15,11 +15,11 @@ export class SqlJobBufferStorageStrategy implements JobBufferStorageStrategy {
         return Promise.resolve(job);
     }
 
-    bufferSize(processorIds?: string[]): Promise<number> {
-        return Promise.resolve(0);
+    bufferSize(processorIds?: string[]) {
+        return Promise.resolve({});
     }
 
-    flush(processorIds?: string[]): Promise<void> {
-        return Promise.resolve(undefined);
+    flush(processorIds?: string[]) {
+        return Promise.resolve({});
     }
 }

+ 34 - 0
packages/core/src/plugin/default-search-plugin/collection-job-buffer-processor.ts

@@ -0,0 +1,34 @@
+import { ID } from '@vendure/common/lib/shared-types';
+import { unique } from '@vendure/common/lib/unique';
+
+import { Job, JobBufferProcessor } from '../../job-queue';
+import { ApplyCollectionFiltersJobData } from '../../service/services/collection.service';
+
+import { UpdateIndexQueueJobData, UpdateVariantsByIdJobData, UpdateVariantsJobData } from './types';
+
+export class CollectionJobBufferProcessor implements JobBufferProcessor<ApplyCollectionFiltersJobData> {
+    readonly id = 'search-plugin-apply-collection-filters';
+
+    collect(job: Job): boolean {
+        return job.queueName === 'apply-collection-filters';
+    }
+
+    reduce(collectedJobs: Array<Job<ApplyCollectionFiltersJobData>>): Array<Job<any>> {
+        const collectionIdsToUpdate = collectedJobs.reduce((result, job) => {
+            return [...result, ...job.data.collectionIds];
+        }, [] as ID[]);
+
+        const referenceJob = collectedJobs[0];
+        const batchedCollectionJob = new Job<ApplyCollectionFiltersJobData>({
+            ...referenceJob,
+            id: undefined,
+            data: {
+                collectionIds: unique(collectionIdsToUpdate),
+                ctx: referenceJob.data.ctx,
+                applyToChangedVariantsOnly: referenceJob.data.applyToChangedVariantsOnly,
+            },
+        });
+
+        return [batchedCollectionJob];
+    }
+}

+ 11 - 1
packages/core/src/plugin/default-search-plugin/default-search-plugin.ts

@@ -12,14 +12,17 @@ import { ProductEvent } from '../../event-bus/events/product-event';
 import { ProductVariantChannelEvent } from '../../event-bus/events/product-variant-channel-event';
 import { ProductVariantEvent } from '../../event-bus/events/product-variant-event';
 import { TaxRateModificationEvent } from '../../event-bus/events/tax-rate-modification-event';
+import { JobBuffer } from '../../job-queue/job-buffer/job-buffer';
 import { PluginCommonModule } from '../plugin-common.module';
 import { VendurePlugin } from '../vendure-plugin';
 
+import { CollectionJobBufferProcessor } from './collection-job-buffer-processor';
 import { AdminFulltextSearchResolver, ShopFulltextSearchResolver } from './fulltext-search.resolver';
 import { FulltextSearchService } from './fulltext-search.service';
 import { IndexerController } from './indexer/indexer.controller';
 import { SearchIndexService } from './indexer/search-index.service';
 import { SearchIndexItem } from './search-index-item.entity';
+import { SearchJobBufferProcessor } from './search-job-buffer-processor';
 
 export interface DefaultSearchReindexResponse extends SearchReindexResponse {
     timeTaken: number;
@@ -64,10 +67,17 @@ export interface DefaultSearchReindexResponse extends SearchReindexResponse {
 })
 export class DefaultSearchPlugin implements OnApplicationBootstrap {
     /** @internal */
-    constructor(private eventBus: EventBus, private searchIndexService: SearchIndexService) {}
+    constructor(
+        private eventBus: EventBus,
+        private searchIndexService: SearchIndexService,
+        private jobBuffer: JobBuffer,
+    ) {}
 
     /** @internal */
     async onApplicationBootstrap() {
+        this.jobBuffer.addProcessor(new SearchJobBufferProcessor());
+        this.jobBuffer.addProcessor(new CollectionJobBufferProcessor());
+
         this.eventBus.ofType(ProductEvent).subscribe(event => {
             if (event.type === 'deleted') {
                 return this.searchIndexService.deleteProduct(event.ctx, event.product);

+ 61 - 0
packages/core/src/plugin/default-search-plugin/search-job-buffer-processor.ts

@@ -0,0 +1,61 @@
+import { ID } from '@vendure/common/lib/shared-types';
+import { unique } from '@vendure/common/lib/unique';
+
+import { Job, JobBufferProcessor } from '../../job-queue';
+
+import { UpdateIndexQueueJobData, UpdateVariantsByIdJobData, UpdateVariantsJobData } from './types';
+
+export class SearchJobBufferProcessor implements JobBufferProcessor<UpdateIndexQueueJobData> {
+    readonly id = 'search-plugin-update-search-index';
+
+    collect(job: Job<UpdateIndexQueueJobData>): boolean | Promise<boolean> {
+        return job.queueName === 'update-search-index';
+    }
+
+    reduce(collectedJobs: Array<Job<UpdateIndexQueueJobData>>): Array<Job<any>> {
+        const variantsByIdJobs = this.removeBy<Job<UpdateVariantsByIdJobData | UpdateVariantsJobData>>(
+            collectedJobs,
+            item => item.data.type === 'update-variants-by-id' || item.data.type === 'update-variants',
+        );
+
+        const jobsToAdd = [...collectedJobs];
+
+        if (variantsByIdJobs.length) {
+            const variantIdsToUpdate = variantsByIdJobs.reduce((result, job) => {
+                const ids = job.data.type === 'update-variants-by-id' ? job.data.ids : job.data.variantIds;
+                return [...result, ...ids];
+            }, [] as ID[]);
+
+            const referenceJob = variantsByIdJobs[0];
+            const batchedVariantJob = new Job<UpdateVariantsByIdJobData>({
+                ...referenceJob,
+                id: undefined,
+                data: {
+                    type: 'update-variants-by-id',
+                    ids: unique(variantIdsToUpdate),
+                    ctx: referenceJob.data.ctx,
+                },
+            });
+
+            jobsToAdd.push(batchedVariantJob as any);
+        }
+
+        return jobsToAdd;
+    }
+
+    /**
+     * Removes items from the array based on the filterFn and returns a new array with only the removed
+     * items. The original input array is mutated.
+     */
+    private removeBy<R extends T, T = any>(input: T[], filterFn: (item: T) => boolean): R[] {
+        const removed: R[] = [];
+        for (let i = input.length - 1; i >= 0; i--) {
+            const item = input[i];
+            if (filterFn(item)) {
+                removed.push(item as R);
+                input.splice(i, 1);
+            }
+        }
+        return removed;
+    }
+}

+ 23 - 11
packages/core/src/plugin/default-search-plugin/types.ts

@@ -48,17 +48,29 @@ export type VariantChannelMessageData = {
 type NamedJobData<Type extends string, MessageData> = { type: Type } & MessageData;
 
 export type ReindexJobData = NamedJobData<'reindex', ReindexMessageData>;
-type UpdateProductJobData = NamedJobData<'update-product', UpdateProductMessageData>;
-type UpdateVariantsJobData = NamedJobData<'update-variants', UpdateVariantMessageData>;
-type DeleteProductJobData = NamedJobData<'delete-product', UpdateProductMessageData>;
-type DeleteVariantJobData = NamedJobData<'delete-variant', UpdateVariantMessageData>;
-type UpdateVariantsByIdJobData = NamedJobData<'update-variants-by-id', UpdateVariantsByIdMessageData>;
-type UpdateAssetJobData = NamedJobData<'update-asset', UpdateAssetMessageData>;
-type DeleteAssetJobData = NamedJobData<'delete-asset', UpdateAssetMessageData>;
-type AssignProductToChannelJobData = NamedJobData<'assign-product-to-channel', ProductChannelMessageData>;
-type RemoveProductFromChannelJobData = NamedJobData<'remove-product-from-channel', ProductChannelMessageData>;
-type AssignVariantToChannelJobData = NamedJobData<'assign-variant-to-channel', VariantChannelMessageData>;
-type RemoveVariantFromChannelJobData = NamedJobData<'remove-variant-from-channel', VariantChannelMessageData>;
+export type UpdateProductJobData = NamedJobData<'update-product', UpdateProductMessageData>;
+export type UpdateVariantsJobData = NamedJobData<'update-variants', UpdateVariantMessageData>;
+export type DeleteProductJobData = NamedJobData<'delete-product', UpdateProductMessageData>;
+export type DeleteVariantJobData = NamedJobData<'delete-variant', UpdateVariantMessageData>;
+export type UpdateVariantsByIdJobData = NamedJobData<'update-variants-by-id', UpdateVariantsByIdMessageData>;
+export type UpdateAssetJobData = NamedJobData<'update-asset', UpdateAssetMessageData>;
+export type DeleteAssetJobData = NamedJobData<'delete-asset', UpdateAssetMessageData>;
+export type AssignProductToChannelJobData = NamedJobData<
+    'assign-product-to-channel',
+    ProductChannelMessageData
+>;
+export type RemoveProductFromChannelJobData = NamedJobData<
+    'remove-product-from-channel',
+    ProductChannelMessageData
+>;
+export type AssignVariantToChannelJobData = NamedJobData<
+    'assign-variant-to-channel',
+    VariantChannelMessageData
+>;
+export type RemoveVariantFromChannelJobData = NamedJobData<
+    'remove-variant-from-channel',
+    VariantChannelMessageData
+>;
 export type UpdateIndexQueueJobData =
     | ReindexJobData
     | UpdateProductJobData

+ 1 - 1
packages/core/src/service/services/collection.service.ts

@@ -44,7 +44,7 @@ import { AssetService } from './asset.service';
 import { ChannelService } from './channel.service';
 import { FacetValueService } from './facet-value.service';
 
-type ApplyCollectionFiltersJobData = {
+export type ApplyCollectionFiltersJobData = {
     ctx: SerializedRequestContext;
     collectionIds: ID[];
     applyToChangedVariantsOnly?: boolean;