Просмотр исходного кода

feat(core): Expose pending search index updates operations in Admin API

Relates to #1137
Michael Bromley 4 лет назад
Родитель
Сommit
53a1943560

+ 2 - 0
packages/admin-ui/src/lib/core/src/common/generated-types.ts

@@ -2308,6 +2308,7 @@ export type Mutation = {
   removeSettledJobs: Scalars['Int'];
   requestCompleted: Scalars['Int'];
   requestStarted: Scalars['Int'];
+  runPendingSearchIndexUpdates: Success;
   setActiveChannel: UserStatus;
   setAsLoggedIn: UserStatus;
   setAsLoggedOut: UserStatus;
@@ -3933,6 +3934,7 @@ export type Query = {
   paymentMethodEligibilityCheckers: Array<ConfigurableOperationDefinition>;
   paymentMethodHandlers: Array<ConfigurableOperationDefinition>;
   paymentMethods: PaymentMethodList;
+  pendingSearchIndexUpdates: Scalars['Int'];
   /** Get a Product either by id or slug. If neither id nor slug is speicified, an error will result. */
   product?: Maybe<Product>;
   productOptionGroup?: Maybe<ProductOptionGroup>;

+ 2 - 0
packages/asset-server-plugin/e2e/graphql/generated-e2e-asset-server-plugin-types.ts

@@ -2235,6 +2235,7 @@ export type Mutation = {
     /** Create a new ProductOption within a ProductOptionGroup */
     updateProductOption: ProductOption;
     reindex: Job;
+    runPendingSearchIndexUpdates: Success;
     /** Create a new Product */
     createProduct: Product;
     /** Update an existing Product */
@@ -3688,6 +3689,7 @@ export type Query = {
     productOptionGroups: Array<ProductOptionGroup>;
     productOptionGroup?: Maybe<ProductOptionGroup>;
     search: SearchResponse;
+    pendingSearchIndexUpdates: Scalars['Int'];
     /** List Products */
     products: ProductList;
     /** Get a Product either by id or slug. If neither id nor slug is speicified, an error will result. */

+ 2 - 0
packages/common/src/generated-types.ts

@@ -2281,6 +2281,7 @@ export type Mutation = {
   /** Create a new ProductOption within a ProductOptionGroup */
   updateProductOption: ProductOption;
   reindex: Job;
+  runPendingSearchIndexUpdates: Success;
   /** Create a new Product */
   createProduct: Product;
   /** Update an existing Product */
@@ -3881,6 +3882,7 @@ export type Query = {
   productOptionGroups: Array<ProductOptionGroup>;
   productOptionGroup?: Maybe<ProductOptionGroup>;
   search: SearchResponse;
+  pendingSearchIndexUpdates: Scalars['Int'];
   /** List Products */
   products: ProductList;
   /** Get a Product either by id or slug. If neither id nor slug is speicified, an error will result. */

+ 2 - 0
packages/core/e2e/graphql/generated-e2e-admin-types.ts

@@ -2235,6 +2235,7 @@ export type Mutation = {
     /** Create a new ProductOption within a ProductOptionGroup */
     updateProductOption: ProductOption;
     reindex: Job;
+    runPendingSearchIndexUpdates: Success;
     /** Create a new Product */
     createProduct: Product;
     /** Update an existing Product */
@@ -3688,6 +3689,7 @@ export type Query = {
     productOptionGroups: Array<ProductOptionGroup>;
     productOptionGroup?: Maybe<ProductOptionGroup>;
     search: SearchResponse;
+    pendingSearchIndexUpdates: Scalars['Int'];
     /** List Products */
     products: ProductList;
     /** Get a Product either by id or slug. If neither id nor slug is speicified, an error will result. */

+ 1 - 1
packages/core/src/api/resolvers/admin/job.resolver.ts

@@ -87,7 +87,7 @@ export class JobResolver {
     @Allow(Permission.ReadSettings, Permission.ReadSystem)
     async jobBufferSize(@Args() args: QueryJobBufferSizeArgs) {
         const bufferSizes = await this.jobBufferService.bufferSize(args.bufferIds);
-        return Object.entries(bufferSizes).map(([processorId, size]) => ({ processorId, size }));
+        return Object.entries(bufferSizes).map(([bufferId, size]) => ({ bufferId, size }));
     }
 
     @Mutation()

+ 12 - 0
packages/core/src/api/resolvers/admin/search.resolver.ts

@@ -30,4 +30,16 @@ export class SearchResolver {
     async reindex(...args: any[]): Promise<any> {
         throw new InternalServerError(`error.no-search-plugin-configured`);
     }
+
+    @Query()
+    @Allow(Permission.UpdateCatalog, Permission.UpdateProduct)
+    async pendingSearchIndexUpdates(...args: any[]): Promise<any> {
+        throw new InternalServerError(`error.no-search-plugin-configured`);
+    }
+
+    @Mutation()
+    @Allow(Permission.UpdateCatalog, Permission.UpdateProduct)
+    async runPendingSearchIndexUpdates(...args: any[]): Promise<any> {
+        throw new InternalServerError(`error.no-search-plugin-configured`);
+    }
 }

+ 3 - 0
packages/core/src/api/schema/admin-api/product-search.api.graphql

@@ -1,7 +1,10 @@
 type Query {
     search(input: SearchInput!): SearchResponse!
+    pendingSearchIndexUpdates: Int!
 }
 
 type Mutation {
     reindex: Job!
+    runPendingSearchIndexUpdates: Success!
 }
+

+ 4 - 2
packages/core/src/job-queue/job-buffer/job-buffer.service.ts

@@ -48,12 +48,13 @@ export class JobBufferService {
         return this.storageStrategy.bufferSize(buffer.map(p => (typeof p === 'string' ? p : p.id)));
     }
 
-    async flush(forBuffers?: Array<JobBuffer | string>): Promise<void> {
+    async flush(forBuffers?: Array<JobBuffer | string>): Promise<Job[]> {
         const { jobQueueStrategy } = this.configService.jobQueueOptions;
         const buffers = forBuffers ?? Array.from(this.buffers);
         const flushResult = await this.storageStrategy.flush(
             buffers.map(p => (typeof p === 'string' ? p : p.id)),
         );
+        const result: Job[] = [];
         for (const buffer of this.buffers) {
             const jobsForBuffer = flushResult[buffer.id];
             if (jobsForBuffer?.length) {
@@ -68,9 +69,10 @@ export class JobBufferService {
                     );
                 }
                 for (const job of jobsToAdd) {
-                    await jobQueueStrategy.add(job);
+                    result.push(await jobQueueStrategy.add(job));
                 }
             }
         }
+        return result;
     }
 }

+ 3 - 2
packages/core/src/job-queue/job-queue.service.ts

@@ -4,6 +4,7 @@ import { JobQueue as GraphQlJobQueue } from '@vendure/common/lib/generated-types
 import { ConfigService, JobQueueStrategy, Logger } from '../config';
 
 import { loggerCtx } from './constants';
+import { Job } from './job';
 import { JobBuffer } from './job-buffer/job-buffer';
 import { JobBufferService } from './job-buffer/job-buffer.service';
 import { JobQueue } from './job-queue';
@@ -94,11 +95,11 @@ export class JobQueueService implements OnModuleDestroy {
         this.jobBufferService.removeBuffer(buffer);
     }
 
-    bufferSize(forBuffers?: Array<JobBuffer | string>): Promise<{ [bufferId: string]: number }> {
+    bufferSize(...forBuffers: Array<JobBuffer<any> | string>): Promise<{ [bufferId: string]: number }> {
         return this.jobBufferService.bufferSize(forBuffers);
     }
 
-    flush(forBuffers?: Array<JobBuffer | string>): Promise<void> {
+    flush(...forBuffers: Array<JobBuffer<any> | string>): Promise<Job[]> {
         return this.jobBufferService.flush(forBuffers);
     }
 

+ 1 - 0
packages/core/src/plugin/default-search-plugin/constants.ts

@@ -0,0 +1 @@
+export const PLUGIN_INIT_OPTIONS = Symbol('PLUGIN_INIT_OPTIONS');

+ 21 - 8
packages/core/src/plugin/default-search-plugin/default-search-plugin.ts

@@ -1,6 +1,6 @@
-import { OnApplicationBootstrap } from '@nestjs/common';
+import { Inject, OnApplicationBootstrap } from '@nestjs/common';
 import { SearchReindexResponse } from '@vendure/common/lib/generated-types';
-import { ID } from '@vendure/common/lib/shared-types';
+import { ID, Type } from '@vendure/common/lib/shared-types';
 import { buffer, debounceTime, delay, filter, map } from 'rxjs/operators';
 
 import { idsAreEqual } from '../../common/utils';
@@ -17,13 +17,16 @@ import { JobQueueService } from '../../job-queue/job-queue.service';
 import { PluginCommonModule } from '../plugin-common.module';
 import { VendurePlugin } from '../vendure-plugin';
 
-import { CollectionJobBuffer } from './collection-job-buffer';
+import { PLUGIN_INIT_OPTIONS } from './constants';
 import { AdminFulltextSearchResolver, ShopFulltextSearchResolver } from './fulltext-search.resolver';
 import { FulltextSearchService } from './fulltext-search.service';
 import { IndexerController } from './indexer/indexer.controller';
 import { SearchIndexService } from './indexer/search-index.service';
 import { SearchIndexItem } from './search-index-item.entity';
-import { SearchJobBuffer } from './search-job-buffer';
+import { CollectionJobBuffer } from './search-job-buffer/collection-job-buffer';
+import { SearchIndexJobBuffer } from './search-job-buffer/search-index-job-buffer';
+import { SearchJobBufferService } from './search-job-buffer/search-job-buffer.service';
+import { DefaultSearchPluginInitOptions } from './types';
 
 export interface DefaultSearchReindexResponse extends SearchReindexResponse {
     timeTaken: number;
@@ -61,12 +64,20 @@ export interface DefaultSearchReindexResponse extends SearchReindexResponse {
  */
 @VendurePlugin({
     imports: [PluginCommonModule],
-    providers: [FulltextSearchService, SearchIndexService, IndexerController],
+    providers: [
+        FulltextSearchService,
+        SearchIndexService,
+        IndexerController,
+        SearchJobBufferService,
+        { provide: PLUGIN_INIT_OPTIONS, useFactory: () => DefaultSearchPlugin.options },
+    ],
     adminApiExtensions: { resolvers: [AdminFulltextSearchResolver] },
     shopApiExtensions: { resolvers: [ShopFulltextSearchResolver] },
     entities: [SearchIndexItem],
 })
 export class DefaultSearchPlugin implements OnApplicationBootstrap {
+    static options: DefaultSearchPluginInitOptions = {};
+
     /** @internal */
     constructor(
         private eventBus: EventBus,
@@ -74,11 +85,13 @@ export class DefaultSearchPlugin implements OnApplicationBootstrap {
         private jobQueueService: JobQueueService,
     ) {}
 
+    static init(options: DefaultSearchPluginInitOptions): Type<DefaultSearchPlugin> {
+        this.options = options;
+        return DefaultSearchPlugin;
+    }
+
     /** @internal */
     async onApplicationBootstrap() {
-        this.jobQueueService.addBuffer(new SearchJobBuffer());
-        this.jobQueueService.addBuffer(new CollectionJobBuffer());
-
         this.eventBus.ofType(ProductEvent).subscribe(event => {
             if (event.type === 'deleted') {
                 return this.searchIndexService.deleteProduct(event.ctx, event.product);

+ 23 - 3
packages/core/src/plugin/default-search-plugin/fulltext-search.resolver.ts

@@ -11,12 +11,16 @@ import { RequestContext } from '../../api/common/request-context';
 import { Allow } from '../../api/decorators/allow.decorator';
 import { Ctx } from '../../api/decorators/request-context.decorator';
 import { SearchResolver as BaseSearchResolver } from '../../api/resolvers/admin/search.resolver';
+import { InternalServerError } from '../../common/error/errors';
 import { Collection, FacetValue } from '../../entity';
 
 import { FulltextSearchService } from './fulltext-search.service';
+import { SearchJobBufferService } from './search-job-buffer/search-job-buffer.service';
 
 @Resolver('SearchResponse')
-export class ShopFulltextSearchResolver implements Omit<BaseSearchResolver, 'reindex'> {
+export class ShopFulltextSearchResolver
+    implements Pick<BaseSearchResolver, 'search' | 'facetValues' | 'collections'>
+{
     constructor(private fulltextSearchService: FulltextSearchService) {}
 
     @Query()
@@ -52,14 +56,17 @@ export class ShopFulltextSearchResolver implements Omit<BaseSearchResolver, 'rei
 
 @Resolver('SearchResponse')
 export class AdminFulltextSearchResolver implements BaseSearchResolver {
-    constructor(private fulltextSearchService: FulltextSearchService) {}
+    constructor(
+        private fulltextSearchService: FulltextSearchService,
+        private searchJobBufferService: SearchJobBufferService,
+    ) {}
 
     @Query()
     @Allow(Permission.ReadCatalog, Permission.ReadProduct)
     async search(
         @Ctx() ctx: RequestContext,
         @Args() args: QuerySearchArgs,
-    ): Promise<Omit<SearchResponse, 'facetValues'| 'collections'>> {
+    ): Promise<Omit<SearchResponse, 'facetValues' | 'collections'>> {
         const result = await this.fulltextSearchService.search(ctx, args.input, false);
         // ensure the facetValues property resolver has access to the input args
         (result as any).input = args.input;
@@ -87,4 +94,17 @@ export class AdminFulltextSearchResolver implements BaseSearchResolver {
     async reindex(@Ctx() ctx: RequestContext) {
         return this.fulltextSearchService.reindex(ctx);
     }
+
+    @Query()
+    @Allow(Permission.UpdateCatalog, Permission.UpdateProduct)
+    async pendingSearchIndexUpdates(...args: any[]): Promise<any> {
+        return this.searchJobBufferService.getPendingSearchUpdates();
+    }
+
+    @Mutation()
+    @Allow(Permission.UpdateCatalog, Permission.UpdateProduct)
+    async runPendingSearchIndexUpdates(...args: any[]): Promise<any> {
+        await this.searchJobBufferService.runPendingSearchUpdates();
+        return { success: true };
+    }
 }

+ 3 - 4
packages/core/src/plugin/default-search-plugin/collection-job-buffer.ts → packages/core/src/plugin/default-search-plugin/search-job-buffer/collection-job-buffer.ts

@@ -1,10 +1,9 @@
 import { ID } from '@vendure/common/lib/shared-types';
 import { unique } from '@vendure/common/lib/unique';
 
-import { Job, JobBuffer } from '../../job-queue';
-import { ApplyCollectionFiltersJobData } from '../../service/services/collection.service';
-
-import { UpdateIndexQueueJobData, UpdateVariantsByIdJobData, UpdateVariantsJobData } from './types';
+import { Job, JobBuffer } from '../../../job-queue/index';
+import { ApplyCollectionFiltersJobData } from '../../../service/services/collection.service';
+import { UpdateIndexQueueJobData, UpdateVariantsByIdJobData, UpdateVariantsJobData } from '../types';
 
 export class CollectionJobBuffer implements JobBuffer<ApplyCollectionFiltersJobData> {
     readonly id = 'search-plugin-apply-collection-filters';

+ 3 - 4
packages/core/src/plugin/default-search-plugin/search-job-buffer.ts → packages/core/src/plugin/default-search-plugin/search-job-buffer/search-index-job-buffer.ts

@@ -1,11 +1,10 @@
 import { ID } from '@vendure/common/lib/shared-types';
 import { unique } from '@vendure/common/lib/unique';
 
-import { Job, JobBuffer } from '../../job-queue';
+import { Job, JobBuffer } from '../../../job-queue/index';
+import { UpdateIndexQueueJobData, UpdateVariantsByIdJobData, UpdateVariantsJobData } from '../types';
 
-import { UpdateIndexQueueJobData, UpdateVariantsByIdJobData, UpdateVariantsJobData } from './types';
-
-export class SearchJobBuffer implements JobBuffer<UpdateIndexQueueJobData> {
+export class SearchIndexJobBuffer implements JobBuffer<UpdateIndexQueueJobData> {
     readonly id = 'search-plugin-update-search-index';
 
     collect(job: Job<UpdateIndexQueueJobData>): boolean | Promise<boolean> {

+ 64 - 0
packages/core/src/plugin/default-search-plugin/search-job-buffer/search-job-buffer.service.ts

@@ -0,0 +1,64 @@
+import { Inject, Injectable, OnApplicationBootstrap } from '@nestjs/common';
+import { forkJoin } from 'rxjs';
+
+import { ConfigService } from '../../../config/config.service';
+import { isInspectableJobQueueStrategy } from '../../../config/job-queue/inspectable-job-queue-strategy';
+import { JobQueueService } from '../../../job-queue/job-queue.service';
+import { SubscribableJob } from '../../../job-queue/subscribable-job';
+import { PLUGIN_INIT_OPTIONS } from '../constants';
+import { DefaultSearchPluginInitOptions } from '../types';
+
+import { CollectionJobBuffer } from './collection-job-buffer';
+import { SearchIndexJobBuffer } from './search-index-job-buffer';
+
+@Injectable()
+export class SearchJobBufferService implements OnApplicationBootstrap {
+    readonly searchIndexJobBuffer = new SearchIndexJobBuffer();
+    readonly collectionJobBuffer = new CollectionJobBuffer();
+
+    constructor(
+        private jobQueueService: JobQueueService,
+        private configService: ConfigService,
+        @Inject(PLUGIN_INIT_OPTIONS) private options: DefaultSearchPluginInitOptions,
+    ) {}
+
+    onApplicationBootstrap(): any {
+        if (this.options.bufferUpdates === true) {
+            this.jobQueueService.addBuffer(this.searchIndexJobBuffer);
+            this.jobQueueService.addBuffer(this.collectionJobBuffer);
+        }
+    }
+
+    async getPendingSearchUpdates(): Promise<number> {
+        if (!this.options.bufferUpdates) {
+            return 0;
+        }
+        const bufferSizes = await this.jobQueueService.bufferSize(
+            this.searchIndexJobBuffer,
+            this.collectionJobBuffer,
+        );
+        return (
+            (bufferSizes[this.searchIndexJobBuffer.id] ?? 0) + (bufferSizes[this.collectionJobBuffer.id] ?? 0)
+        );
+    }
+
+    async runPendingSearchUpdates(): Promise<void> {
+        if (!this.options.bufferUpdates) {
+            return;
+        }
+        const { jobQueueStrategy } = this.configService.jobQueueOptions;
+
+        const collectionFilterJobs = await this.jobQueueService.flush(this.collectionJobBuffer);
+        if (collectionFilterJobs.length && isInspectableJobQueueStrategy(jobQueueStrategy)) {
+            const subscribableCollectionJobs = collectionFilterJobs.map(
+                job => new SubscribableJob(job, jobQueueStrategy),
+            );
+            await forkJoin(
+                ...subscribableCollectionJobs.map(sj =>
+                    sj.updates({ pollInterval: 500, timeoutMs: 3 * 60 * 1000 }),
+                ),
+            ).toPromise();
+        }
+        await this.jobQueueService.flush(this.searchIndexJobBuffer);
+    }
+}

+ 4 - 0
packages/core/src/plugin/default-search-plugin/types.ts

@@ -3,6 +3,10 @@ import { ID, JsonCompatible } from '@vendure/common/lib/shared-types';
 import { SerializedRequestContext } from '../../api/common/request-context';
 import { Asset } from '../../entity/asset/asset.entity';
 
+export interface DefaultSearchPluginInitOptions {
+    bufferUpdates?: boolean;
+}
+
 export type ReindexMessageResponse = {
     total: number;
     completed: number;

+ 3 - 5
packages/dev-server/dev-config.ts

@@ -3,7 +3,6 @@ import { AdminUiPlugin } from '@vendure/admin-ui-plugin';
 import { AssetServerPlugin } from '@vendure/asset-server-plugin';
 import { ADMIN_API_PATH, API_PORT, SHOP_API_PATH } from '@vendure/common/lib/shared-constants';
 import {
-    DefaultJobQueuePlugin,
     DefaultLogger,
     DefaultSearchPlugin,
     dummyPaymentHandler,
@@ -15,8 +14,6 @@ import { BullMQJobQueuePlugin } from '@vendure/job-queue-plugin/package/bullmq';
 import path from 'path';
 import { ConnectionOptions } from 'typeorm';
 
-import { JobQueueTestPlugin } from './test-plugins/job-queue-test/job-queue-test-plugin';
-
 /**
  * Config settings used during development
  */
@@ -46,6 +43,7 @@ export const devConfig: VendureConfig = {
         cookieOptions: {
             secret: 'abc',
         },
+        // passwordHashingStrategy: new PlaintextHashingStrategy(),
     },
     dbConnectionOptions: {
         synchronize: false,
@@ -57,7 +55,7 @@ export const devConfig: VendureConfig = {
         paymentMethodHandlers: [dummyPaymentHandler],
     },
     customFields: {},
-    logger: new DefaultLogger({ level: LogLevel.Info }),
+    logger: new DefaultLogger({ level: LogLevel.Debug }),
     importExportOptions: {
         importAssetsDir: path.join(__dirname, 'import-assets'),
     },
@@ -66,7 +64,7 @@ export const devConfig: VendureConfig = {
             route: 'assets',
             assetUploadDir: path.join(__dirname, 'assets'),
         }),
-        DefaultSearchPlugin,
+        DefaultSearchPlugin.init({ bufferUpdates: true }),
         BullMQJobQueuePlugin.init({}),
         // DefaultJobQueuePlugin,
         // JobQueueTestPlugin.init({ queueCount: 10 }),

+ 2 - 0
packages/elasticsearch-plugin/e2e/graphql/generated-e2e-elasticsearch-plugin-types.ts

@@ -2235,6 +2235,7 @@ export type Mutation = {
     /** Create a new ProductOption within a ProductOptionGroup */
     updateProductOption: ProductOption;
     reindex: Job;
+    runPendingSearchIndexUpdates: Success;
     /** Create a new Product */
     createProduct: Product;
     /** Update an existing Product */
@@ -3688,6 +3689,7 @@ export type Query = {
     productOptionGroups: Array<ProductOptionGroup>;
     productOptionGroup?: Maybe<ProductOptionGroup>;
     search: SearchResponse;
+    pendingSearchIndexUpdates: Scalars['Int'];
     /** List Products */
     products: ProductList;
     /** Get a Product either by id or slug. If neither id nor slug is speicified, an error will result. */

Разница между файлами не показана из-за своего большого размера
+ 0 - 0
schema-admin.json


Некоторые файлы не были показаны из-за большого количества измененных файлов