소스 검색

feat(core): Create JobBuffer infrastructure

Relates to #1137
Michael Bromley 4 년 전
부모
커밋
d6aa20fe6c
23개의 변경된 파일312개의 추가작업 그리고 116개의 파일을 삭제
  1. 21 18
      packages/admin-ui/src/lib/core/src/common/generated-types.ts
  2. 18 17
      packages/asset-server-plugin/e2e/graphql/generated-e2e-asset-server-plugin-types.ts
  3. 1 8
      packages/common/src/generated-shop-types.ts
  4. 21 18
      packages/common/src/generated-types.ts
  5. 18 17
      packages/core/e2e/graphql/generated-e2e-admin-types.ts
  6. 1 7
      packages/core/e2e/graphql/generated-e2e-shop-types.ts
  7. 22 1
      packages/core/src/api/resolvers/admin/job.resolver.ts
  8. 7 0
      packages/core/src/api/schema/admin-api/job.api.graphql
  9. 2 1
      packages/core/src/config/config.module.ts
  10. 2 0
      packages/core/src/config/default-config.ts
  11. 2 0
      packages/core/src/config/vendure-config.ts
  12. 45 0
      packages/core/src/job-queue/job-buffer/in-memory-job-buffer-storage-strategy.ts
  13. 8 0
      packages/core/src/job-queue/job-buffer/job-buffer-processor.ts
  14. 8 0
      packages/core/src/job-queue/job-buffer/job-buffer-storage-strategy.ts
  15. 68 0
      packages/core/src/job-queue/job-buffer/job-buffer.ts
  16. 25 0
      packages/core/src/job-queue/job-buffer/sql-job-buffer-storage-strategy.ts
  17. 3 2
      packages/core/src/job-queue/job-queue.module.ts
  18. 3 2
      packages/core/src/job-queue/job-queue.service.ts
  19. 17 6
      packages/core/src/job-queue/job-queue.ts
  20. 2 2
      packages/core/src/service/services/collection.service.ts
  21. 18 17
      packages/elasticsearch-plugin/e2e/graphql/generated-e2e-elasticsearch-plugin-types.ts
  22. 0 0
      schema-admin.json
  23. 0 0
      schema-shop.json

+ 21 - 18
packages/admin-ui/src/lib/core/src/common/generated-types.ts

@@ -647,17 +647,13 @@ export type CreatePaymentMethodInput = {
   handler: ConfigurableOperationInput;
   handler: ConfigurableOperationInput;
 };
 };
 
 
-export type CreateProductCustomFieldsInput = {
-  coffeeProfileWithMilk?: Maybe<Scalars['String']>;
-};
-
 export type CreateProductInput = {
 export type CreateProductInput = {
   featuredAssetId?: Maybe<Scalars['ID']>;
   featuredAssetId?: Maybe<Scalars['ID']>;
   enabled?: Maybe<Scalars['Boolean']>;
   enabled?: Maybe<Scalars['Boolean']>;
   assetIds?: Maybe<Array<Scalars['ID']>>;
   assetIds?: Maybe<Array<Scalars['ID']>>;
   facetValueIds?: Maybe<Array<Scalars['ID']>>;
   facetValueIds?: Maybe<Array<Scalars['ID']>>;
   translations: Array<ProductTranslationInput>;
   translations: Array<ProductTranslationInput>;
-  customFields?: Maybe<CreateProductCustomFieldsInput>;
+  customFields?: Maybe<Scalars['JSON']>;
 };
 };
 
 
 export type CreateProductOptionGroupInput = {
 export type CreateProductOptionGroupInput = {
@@ -1675,6 +1671,12 @@ export type Job = Node & {
   attempts: Scalars['Int'];
   attempts: Scalars['Int'];
 };
 };
 
 
+export type JobBufferSize = {
+  __typename?: 'JobBufferSize';
+  processorId: Scalars['String'];
+  size: Scalars['Int'];
+};
+
 export type JobFilterParameter = {
 export type JobFilterParameter = {
   createdAt?: Maybe<DateOperators>;
   createdAt?: Maybe<DateOperators>;
   startedAt?: Maybe<DateOperators>;
   startedAt?: Maybe<DateOperators>;
@@ -2276,6 +2278,7 @@ export type Mutation = {
   deleteTaxRate: DeletionResponse;
   deleteTaxRate: DeletionResponse;
   /** Delete a Zone */
   /** Delete a Zone */
   deleteZone: DeletionResponse;
   deleteZone: DeletionResponse;
+  flushBufferedJobs: Success;
   importProducts?: Maybe<ImportInfo>;
   importProducts?: Maybe<ImportInfo>;
   /** Authenticates the user using the native authentication strategy. This mutation is an alias for `authenticate({ native: { ... }})` */
   /** Authenticates the user using the native authentication strategy. This mutation is an alias for `authenticate({ native: { ... }})` */
   login: NativeAuthenticationResult;
   login: NativeAuthenticationResult;
@@ -2679,6 +2682,11 @@ export type MutationDeleteZoneArgs = {
 };
 };
 
 
 
 
+export type MutationFlushBufferedJobsArgs = {
+  processorIds?: Maybe<Array<Scalars['String']>>;
+};
+
+
 export type MutationImportProductsArgs = {
 export type MutationImportProductsArgs = {
   csvFile: Scalars['Upload'];
   csvFile: Scalars['Upload'];
 };
 };
@@ -3603,7 +3611,7 @@ export type Product = Node & {
   facetValues: Array<FacetValue>;
   facetValues: Array<FacetValue>;
   translations: Array<ProductTranslation>;
   translations: Array<ProductTranslation>;
   collections: Array<Collection>;
   collections: Array<Collection>;
-  customFields?: Maybe<ProductCustomFields>;
+  customFields?: Maybe<Scalars['JSON']>;
 };
 };
 
 
 
 
@@ -3611,11 +3619,6 @@ export type ProductVariantListArgs = {
   options?: Maybe<ProductVariantListOptions>;
   options?: Maybe<ProductVariantListOptions>;
 };
 };
 
 
-export type ProductCustomFields = {
-  __typename?: 'ProductCustomFields';
-  coffeeProfileWithMilk?: Maybe<Scalars['String']>;
-};
-
 export type ProductFilterParameter = {
 export type ProductFilterParameter = {
   enabled?: Maybe<BooleanOperators>;
   enabled?: Maybe<BooleanOperators>;
   createdAt?: Maybe<DateOperators>;
   createdAt?: Maybe<DateOperators>;
@@ -3624,7 +3627,6 @@ export type ProductFilterParameter = {
   name?: Maybe<StringOperators>;
   name?: Maybe<StringOperators>;
   slug?: Maybe<StringOperators>;
   slug?: Maybe<StringOperators>;
   description?: Maybe<StringOperators>;
   description?: Maybe<StringOperators>;
-  coffeeProfileWithMilk?: Maybe<StringOperators>;
 };
 };
 
 
 export type ProductList = PaginatedList & {
 export type ProductList = PaginatedList & {
@@ -3714,7 +3716,6 @@ export type ProductSortParameter = {
   name?: Maybe<SortOrder>;
   name?: Maybe<SortOrder>;
   slug?: Maybe<SortOrder>;
   slug?: Maybe<SortOrder>;
   description?: Maybe<SortOrder>;
   description?: Maybe<SortOrder>;
-  coffeeProfileWithMilk?: Maybe<SortOrder>;
 };
 };
 
 
 export type ProductTranslation = {
 export type ProductTranslation = {
@@ -3920,6 +3921,7 @@ export type Query = {
   fulfillmentHandlers: Array<ConfigurableOperationDefinition>;
   fulfillmentHandlers: Array<ConfigurableOperationDefinition>;
   globalSettings: GlobalSettings;
   globalSettings: GlobalSettings;
   job?: Maybe<Job>;
   job?: Maybe<Job>;
+  jobBufferSize: Array<JobBufferSize>;
   jobQueues: Array<JobQueue>;
   jobQueues: Array<JobQueue>;
   jobs: JobList;
   jobs: JobList;
   jobsById: Array<Job>;
   jobsById: Array<Job>;
@@ -4048,6 +4050,11 @@ export type QueryJobArgs = {
 };
 };
 
 
 
 
+export type QueryJobBufferSizeArgs = {
+  processorIds?: Maybe<Array<Scalars['String']>>;
+};
+
+
 export type QueryJobsArgs = {
 export type QueryJobsArgs = {
   options?: Maybe<JobListOptions>;
   options?: Maybe<JobListOptions>;
 };
 };
@@ -4941,10 +4948,6 @@ export type UpdatePaymentMethodInput = {
   handler?: Maybe<ConfigurableOperationInput>;
   handler?: Maybe<ConfigurableOperationInput>;
 };
 };
 
 
-export type UpdateProductCustomFieldsInput = {
-  coffeeProfileWithMilk?: Maybe<Scalars['String']>;
-};
-
 export type UpdateProductInput = {
 export type UpdateProductInput = {
   id: Scalars['ID'];
   id: Scalars['ID'];
   enabled?: Maybe<Scalars['Boolean']>;
   enabled?: Maybe<Scalars['Boolean']>;
@@ -4952,7 +4955,7 @@ export type UpdateProductInput = {
   assetIds?: Maybe<Array<Scalars['ID']>>;
   assetIds?: Maybe<Array<Scalars['ID']>>;
   facetValueIds?: Maybe<Array<Scalars['ID']>>;
   facetValueIds?: Maybe<Array<Scalars['ID']>>;
   translations?: Maybe<Array<ProductTranslationInput>>;
   translations?: Maybe<Array<ProductTranslationInput>>;
-  customFields?: Maybe<UpdateProductCustomFieldsInput>;
+  customFields?: Maybe<Scalars['JSON']>;
 };
 };
 
 
 export type UpdateProductOptionGroupInput = {
 export type UpdateProductOptionGroupInput = {

+ 18 - 17
packages/asset-server-plugin/e2e/graphql/generated-e2e-asset-server-plugin-types.ts

@@ -632,17 +632,13 @@ export type CreatePaymentMethodInput = {
     handler: ConfigurableOperationInput;
     handler: ConfigurableOperationInput;
 };
 };
 
 
-export type CreateProductCustomFieldsInput = {
-    coffeeProfileWithMilk?: Maybe<Scalars['String']>;
-};
-
 export type CreateProductInput = {
 export type CreateProductInput = {
     featuredAssetId?: Maybe<Scalars['ID']>;
     featuredAssetId?: Maybe<Scalars['ID']>;
     enabled?: Maybe<Scalars['Boolean']>;
     enabled?: Maybe<Scalars['Boolean']>;
     assetIds?: Maybe<Array<Scalars['ID']>>;
     assetIds?: Maybe<Array<Scalars['ID']>>;
     facetValueIds?: Maybe<Array<Scalars['ID']>>;
     facetValueIds?: Maybe<Array<Scalars['ID']>>;
     translations: Array<ProductTranslationInput>;
     translations: Array<ProductTranslationInput>;
-    customFields?: Maybe<CreateProductCustomFieldsInput>;
+    customFields?: Maybe<Scalars['JSON']>;
 };
 };
 
 
 export type CreateProductOptionGroupInput = {
 export type CreateProductOptionGroupInput = {
@@ -1624,6 +1620,11 @@ export type Job = Node & {
     attempts: Scalars['Int'];
     attempts: Scalars['Int'];
 };
 };
 
 
+export type JobBufferSize = {
+    processorId: Scalars['String'];
+    size: Scalars['Int'];
+};
+
 export type JobFilterParameter = {
 export type JobFilterParameter = {
     createdAt?: Maybe<DateOperators>;
     createdAt?: Maybe<DateOperators>;
     startedAt?: Maybe<DateOperators>;
     startedAt?: Maybe<DateOperators>;
@@ -2194,6 +2195,7 @@ export type Mutation = {
     /** Remove all settled jobs in the given queues olfer than the given date. Returns the number of jobs deleted. */
     /** Remove all settled jobs in the given queues olfer than the given date. Returns the number of jobs deleted. */
     removeSettledJobs: Scalars['Int'];
     removeSettledJobs: Scalars['Int'];
     cancelJob: Job;
     cancelJob: Job;
+    flushBufferedJobs: Success;
     settlePayment: SettlePaymentResult;
     settlePayment: SettlePaymentResult;
     addFulfillmentToOrder: AddFulfillmentToOrderResult;
     addFulfillmentToOrder: AddFulfillmentToOrderResult;
     cancelOrder: CancelOrderResult;
     cancelOrder: CancelOrderResult;
@@ -2501,6 +2503,10 @@ export type MutationCancelJobArgs = {
     jobId: Scalars['ID'];
     jobId: Scalars['ID'];
 };
 };
 
 
+export type MutationFlushBufferedJobsArgs = {
+    processorIds?: Maybe<Array<Scalars['String']>>;
+};
+
 export type MutationSettlePaymentArgs = {
 export type MutationSettlePaymentArgs = {
     id: Scalars['ID'];
     id: Scalars['ID'];
 };
 };
@@ -3375,17 +3381,13 @@ export type Product = Node & {
     facetValues: Array<FacetValue>;
     facetValues: Array<FacetValue>;
     translations: Array<ProductTranslation>;
     translations: Array<ProductTranslation>;
     collections: Array<Collection>;
     collections: Array<Collection>;
-    customFields?: Maybe<ProductCustomFields>;
+    customFields?: Maybe<Scalars['JSON']>;
 };
 };
 
 
 export type ProductVariantListArgs = {
 export type ProductVariantListArgs = {
     options?: Maybe<ProductVariantListOptions>;
     options?: Maybe<ProductVariantListOptions>;
 };
 };
 
 
-export type ProductCustomFields = {
-    coffeeProfileWithMilk?: Maybe<Scalars['String']>;
-};
-
 export type ProductFilterParameter = {
 export type ProductFilterParameter = {
     enabled?: Maybe<BooleanOperators>;
     enabled?: Maybe<BooleanOperators>;
     createdAt?: Maybe<DateOperators>;
     createdAt?: Maybe<DateOperators>;
@@ -3394,7 +3396,6 @@ export type ProductFilterParameter = {
     name?: Maybe<StringOperators>;
     name?: Maybe<StringOperators>;
     slug?: Maybe<StringOperators>;
     slug?: Maybe<StringOperators>;
     description?: Maybe<StringOperators>;
     description?: Maybe<StringOperators>;
-    coffeeProfileWithMilk?: Maybe<StringOperators>;
 };
 };
 
 
 export type ProductList = PaginatedList & {
 export type ProductList = PaginatedList & {
@@ -3478,7 +3479,6 @@ export type ProductSortParameter = {
     name?: Maybe<SortOrder>;
     name?: Maybe<SortOrder>;
     slug?: Maybe<SortOrder>;
     slug?: Maybe<SortOrder>;
     description?: Maybe<SortOrder>;
     description?: Maybe<SortOrder>;
-    coffeeProfileWithMilk?: Maybe<SortOrder>;
 };
 };
 
 
 export type ProductTranslation = {
 export type ProductTranslation = {
@@ -3678,6 +3678,7 @@ export type Query = {
     jobs: JobList;
     jobs: JobList;
     jobsById: Array<Job>;
     jobsById: Array<Job>;
     jobQueues: Array<JobQueue>;
     jobQueues: Array<JobQueue>;
+    jobBufferSize: Array<JobBufferSize>;
     order?: Maybe<Order>;
     order?: Maybe<Order>;
     orders: OrderList;
     orders: OrderList;
     paymentMethods: PaymentMethodList;
     paymentMethods: PaymentMethodList;
@@ -3791,6 +3792,10 @@ export type QueryJobsByIdArgs = {
     jobIds: Array<Scalars['ID']>;
     jobIds: Array<Scalars['ID']>;
 };
 };
 
 
+export type QueryJobBufferSizeArgs = {
+    processorIds?: Maybe<Array<Scalars['String']>>;
+};
+
 export type QueryOrderArgs = {
 export type QueryOrderArgs = {
     id: Scalars['ID'];
     id: Scalars['ID'];
 };
 };
@@ -4623,10 +4628,6 @@ export type UpdatePaymentMethodInput = {
     handler?: Maybe<ConfigurableOperationInput>;
     handler?: Maybe<ConfigurableOperationInput>;
 };
 };
 
 
-export type UpdateProductCustomFieldsInput = {
-    coffeeProfileWithMilk?: Maybe<Scalars['String']>;
-};
-
 export type UpdateProductInput = {
 export type UpdateProductInput = {
     id: Scalars['ID'];
     id: Scalars['ID'];
     enabled?: Maybe<Scalars['Boolean']>;
     enabled?: Maybe<Scalars['Boolean']>;
@@ -4634,7 +4635,7 @@ export type UpdateProductInput = {
     assetIds?: Maybe<Array<Scalars['ID']>>;
     assetIds?: Maybe<Array<Scalars['ID']>>;
     facetValueIds?: Maybe<Array<Scalars['ID']>>;
     facetValueIds?: Maybe<Array<Scalars['ID']>>;
     translations?: Maybe<Array<ProductTranslationInput>>;
     translations?: Maybe<Array<ProductTranslationInput>>;
-    customFields?: Maybe<UpdateProductCustomFieldsInput>;
+    customFields?: Maybe<Scalars['JSON']>;
 };
 };
 
 
 export type UpdateProductOptionGroupInput = {
 export type UpdateProductOptionGroupInput = {

+ 1 - 8
packages/common/src/generated-shop-types.ts

@@ -2317,18 +2317,13 @@ export type Product = Node & {
     facetValues: Array<FacetValue>;
     facetValues: Array<FacetValue>;
     translations: Array<ProductTranslation>;
     translations: Array<ProductTranslation>;
     collections: Array<Collection>;
     collections: Array<Collection>;
-    customFields?: Maybe<ProductCustomFields>;
+    customFields?: Maybe<Scalars['JSON']>;
 };
 };
 
 
 export type ProductVariantListArgs = {
 export type ProductVariantListArgs = {
     options?: Maybe<ProductVariantListOptions>;
     options?: Maybe<ProductVariantListOptions>;
 };
 };
 
 
-export type ProductCustomFields = {
-    __typename?: 'ProductCustomFields';
-    coffeeProfileWithMilk?: Maybe<Scalars['String']>;
-};
-
 export type ProductFilterParameter = {
 export type ProductFilterParameter = {
     createdAt?: Maybe<DateOperators>;
     createdAt?: Maybe<DateOperators>;
     updatedAt?: Maybe<DateOperators>;
     updatedAt?: Maybe<DateOperators>;
@@ -2336,7 +2331,6 @@ export type ProductFilterParameter = {
     name?: Maybe<StringOperators>;
     name?: Maybe<StringOperators>;
     slug?: Maybe<StringOperators>;
     slug?: Maybe<StringOperators>;
     description?: Maybe<StringOperators>;
     description?: Maybe<StringOperators>;
-    coffeeProfileWithMilk?: Maybe<StringOperators>;
 };
 };
 
 
 export type ProductList = PaginatedList & {
 export type ProductList = PaginatedList & {
@@ -2404,7 +2398,6 @@ export type ProductSortParameter = {
     name?: Maybe<SortOrder>;
     name?: Maybe<SortOrder>;
     slug?: Maybe<SortOrder>;
     slug?: Maybe<SortOrder>;
     description?: Maybe<SortOrder>;
     description?: Maybe<SortOrder>;
-    coffeeProfileWithMilk?: Maybe<SortOrder>;
 };
 };
 
 
 export type ProductTranslation = {
 export type ProductTranslation = {

+ 21 - 18
packages/common/src/generated-types.ts

@@ -646,17 +646,13 @@ export type CreatePaymentMethodInput = {
   handler: ConfigurableOperationInput;
   handler: ConfigurableOperationInput;
 };
 };
 
 
-export type CreateProductCustomFieldsInput = {
-  coffeeProfileWithMilk?: Maybe<Scalars['String']>;
-};
-
 export type CreateProductInput = {
 export type CreateProductInput = {
   featuredAssetId?: Maybe<Scalars['ID']>;
   featuredAssetId?: Maybe<Scalars['ID']>;
   enabled?: Maybe<Scalars['Boolean']>;
   enabled?: Maybe<Scalars['Boolean']>;
   assetIds?: Maybe<Array<Scalars['ID']>>;
   assetIds?: Maybe<Array<Scalars['ID']>>;
   facetValueIds?: Maybe<Array<Scalars['ID']>>;
   facetValueIds?: Maybe<Array<Scalars['ID']>>;
   translations: Array<ProductTranslationInput>;
   translations: Array<ProductTranslationInput>;
-  customFields?: Maybe<CreateProductCustomFieldsInput>;
+  customFields?: Maybe<Scalars['JSON']>;
 };
 };
 
 
 export type CreateProductOptionGroupInput = {
 export type CreateProductOptionGroupInput = {
@@ -1667,6 +1663,12 @@ export type Job = Node & {
   attempts: Scalars['Int'];
   attempts: Scalars['Int'];
 };
 };
 
 
+export type JobBufferSize = {
+  __typename?: 'JobBufferSize';
+  processorId: Scalars['String'];
+  size: Scalars['Int'];
+};
+
 export type JobFilterParameter = {
 export type JobFilterParameter = {
   createdAt?: Maybe<DateOperators>;
   createdAt?: Maybe<DateOperators>;
   startedAt?: Maybe<DateOperators>;
   startedAt?: Maybe<DateOperators>;
@@ -2239,6 +2241,7 @@ export type Mutation = {
   /** Remove all settled jobs in the given queues olfer than the given date. Returns the number of jobs deleted. */
   /** Remove all settled jobs in the given queues olfer than the given date. Returns the number of jobs deleted. */
   removeSettledJobs: Scalars['Int'];
   removeSettledJobs: Scalars['Int'];
   cancelJob: Job;
   cancelJob: Job;
+  flushBufferedJobs: Success;
   settlePayment: SettlePaymentResult;
   settlePayment: SettlePaymentResult;
   addFulfillmentToOrder: AddFulfillmentToOrderResult;
   addFulfillmentToOrder: AddFulfillmentToOrderResult;
   cancelOrder: CancelOrderResult;
   cancelOrder: CancelOrderResult;
@@ -2593,6 +2596,11 @@ export type MutationCancelJobArgs = {
 };
 };
 
 
 
 
+export type MutationFlushBufferedJobsArgs = {
+  processorIds?: Maybe<Array<Scalars['String']>>;
+};
+
+
 export type MutationSettlePaymentArgs = {
 export type MutationSettlePaymentArgs = {
   id: Scalars['ID'];
   id: Scalars['ID'];
 };
 };
@@ -3550,7 +3558,7 @@ export type Product = Node & {
   facetValues: Array<FacetValue>;
   facetValues: Array<FacetValue>;
   translations: Array<ProductTranslation>;
   translations: Array<ProductTranslation>;
   collections: Array<Collection>;
   collections: Array<Collection>;
-  customFields?: Maybe<ProductCustomFields>;
+  customFields?: Maybe<Scalars['JSON']>;
 };
 };
 
 
 
 
@@ -3558,11 +3566,6 @@ export type ProductVariantListArgs = {
   options?: Maybe<ProductVariantListOptions>;
   options?: Maybe<ProductVariantListOptions>;
 };
 };
 
 
-export type ProductCustomFields = {
-  __typename?: 'ProductCustomFields';
-  coffeeProfileWithMilk?: Maybe<Scalars['String']>;
-};
-
 export type ProductFilterParameter = {
 export type ProductFilterParameter = {
   enabled?: Maybe<BooleanOperators>;
   enabled?: Maybe<BooleanOperators>;
   createdAt?: Maybe<DateOperators>;
   createdAt?: Maybe<DateOperators>;
@@ -3571,7 +3574,6 @@ export type ProductFilterParameter = {
   name?: Maybe<StringOperators>;
   name?: Maybe<StringOperators>;
   slug?: Maybe<StringOperators>;
   slug?: Maybe<StringOperators>;
   description?: Maybe<StringOperators>;
   description?: Maybe<StringOperators>;
-  coffeeProfileWithMilk?: Maybe<StringOperators>;
 };
 };
 
 
 export type ProductList = PaginatedList & {
 export type ProductList = PaginatedList & {
@@ -3661,7 +3663,6 @@ export type ProductSortParameter = {
   name?: Maybe<SortOrder>;
   name?: Maybe<SortOrder>;
   slug?: Maybe<SortOrder>;
   slug?: Maybe<SortOrder>;
   description?: Maybe<SortOrder>;
   description?: Maybe<SortOrder>;
-  coffeeProfileWithMilk?: Maybe<SortOrder>;
 };
 };
 
 
 export type ProductTranslation = {
 export type ProductTranslation = {
@@ -3870,6 +3871,7 @@ export type Query = {
   jobs: JobList;
   jobs: JobList;
   jobsById: Array<Job>;
   jobsById: Array<Job>;
   jobQueues: Array<JobQueue>;
   jobQueues: Array<JobQueue>;
+  jobBufferSize: Array<JobBufferSize>;
   order?: Maybe<Order>;
   order?: Maybe<Order>;
   orders: OrderList;
   orders: OrderList;
   paymentMethods: PaymentMethodList;
   paymentMethods: PaymentMethodList;
@@ -4002,6 +4004,11 @@ export type QueryJobsByIdArgs = {
 };
 };
 
 
 
 
+export type QueryJobBufferSizeArgs = {
+  processorIds?: Maybe<Array<Scalars['String']>>;
+};
+
+
 export type QueryOrderArgs = {
 export type QueryOrderArgs = {
   id: Scalars['ID'];
   id: Scalars['ID'];
 };
 };
@@ -4878,10 +4885,6 @@ export type UpdatePaymentMethodInput = {
   handler?: Maybe<ConfigurableOperationInput>;
   handler?: Maybe<ConfigurableOperationInput>;
 };
 };
 
 
-export type UpdateProductCustomFieldsInput = {
-  coffeeProfileWithMilk?: Maybe<Scalars['String']>;
-};
-
 export type UpdateProductInput = {
 export type UpdateProductInput = {
   id: Scalars['ID'];
   id: Scalars['ID'];
   enabled?: Maybe<Scalars['Boolean']>;
   enabled?: Maybe<Scalars['Boolean']>;
@@ -4889,7 +4892,7 @@ export type UpdateProductInput = {
   assetIds?: Maybe<Array<Scalars['ID']>>;
   assetIds?: Maybe<Array<Scalars['ID']>>;
   facetValueIds?: Maybe<Array<Scalars['ID']>>;
   facetValueIds?: Maybe<Array<Scalars['ID']>>;
   translations?: Maybe<Array<ProductTranslationInput>>;
   translations?: Maybe<Array<ProductTranslationInput>>;
-  customFields?: Maybe<UpdateProductCustomFieldsInput>;
+  customFields?: Maybe<Scalars['JSON']>;
 };
 };
 
 
 export type UpdateProductOptionGroupInput = {
 export type UpdateProductOptionGroupInput = {

+ 18 - 17
packages/core/e2e/graphql/generated-e2e-admin-types.ts

@@ -632,17 +632,13 @@ export type CreatePaymentMethodInput = {
     handler: ConfigurableOperationInput;
     handler: ConfigurableOperationInput;
 };
 };
 
 
-export type CreateProductCustomFieldsInput = {
-    coffeeProfileWithMilk?: Maybe<Scalars['String']>;
-};
-
 export type CreateProductInput = {
 export type CreateProductInput = {
     featuredAssetId?: Maybe<Scalars['ID']>;
     featuredAssetId?: Maybe<Scalars['ID']>;
     enabled?: Maybe<Scalars['Boolean']>;
     enabled?: Maybe<Scalars['Boolean']>;
     assetIds?: Maybe<Array<Scalars['ID']>>;
     assetIds?: Maybe<Array<Scalars['ID']>>;
     facetValueIds?: Maybe<Array<Scalars['ID']>>;
     facetValueIds?: Maybe<Array<Scalars['ID']>>;
     translations: Array<ProductTranslationInput>;
     translations: Array<ProductTranslationInput>;
-    customFields?: Maybe<CreateProductCustomFieldsInput>;
+    customFields?: Maybe<Scalars['JSON']>;
 };
 };
 
 
 export type CreateProductOptionGroupInput = {
 export type CreateProductOptionGroupInput = {
@@ -1624,6 +1620,11 @@ export type Job = Node & {
     attempts: Scalars['Int'];
     attempts: Scalars['Int'];
 };
 };
 
 
+export type JobBufferSize = {
+    processorId: Scalars['String'];
+    size: Scalars['Int'];
+};
+
 export type JobFilterParameter = {
 export type JobFilterParameter = {
     createdAt?: Maybe<DateOperators>;
     createdAt?: Maybe<DateOperators>;
     startedAt?: Maybe<DateOperators>;
     startedAt?: Maybe<DateOperators>;
@@ -2194,6 +2195,7 @@ export type Mutation = {
     /** Remove all settled jobs in the given queues olfer than the given date. Returns the number of jobs deleted. */
     /** Remove all settled jobs in the given queues olfer than the given date. Returns the number of jobs deleted. */
     removeSettledJobs: Scalars['Int'];
     removeSettledJobs: Scalars['Int'];
     cancelJob: Job;
     cancelJob: Job;
+    flushBufferedJobs: Success;
     settlePayment: SettlePaymentResult;
     settlePayment: SettlePaymentResult;
     addFulfillmentToOrder: AddFulfillmentToOrderResult;
     addFulfillmentToOrder: AddFulfillmentToOrderResult;
     cancelOrder: CancelOrderResult;
     cancelOrder: CancelOrderResult;
@@ -2501,6 +2503,10 @@ export type MutationCancelJobArgs = {
     jobId: Scalars['ID'];
     jobId: Scalars['ID'];
 };
 };
 
 
+export type MutationFlushBufferedJobsArgs = {
+    processorIds?: Maybe<Array<Scalars['String']>>;
+};
+
 export type MutationSettlePaymentArgs = {
 export type MutationSettlePaymentArgs = {
     id: Scalars['ID'];
     id: Scalars['ID'];
 };
 };
@@ -3375,17 +3381,13 @@ export type Product = Node & {
     facetValues: Array<FacetValue>;
     facetValues: Array<FacetValue>;
     translations: Array<ProductTranslation>;
     translations: Array<ProductTranslation>;
     collections: Array<Collection>;
     collections: Array<Collection>;
-    customFields?: Maybe<ProductCustomFields>;
+    customFields?: Maybe<Scalars['JSON']>;
 };
 };
 
 
 export type ProductVariantListArgs = {
 export type ProductVariantListArgs = {
     options?: Maybe<ProductVariantListOptions>;
     options?: Maybe<ProductVariantListOptions>;
 };
 };
 
 
-export type ProductCustomFields = {
-    coffeeProfileWithMilk?: Maybe<Scalars['String']>;
-};
-
 export type ProductFilterParameter = {
 export type ProductFilterParameter = {
     enabled?: Maybe<BooleanOperators>;
     enabled?: Maybe<BooleanOperators>;
     createdAt?: Maybe<DateOperators>;
     createdAt?: Maybe<DateOperators>;
@@ -3394,7 +3396,6 @@ export type ProductFilterParameter = {
     name?: Maybe<StringOperators>;
     name?: Maybe<StringOperators>;
     slug?: Maybe<StringOperators>;
     slug?: Maybe<StringOperators>;
     description?: Maybe<StringOperators>;
     description?: Maybe<StringOperators>;
-    coffeeProfileWithMilk?: Maybe<StringOperators>;
 };
 };
 
 
 export type ProductList = PaginatedList & {
 export type ProductList = PaginatedList & {
@@ -3478,7 +3479,6 @@ export type ProductSortParameter = {
     name?: Maybe<SortOrder>;
     name?: Maybe<SortOrder>;
     slug?: Maybe<SortOrder>;
     slug?: Maybe<SortOrder>;
     description?: Maybe<SortOrder>;
     description?: Maybe<SortOrder>;
-    coffeeProfileWithMilk?: Maybe<SortOrder>;
 };
 };
 
 
 export type ProductTranslation = {
 export type ProductTranslation = {
@@ -3678,6 +3678,7 @@ export type Query = {
     jobs: JobList;
     jobs: JobList;
     jobsById: Array<Job>;
     jobsById: Array<Job>;
     jobQueues: Array<JobQueue>;
     jobQueues: Array<JobQueue>;
+    jobBufferSize: Array<JobBufferSize>;
     order?: Maybe<Order>;
     order?: Maybe<Order>;
     orders: OrderList;
     orders: OrderList;
     paymentMethods: PaymentMethodList;
     paymentMethods: PaymentMethodList;
@@ -3791,6 +3792,10 @@ export type QueryJobsByIdArgs = {
     jobIds: Array<Scalars['ID']>;
     jobIds: Array<Scalars['ID']>;
 };
 };
 
 
+export type QueryJobBufferSizeArgs = {
+    processorIds?: Maybe<Array<Scalars['String']>>;
+};
+
 export type QueryOrderArgs = {
 export type QueryOrderArgs = {
     id: Scalars['ID'];
     id: Scalars['ID'];
 };
 };
@@ -4623,10 +4628,6 @@ export type UpdatePaymentMethodInput = {
     handler?: Maybe<ConfigurableOperationInput>;
     handler?: Maybe<ConfigurableOperationInput>;
 };
 };
 
 
-export type UpdateProductCustomFieldsInput = {
-    coffeeProfileWithMilk?: Maybe<Scalars['String']>;
-};
-
 export type UpdateProductInput = {
 export type UpdateProductInput = {
     id: Scalars['ID'];
     id: Scalars['ID'];
     enabled?: Maybe<Scalars['Boolean']>;
     enabled?: Maybe<Scalars['Boolean']>;
@@ -4634,7 +4635,7 @@ export type UpdateProductInput = {
     assetIds?: Maybe<Array<Scalars['ID']>>;
     assetIds?: Maybe<Array<Scalars['ID']>>;
     facetValueIds?: Maybe<Array<Scalars['ID']>>;
     facetValueIds?: Maybe<Array<Scalars['ID']>>;
     translations?: Maybe<Array<ProductTranslationInput>>;
     translations?: Maybe<Array<ProductTranslationInput>>;
-    customFields?: Maybe<UpdateProductCustomFieldsInput>;
+    customFields?: Maybe<Scalars['JSON']>;
 };
 };
 
 
 export type UpdateProductOptionGroupInput = {
 export type UpdateProductOptionGroupInput = {

+ 1 - 7
packages/core/e2e/graphql/generated-e2e-shop-types.ts

@@ -2240,17 +2240,13 @@ export type Product = Node & {
     facetValues: Array<FacetValue>;
     facetValues: Array<FacetValue>;
     translations: Array<ProductTranslation>;
     translations: Array<ProductTranslation>;
     collections: Array<Collection>;
     collections: Array<Collection>;
-    customFields?: Maybe<ProductCustomFields>;
+    customFields?: Maybe<Scalars['JSON']>;
 };
 };
 
 
 export type ProductVariantListArgs = {
 export type ProductVariantListArgs = {
     options?: Maybe<ProductVariantListOptions>;
     options?: Maybe<ProductVariantListOptions>;
 };
 };
 
 
-export type ProductCustomFields = {
-    coffeeProfileWithMilk?: Maybe<Scalars['String']>;
-};
-
 export type ProductFilterParameter = {
 export type ProductFilterParameter = {
     createdAt?: Maybe<DateOperators>;
     createdAt?: Maybe<DateOperators>;
     updatedAt?: Maybe<DateOperators>;
     updatedAt?: Maybe<DateOperators>;
@@ -2258,7 +2254,6 @@ export type ProductFilterParameter = {
     name?: Maybe<StringOperators>;
     name?: Maybe<StringOperators>;
     slug?: Maybe<StringOperators>;
     slug?: Maybe<StringOperators>;
     description?: Maybe<StringOperators>;
     description?: Maybe<StringOperators>;
-    coffeeProfileWithMilk?: Maybe<StringOperators>;
 };
 };
 
 
 export type ProductList = PaginatedList & {
 export type ProductList = PaginatedList & {
@@ -2321,7 +2316,6 @@ export type ProductSortParameter = {
     name?: Maybe<SortOrder>;
     name?: Maybe<SortOrder>;
     slug?: Maybe<SortOrder>;
     slug?: Maybe<SortOrder>;
     description?: Maybe<SortOrder>;
     description?: Maybe<SortOrder>;
-    coffeeProfileWithMilk?: Maybe<SortOrder>;
 };
 };
 
 
 export type ProductTranslation = {
 export type ProductTranslation = {

+ 22 - 1
packages/core/src/api/resolvers/admin/job.resolver.ts

@@ -2,20 +2,27 @@ import { Args, Mutation, Query, Resolver } from '@nestjs/graphql';
 import {
 import {
     JobQueue,
     JobQueue,
     MutationCancelJobArgs,
     MutationCancelJobArgs,
+    MutationFlushBufferedJobsArgs,
     MutationRemoveSettledJobsArgs,
     MutationRemoveSettledJobsArgs,
     Permission,
     Permission,
     QueryJobArgs,
     QueryJobArgs,
+    QueryJobBufferSizeArgs,
     QueryJobsArgs,
     QueryJobsArgs,
     QueryJobsByIdArgs,
     QueryJobsByIdArgs,
 } from '@vendure/common/lib/generated-types';
 } from '@vendure/common/lib/generated-types';
 
 
 import { ConfigService, InspectableJobQueueStrategy, isInspectableJobQueueStrategy } from '../../../config';
 import { ConfigService, InspectableJobQueueStrategy, isInspectableJobQueueStrategy } from '../../../config';
 import { JobQueueService } from '../../../job-queue';
 import { JobQueueService } from '../../../job-queue';
+import { JobBuffer } from '../../../job-queue/job-buffer/job-buffer';
 import { Allow } from '../../decorators/allow.decorator';
 import { Allow } from '../../decorators/allow.decorator';
 
 
 @Resolver()
 @Resolver()
 export class JobResolver {
 export class JobResolver {
-    constructor(private configService: ConfigService, private jobService: JobQueueService) {}
+    constructor(
+        private configService: ConfigService,
+        private jobService: JobQueueService,
+        private jobBuffer: JobBuffer,
+    ) {}
 
 
     @Query()
     @Query()
     @Allow(Permission.ReadSettings, Permission.ReadSystem)
     @Allow(Permission.ReadSettings, Permission.ReadSystem)
@@ -76,6 +83,20 @@ export class JobResolver {
         return strategy.cancelJob(args.jobId);
         return strategy.cancelJob(args.jobId);
     }
     }
 
 
+    @Query()
+    @Allow(Permission.ReadSettings, Permission.ReadSystem)
+    async jobBufferSize(@Args() args: QueryJobBufferSizeArgs) {
+        const bufferSizes = this.jobBuffer.bufferSize(args.processorIds);
+        return Object.entries(bufferSizes).map(([processorId, size]) => ({ processorId, size }));
+    }
+
+    @Mutation()
+    @Allow(Permission.UpdateSettings, Permission.UpdateSystem)
+    async flushBufferedJobs(@Args() args: MutationFlushBufferedJobsArgs) {
+        await this.jobBuffer.flush(args.processorIds);
+        return { success: true };
+    }
+
     private requireInspectableJobQueueStrategy(): InspectableJobQueueStrategy | undefined {
     private requireInspectableJobQueueStrategy(): InspectableJobQueueStrategy | undefined {
         if (!isInspectableJobQueueStrategy(this.configService.jobQueueOptions.jobQueueStrategy)) {
         if (!isInspectableJobQueueStrategy(this.configService.jobQueueOptions.jobQueueStrategy)) {
             return;
             return;

+ 7 - 0
packages/core/src/api/schema/admin-api/job.api.graphql

@@ -3,12 +3,19 @@ type Query {
     jobs(options: JobListOptions): JobList!
     jobs(options: JobListOptions): JobList!
     jobsById(jobIds: [ID!]!): [Job!]!
     jobsById(jobIds: [ID!]!): [Job!]!
     jobQueues: [JobQueue!]!
     jobQueues: [JobQueue!]!
+    jobBufferSize(processorIds: [String!]): [JobBufferSize!]!
 }
 }
 
 
 type Mutation {
 type Mutation {
     "Remove all settled jobs in the given queues olfer than the given date. Returns the number of jobs deleted."
     "Remove all settled jobs in the given queues olfer than the given date. Returns the number of jobs deleted."
     removeSettledJobs(queueNames: [String!], olderThan: DateTime): Int!
     removeSettledJobs(queueNames: [String!], olderThan: DateTime): Int!
     cancelJob(jobId: ID!): Job!
     cancelJob(jobId: ID!): Job!
+    flushBufferedJobs(processorIds: [String!]): Success!
+}
+
+type JobBufferSize {
+    processorId: String!
+    size: Int!
 }
 }
 
 
 """
 """

+ 2 - 1
packages/core/src/config/config.module.ts

@@ -67,7 +67,7 @@ export class ConfigModule implements OnApplicationBootstrap, OnApplicationShutdo
             passwordHashingStrategy,
             passwordHashingStrategy,
         } = this.configService.authOptions;
         } = this.configService.authOptions;
         const { taxZoneStrategy } = this.configService.taxOptions;
         const { taxZoneStrategy } = this.configService.taxOptions;
-        const { jobQueueStrategy } = this.configService.jobQueueOptions;
+        const { jobQueueStrategy, jobBufferStorageStrategy } = this.configService.jobQueueOptions;
         const {
         const {
             mergeStrategy,
             mergeStrategy,
             checkoutMergeStrategy,
             checkoutMergeStrategy,
@@ -91,6 +91,7 @@ export class ConfigModule implements OnApplicationBootstrap, OnApplicationShutdo
             assetStorageStrategy,
             assetStorageStrategy,
             taxZoneStrategy,
             taxZoneStrategy,
             jobQueueStrategy,
             jobQueueStrategy,
+            jobBufferStorageStrategy,
             mergeStrategy,
             mergeStrategy,
             checkoutMergeStrategy,
             checkoutMergeStrategy,
             orderCodeStrategy,
             orderCodeStrategy,

+ 2 - 0
packages/core/src/config/default-config.ts

@@ -6,6 +6,7 @@ import {
 } from '@vendure/common/lib/shared-constants';
 } from '@vendure/common/lib/shared-constants';
 
 
 import { InMemoryJobQueueStrategy } from '../job-queue/in-memory-job-queue-strategy';
 import { InMemoryJobQueueStrategy } from '../job-queue/in-memory-job-queue-strategy';
+import { InMemoryJobBufferStorageStrategy } from '../job-queue/job-buffer/in-memory-job-buffer-storage-strategy';
 
 
 import { DefaultAssetNamingStrategy } from './asset-naming-strategy/default-asset-naming-strategy';
 import { DefaultAssetNamingStrategy } from './asset-naming-strategy/default-asset-naming-strategy';
 import { NoAssetPreviewStrategy } from './asset-preview-strategy/no-asset-preview-strategy';
 import { NoAssetPreviewStrategy } from './asset-preview-strategy/no-asset-preview-strategy';
@@ -145,6 +146,7 @@ export const defaultConfig: RuntimeVendureConfig = {
     },
     },
     jobQueueOptions: {
     jobQueueOptions: {
         jobQueueStrategy: new InMemoryJobQueueStrategy(),
         jobQueueStrategy: new InMemoryJobQueueStrategy(),
+        jobBufferStorageStrategy: new InMemoryJobBufferStorageStrategy(),
         activeQueues: [],
         activeQueues: [],
         enableWorkerHealthCheck: false,
         enableWorkerHealthCheck: false,
     },
     },

+ 2 - 0
packages/core/src/config/vendure-config.ts

@@ -8,6 +8,7 @@ import { ConnectionOptions } from 'typeorm';
 
 
 import { Middleware } from '../common';
 import { Middleware } from '../common';
 import { PermissionDefinition } from '../common/permission-definition';
 import { PermissionDefinition } from '../common/permission-definition';
+import { JobBufferStorageStrategy } from '../job-queue/job-buffer/job-buffer-storage-strategy';
 
 
 import { AssetNamingStrategy } from './asset-naming-strategy/asset-naming-strategy';
 import { AssetNamingStrategy } from './asset-naming-strategy/asset-naming-strategy';
 import { AssetPreviewStrategy } from './asset-preview-strategy/asset-preview-strategy';
 import { AssetPreviewStrategy } from './asset-preview-strategy/asset-preview-strategy';
@@ -745,6 +746,7 @@ export interface JobQueueOptions {
      * @default InMemoryJobQueueStrategy
      * @default InMemoryJobQueueStrategy
      */
      */
     jobQueueStrategy?: JobQueueStrategy;
     jobQueueStrategy?: JobQueueStrategy;
+    jobBufferStorageStrategy?: JobBufferStorageStrategy;
     /**
     /**
      * @description
      * @description
      * Defines the queues that will run in this process.
      * Defines the queues that will run in this process.

+ 45 - 0
packages/core/src/job-queue/job-buffer/in-memory-job-buffer-storage-strategy.ts

@@ -0,0 +1,45 @@
+import { Job } from '../job';
+
+import { JobBufferStorageStrategy } from './job-buffer-storage-strategy';
+
+export class InMemoryJobBufferStorageStrategy implements JobBufferStorageStrategy {
+    private bufferStorage = new Map<string, Set<Job>>();
+
+    async add(processorId: string, job: Job): Promise<Job> {
+        const set = this.getSet(processorId);
+        set.add(job);
+        return job;
+    }
+
+    async bufferSize(processorIds?: string[]): Promise<{ [processorId: string]: number }> {
+        const ids = processorIds ?? Array.from(this.bufferStorage.keys());
+        const result: { [processorId: string]: number } = {};
+        for (const id of ids) {
+            const size = this.bufferStorage.get(id)?.size ?? 0;
+            result[id] = size;
+        }
+        return result;
+    }
+
+    async flush(processorIds?: string[]): Promise<{ [processorId: string]: Job[] }> {
+        const ids = processorIds ?? Array.from(this.bufferStorage.keys());
+        const result: { [processorId: string]: Job[] } = {};
+        for (const id of ids) {
+            const jobs = Array.from(this.bufferStorage.get(id) ?? []);
+            this.bufferStorage.get(id)?.clear();
+            result[id] = jobs;
+        }
+        return result;
+    }
+
+    private getSet(processorId: string): Set<Job> {
+        const set = this.bufferStorage.get(processorId);
+        if (set) {
+            return set;
+        } else {
+            const newSet = new Set<Job>();
+            this.bufferStorage.set(processorId, newSet);
+            return newSet;
+        }
+    }
+}

+ 8 - 0
packages/core/src/job-queue/job-buffer/job-buffer-processor.ts

@@ -0,0 +1,8 @@
+import { Job } from '../job';
+import { JobData } from '../types';
+
+export interface JobBufferProcessor<Data extends JobData<Data> = {}> {
+    readonly id: string;
+    collect(job: Job<Data>): boolean | Promise<boolean>;
+    reduce(collectedJobs: Array<Job<Data>>): Array<Job<Data>> | Promise<Array<Job<Data>>>;
+}

+ 8 - 0
packages/core/src/job-queue/job-buffer/job-buffer-storage-strategy.ts

@@ -0,0 +1,8 @@
+import { InjectableStrategy } from '../../common/types/injectable-strategy';
+import { Job } from '../job';
+
+export interface JobBufferStorageStrategy extends InjectableStrategy {
+    add(processorId: string, job: Job): Promise<Job>;
+    bufferSize(processorIds?: string[]): Promise<{ [processorId: string]: number }>;
+    flush(processorIds?: string[]): Promise<{ [processorId: string]: Job[] }>;
+}

+ 68 - 0
packages/core/src/job-queue/job-buffer/job-buffer.ts

@@ -0,0 +1,68 @@
+import { Injectable } from '@nestjs/common';
+
+import { InternalServerError } from '../../common/error/errors';
+import { ConfigService } from '../../config/config.service';
+import { Job } from '../job';
+
+import { JobBufferProcessor } from './job-buffer-processor';
+import { JobBufferStorageStrategy } from './job-buffer-storage-strategy';
+
+@Injectable()
+export class JobBuffer {
+    private processors = new Set<JobBufferProcessor>();
+    private storageStrategy: JobBufferStorageStrategy;
+
+    constructor(private configService: ConfigService) {
+        this.storageStrategy = configService.jobQueueOptions.jobBufferStorageStrategy;
+    }
+
+    addProcessor(processor: JobBufferProcessor) {
+        const idAlreadyExists = Array.from(this.processors).find(p => p.id === processor.id);
+        if (idAlreadyExists) {
+            throw new InternalServerError(
+                `There is already a JobBufferProcessor with the id "${processor.id}". Ids must be unique`,
+            );
+        }
+        this.processors.add(processor);
+    }
+
+    removeProcessor(processor: JobBufferProcessor) {
+        this.processors.delete(processor);
+    }
+
+    async add(job: Job): Promise<boolean> {
+        let collected = false;
+        for (const processor of this.processors) {
+            const shouldCollect = await processor.collect(job);
+            if (shouldCollect) {
+                collected = true;
+                await this.storageStrategy.add(processor.id, job);
+            }
+        }
+        return collected;
+    }
+
+    bufferSize(
+        forProcessors?: Array<JobBufferProcessor | string>,
+    ): Promise<{ [processorId: string]: number }> {
+        const processors = forProcessors ?? Array.from(this.processors);
+        return this.storageStrategy.bufferSize(processors.map(p => (typeof p === 'string' ? p : p.id)));
+    }
+
+    async flush(forProcessors?: Array<JobBufferProcessor | string>): Promise<void> {
+        const { jobQueueStrategy } = this.configService.jobQueueOptions;
+        const processors = forProcessors ?? Array.from(this.processors);
+        const flushResult = await this.storageStrategy.flush(
+            processors.map(p => (typeof p === 'string' ? p : p.id)),
+        );
+        for (const processor of this.processors) {
+            const jobsForProcessor = flushResult[processor.id];
+            if (jobsForProcessor?.length) {
+                const reducedJobs = await processor.reduce(jobsForProcessor);
+                for (const job of reducedJobs) {
+                    await jobQueueStrategy.add(job);
+                }
+            }
+        }
+    }
+}

+ 25 - 0
packages/core/src/job-queue/job-buffer/sql-job-buffer-storage-strategy.ts

@@ -0,0 +1,25 @@
+import { Injector } from '../../common/injector';
+import { TransactionalConnection } from '../../connection/transactional-connection';
+import { Job } from '../job';
+
+import { JobBufferStorageStrategy } from './job-buffer-storage-strategy';
+
+export class SqlJobBufferStorageStrategy implements JobBufferStorageStrategy {
+    private connection: TransactionalConnection;
+
+    init(injector: Injector) {
+        this.connection = injector.get(TransactionalConnection);
+    }
+
+    add(processorId: string, job: Job): Promise<Job> {
+        return Promise.resolve(job);
+    }
+
+    bufferSize(processorIds?: string[]): Promise<number> {
+        return Promise.resolve(0);
+    }
+
+    flush(processorIds?: string[]): Promise<void> {
+        return Promise.resolve(undefined);
+    }
+}

+ 3 - 2
packages/core/src/job-queue/job-queue.module.ts

@@ -2,11 +2,12 @@ import { Module } from '@nestjs/common';
 
 
 import { ConfigModule } from '../config/config.module';
 import { ConfigModule } from '../config/config.module';
 
 
+import { JobBuffer } from './job-buffer/job-buffer';
 import { JobQueueService } from './job-queue.service';
 import { JobQueueService } from './job-queue.service';
 
 
 @Module({
 @Module({
     imports: [ConfigModule],
     imports: [ConfigModule],
-    providers: [JobQueueService],
-    exports: [JobQueueService],
+    providers: [JobQueueService, JobBuffer],
+    exports: [JobQueueService, JobBuffer],
 })
 })
 export class JobQueueModule {}
 export class JobQueueModule {}

+ 3 - 2
packages/core/src/job-queue/job-queue.service.ts

@@ -4,6 +4,7 @@ import { JobQueue as GraphQlJobQueue } from '@vendure/common/lib/generated-types
 import { ConfigService, JobQueueStrategy, Logger } from '../config';
 import { ConfigService, JobQueueStrategy, Logger } from '../config';
 
 
 import { loggerCtx } from './constants';
 import { loggerCtx } from './constants';
+import { JobBuffer } from './job-buffer/job-buffer';
 import { JobQueue } from './job-queue';
 import { JobQueue } from './job-queue';
 import { CreateQueueOptions, JobData } from './types';
 import { CreateQueueOptions, JobData } from './types';
 
 
@@ -51,7 +52,7 @@ export class JobQueueService implements OnModuleDestroy {
         return this.configService.jobQueueOptions.jobQueueStrategy;
         return this.configService.jobQueueOptions.jobQueueStrategy;
     }
     }
 
 
-    constructor(private configService: ConfigService) {}
+    constructor(private configService: ConfigService, private jobBuffer: JobBuffer) {}
 
 
     /** @internal */
     /** @internal */
     onModuleDestroy() {
     onModuleDestroy() {
@@ -66,7 +67,7 @@ export class JobQueueService implements OnModuleDestroy {
     async createQueue<Data extends JobData<Data>>(
     async createQueue<Data extends JobData<Data>>(
         options: CreateQueueOptions<Data>,
         options: CreateQueueOptions<Data>,
     ): Promise<JobQueue<Data>> {
     ): Promise<JobQueue<Data>> {
-        const queue = new JobQueue(options, this.jobQueueStrategy);
+        const queue = new JobQueue(options, this.jobQueueStrategy, this.jobBuffer);
         if (this.hasStarted && this.shouldStartQueue(queue.name)) {
         if (this.hasStarted && this.shouldStartQueue(queue.name)) {
             await queue.start();
             await queue.start();
         }
         }

+ 17 - 6
packages/core/src/job-queue/job-queue.ts

@@ -6,6 +6,7 @@ import { JobQueueStrategy } from '../config';
 import { Logger } from '../config/logger/vendure-logger';
 import { Logger } from '../config/logger/vendure-logger';
 
 
 import { Job } from './job';
 import { Job } from './job';
+import { JobBuffer } from './job-buffer/job-buffer';
 import { SubscribableJob } from './subscribable-job';
 import { SubscribableJob } from './subscribable-job';
 import { CreateQueueOptions, JobConfig, JobData } from './types';
 import { CreateQueueOptions, JobConfig, JobData } from './types';
 
 
@@ -32,7 +33,11 @@ export class JobQueue<Data extends JobData<Data> = {}> {
         return this.running;
         return this.running;
     }
     }
 
 
-    constructor(private options: CreateQueueOptions<Data>, private jobQueueStrategy: JobQueueStrategy) {}
+    constructor(
+        private options: CreateQueueOptions<Data>,
+        private jobQueueStrategy: JobQueueStrategy,
+        private jobBuffer: JobBuffer,
+    ) {}
 
 
     /** @internal */
     /** @internal */
     async start() {
     async start() {
@@ -91,11 +96,17 @@ export class JobQueue<Data extends JobData<Data> = {}> {
             queueName: this.options.name,
             queueName: this.options.name,
             retries: options?.retries ?? 0,
             retries: options?.retries ?? 0,
         });
         });
-        try {
-            const addedJob = await this.jobQueueStrategy.add(job);
-            return new SubscribableJob(addedJob, this.jobQueueStrategy);
-        } catch (err) {
-            Logger.error(`Could not add Job to "${this.name}" queue`, undefined, err.stack);
+
+        const isBuffered = await this.jobBuffer.add(job);
+        if (!isBuffered) {
+            try {
+                const addedJob = await this.jobQueueStrategy.add(job);
+                return new SubscribableJob(addedJob, this.jobQueueStrategy);
+            } catch (err) {
+                Logger.error(`Could not add Job to "${this.name}" queue`, undefined, err.stack);
+                return new SubscribableJob(job, this.jobQueueStrategy);
+            }
+        } else {
             return new SubscribableJob(job, this.jobQueueStrategy);
             return new SubscribableJob(job, this.jobQueueStrategy);
         }
         }
     }
     }

+ 2 - 2
packages/core/src/service/services/collection.service.ts

@@ -442,8 +442,8 @@ export class CollectionService implements OnModuleInit {
     /**
     /**
      * Applies the CollectionFilters
      * Applies the CollectionFilters
      *
      *
-     * If applyToChangedVariantsOnly (default: true) is true, than apply collection job will process only changed variants
-     * If applyToChangedVariantsOnly (default: true) is false, than apply collection job will process all variants
+     * If applyToChangedVariantsOnly (default: true) is true, then apply collection job will process only changed variants
+     * If applyToChangedVariantsOnly (default: true) is false, then apply collection job will process all variants
      * This param is used when we update collection and collection filters are changed to update all
      * This param is used when we update collection and collection filters are changed to update all
      * variants (because other attributes of collection can be changed https://github.com/vendure-ecommerce/vendure/issues/1015)
      * variants (because other attributes of collection can be changed https://github.com/vendure-ecommerce/vendure/issues/1015)
      */
      */

+ 18 - 17
packages/elasticsearch-plugin/e2e/graphql/generated-e2e-elasticsearch-plugin-types.ts

@@ -632,17 +632,13 @@ export type CreatePaymentMethodInput = {
     handler: ConfigurableOperationInput;
     handler: ConfigurableOperationInput;
 };
 };
 
 
-export type CreateProductCustomFieldsInput = {
-    coffeeProfileWithMilk?: Maybe<Scalars['String']>;
-};
-
 export type CreateProductInput = {
 export type CreateProductInput = {
     featuredAssetId?: Maybe<Scalars['ID']>;
     featuredAssetId?: Maybe<Scalars['ID']>;
     enabled?: Maybe<Scalars['Boolean']>;
     enabled?: Maybe<Scalars['Boolean']>;
     assetIds?: Maybe<Array<Scalars['ID']>>;
     assetIds?: Maybe<Array<Scalars['ID']>>;
     facetValueIds?: Maybe<Array<Scalars['ID']>>;
     facetValueIds?: Maybe<Array<Scalars['ID']>>;
     translations: Array<ProductTranslationInput>;
     translations: Array<ProductTranslationInput>;
-    customFields?: Maybe<CreateProductCustomFieldsInput>;
+    customFields?: Maybe<Scalars['JSON']>;
 };
 };
 
 
 export type CreateProductOptionGroupInput = {
 export type CreateProductOptionGroupInput = {
@@ -1624,6 +1620,11 @@ export type Job = Node & {
     attempts: Scalars['Int'];
     attempts: Scalars['Int'];
 };
 };
 
 
+export type JobBufferSize = {
+    processorId: Scalars['String'];
+    size: Scalars['Int'];
+};
+
 export type JobFilterParameter = {
 export type JobFilterParameter = {
     createdAt?: Maybe<DateOperators>;
     createdAt?: Maybe<DateOperators>;
     startedAt?: Maybe<DateOperators>;
     startedAt?: Maybe<DateOperators>;
@@ -2194,6 +2195,7 @@ export type Mutation = {
     /** Remove all settled jobs in the given queues olfer than the given date. Returns the number of jobs deleted. */
     /** Remove all settled jobs in the given queues olfer than the given date. Returns the number of jobs deleted. */
     removeSettledJobs: Scalars['Int'];
     removeSettledJobs: Scalars['Int'];
     cancelJob: Job;
     cancelJob: Job;
+    flushBufferedJobs: Success;
     settlePayment: SettlePaymentResult;
     settlePayment: SettlePaymentResult;
     addFulfillmentToOrder: AddFulfillmentToOrderResult;
     addFulfillmentToOrder: AddFulfillmentToOrderResult;
     cancelOrder: CancelOrderResult;
     cancelOrder: CancelOrderResult;
@@ -2501,6 +2503,10 @@ export type MutationCancelJobArgs = {
     jobId: Scalars['ID'];
     jobId: Scalars['ID'];
 };
 };
 
 
+export type MutationFlushBufferedJobsArgs = {
+    processorIds?: Maybe<Array<Scalars['String']>>;
+};
+
 export type MutationSettlePaymentArgs = {
 export type MutationSettlePaymentArgs = {
     id: Scalars['ID'];
     id: Scalars['ID'];
 };
 };
@@ -3375,17 +3381,13 @@ export type Product = Node & {
     facetValues: Array<FacetValue>;
     facetValues: Array<FacetValue>;
     translations: Array<ProductTranslation>;
     translations: Array<ProductTranslation>;
     collections: Array<Collection>;
     collections: Array<Collection>;
-    customFields?: Maybe<ProductCustomFields>;
+    customFields?: Maybe<Scalars['JSON']>;
 };
 };
 
 
 export type ProductVariantListArgs = {
 export type ProductVariantListArgs = {
     options?: Maybe<ProductVariantListOptions>;
     options?: Maybe<ProductVariantListOptions>;
 };
 };
 
 
-export type ProductCustomFields = {
-    coffeeProfileWithMilk?: Maybe<Scalars['String']>;
-};
-
 export type ProductFilterParameter = {
 export type ProductFilterParameter = {
     enabled?: Maybe<BooleanOperators>;
     enabled?: Maybe<BooleanOperators>;
     createdAt?: Maybe<DateOperators>;
     createdAt?: Maybe<DateOperators>;
@@ -3394,7 +3396,6 @@ export type ProductFilterParameter = {
     name?: Maybe<StringOperators>;
     name?: Maybe<StringOperators>;
     slug?: Maybe<StringOperators>;
     slug?: Maybe<StringOperators>;
     description?: Maybe<StringOperators>;
     description?: Maybe<StringOperators>;
-    coffeeProfileWithMilk?: Maybe<StringOperators>;
 };
 };
 
 
 export type ProductList = PaginatedList & {
 export type ProductList = PaginatedList & {
@@ -3478,7 +3479,6 @@ export type ProductSortParameter = {
     name?: Maybe<SortOrder>;
     name?: Maybe<SortOrder>;
     slug?: Maybe<SortOrder>;
     slug?: Maybe<SortOrder>;
     description?: Maybe<SortOrder>;
     description?: Maybe<SortOrder>;
-    coffeeProfileWithMilk?: Maybe<SortOrder>;
 };
 };
 
 
 export type ProductTranslation = {
 export type ProductTranslation = {
@@ -3678,6 +3678,7 @@ export type Query = {
     jobs: JobList;
     jobs: JobList;
     jobsById: Array<Job>;
     jobsById: Array<Job>;
     jobQueues: Array<JobQueue>;
     jobQueues: Array<JobQueue>;
+    jobBufferSize: Array<JobBufferSize>;
     order?: Maybe<Order>;
     order?: Maybe<Order>;
     orders: OrderList;
     orders: OrderList;
     paymentMethods: PaymentMethodList;
     paymentMethods: PaymentMethodList;
@@ -3791,6 +3792,10 @@ export type QueryJobsByIdArgs = {
     jobIds: Array<Scalars['ID']>;
     jobIds: Array<Scalars['ID']>;
 };
 };
 
 
+export type QueryJobBufferSizeArgs = {
+    processorIds?: Maybe<Array<Scalars['String']>>;
+};
+
 export type QueryOrderArgs = {
 export type QueryOrderArgs = {
     id: Scalars['ID'];
     id: Scalars['ID'];
 };
 };
@@ -4623,10 +4628,6 @@ export type UpdatePaymentMethodInput = {
     handler?: Maybe<ConfigurableOperationInput>;
     handler?: Maybe<ConfigurableOperationInput>;
 };
 };
 
 
-export type UpdateProductCustomFieldsInput = {
-    coffeeProfileWithMilk?: Maybe<Scalars['String']>;
-};
-
 export type UpdateProductInput = {
 export type UpdateProductInput = {
     id: Scalars['ID'];
     id: Scalars['ID'];
     enabled?: Maybe<Scalars['Boolean']>;
     enabled?: Maybe<Scalars['Boolean']>;
@@ -4634,7 +4635,7 @@ export type UpdateProductInput = {
     assetIds?: Maybe<Array<Scalars['ID']>>;
     assetIds?: Maybe<Array<Scalars['ID']>>;
     facetValueIds?: Maybe<Array<Scalars['ID']>>;
     facetValueIds?: Maybe<Array<Scalars['ID']>>;
     translations?: Maybe<Array<ProductTranslationInput>>;
     translations?: Maybe<Array<ProductTranslationInput>>;
-    customFields?: Maybe<UpdateProductCustomFieldsInput>;
+    customFields?: Maybe<Scalars['JSON']>;
 };
 };
 
 
 export type UpdateProductOptionGroupInput = {
 export type UpdateProductOptionGroupInput = {

파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
+ 0 - 0
schema-admin.json


파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
+ 0 - 0
schema-shop.json


이 변경점에서 너무 많은 파일들이 변경되어 몇몇 파일들은 표시되지 않았습니다.