Просмотр исходного кода

feat(core): Redesign JobQueue to allow persistence, concurrency etc

Relates to #282

BREAKING CHANGE: The JobQueueService has been completely re-designed. In the unlikely event that you are using this service in your Plugins, please see the API documentation on how to use it now.
Michael Bromley 5 лет назад
Родитель
Сommit
7acf5325ca
50 измененных файлов с 1530 добавлено и 809 удалено
  1. 6 0
      e2e-common/test-config.ts
  2. 65 25
      packages/admin-ui/src/lib/core/src/common/generated-types.ts
  3. 6 0
      packages/admin-ui/src/lib/core/src/common/introspection-result.ts
  4. 6 4
      packages/admin-ui/src/lib/core/src/data/definitions/settings-definitions.ts
  5. 49 14
      packages/asset-server-plugin/e2e/graphql/generated-e2e-asset-server-plugin-types.ts
  6. 50 14
      packages/common/src/generated-types.ts
  7. 19 2
      packages/common/src/shared-types.ts
  8. 28 27
      packages/core/e2e/collection.e2e-spec.ts
  9. 54 16
      packages/core/e2e/graphql/generated-e2e-admin-types.ts
  10. 7 2
      packages/core/e2e/graphql/shared-definitions.ts
  11. 2 2
      packages/core/e2e/utils/await-running-jobs.ts
  12. 2 0
      packages/core/src/api/api-internal-modules.ts
  13. 8 9
      packages/core/src/api/common/request-context.ts
  14. 16 5
      packages/core/src/api/resolvers/admin/job.resolver.ts
  15. 16 10
      packages/core/src/api/schema/admin-api/job.api.graphql
  16. 1 1
      packages/core/src/api/schema/admin-api/product-search.api.graphql
  17. 1 0
      packages/core/src/config/config.service.mock.ts
  18. 5 0
      packages/core/src/config/config.service.ts
  19. 6 0
      packages/core/src/config/default-config.ts
  20. 24 0
      packages/core/src/config/job-queue/job-queue-strategy.ts
  21. 27 0
      packages/core/src/config/vendure-config.ts
  22. 1 0
      packages/core/src/index.ts
  23. 4 0
      packages/core/src/job-queue/index.ts
  24. 12 0
      packages/core/src/job-queue/job-queue.module.ts
  25. 285 0
      packages/core/src/job-queue/job-queue.service.spec.ts
  26. 84 0
      packages/core/src/job-queue/job-queue.service.ts
  27. 99 0
      packages/core/src/job-queue/job-queue.ts
  28. 173 0
      packages/core/src/job-queue/job.ts
  29. 29 0
      packages/core/src/job-queue/sql-job-queue-strategy.ts
  30. 62 0
      packages/core/src/job-queue/testing-job-queue-strategy.ts
  31. 63 0
      packages/core/src/job-queue/types.ts
  32. 28 22
      packages/core/src/plugin/default-search-plugin/default-search-plugin.ts
  33. 2 3
      packages/core/src/plugin/default-search-plugin/fulltext-search.resolver.ts
  34. 5 7
      packages/core/src/plugin/default-search-plugin/fulltext-search.service.ts
  35. 82 82
      packages/core/src/plugin/default-search-plugin/indexer/search-index.service.ts
  36. 38 11
      packages/core/src/plugin/default-search-plugin/types.ts
  37. 3 2
      packages/core/src/plugin/plugin-common.module.ts
  38. 0 185
      packages/core/src/service/helpers/job-manager/job-manager.spec.ts
  39. 0 96
      packages/core/src/service/helpers/job-manager/job-manager.ts
  40. 0 31
      packages/core/src/service/helpers/job-manager/job.spec.ts
  41. 0 50
      packages/core/src/service/helpers/job-manager/job.ts
  42. 0 2
      packages/core/src/service/index.ts
  43. 2 3
      packages/core/src/service/service.module.ts
  44. 74 54
      packages/core/src/service/services/collection.service.ts
  45. 0 74
      packages/core/src/service/services/job.service.ts
  46. 9 14
      packages/core/src/service/services/search.service.ts
  47. 3 0
      packages/core/src/service/types/collection-messages.ts
  48. 22 19
      packages/elasticsearch-plugin/e2e/elasticsearch-plugin.e2e-spec.ts
  49. 52 23
      packages/elasticsearch-plugin/e2e/graphql/generated-e2e-elasticsearch-plugin-types.ts
  50. 0 0
      schema-admin.json

+ 6 - 0
e2e-common/test-config.ts

@@ -3,6 +3,8 @@ import { MysqlInitializer, PostgresInitializer, registerInitializer, SqljsInitia
 import path from 'path';
 import { ConnectionOptions } from 'typeorm';
 
+import { TestingJobQueueStrategy } from '../packages/core/src/job-queue/testing-job-queue-strategy';
+
 import { getPackageDir } from './get-package-dir';
 
 /**
@@ -32,6 +34,10 @@ export const testConfig = mergeConfig(defaultTestConfig, {
     importExportOptions: {
         importAssetsDir: path.join(packageDir, 'fixtures/assets'),
     },
+    jobQueueOptions: {
+        jobQueueStrategy: new TestingJobQueueStrategy(),
+        pollInterval: 10,
+    },
     dbConnectionOptions: getDbConfig(),
 });
 

+ 65 - 25
packages/admin-ui/src/lib/core/src/common/generated-types.ts

@@ -1266,28 +1266,58 @@ export type IntCustomFieldConfig = CustomField & {
   step?: Maybe<Scalars['Int']>;
 };
 
-export type JobInfo = {
-   __typename?: 'JobInfo';
-  id: Scalars['String'];
+export type Job = Node & {
+   __typename?: 'Job';
+  id: Scalars['ID'];
   name: Scalars['String'];
   state: JobState;
   progress: Scalars['Float'];
   metadata?: Maybe<Scalars['JSON']>;
   result?: Maybe<Scalars['JSON']>;
-  started?: Maybe<Scalars['DateTime']>;
-  ended?: Maybe<Scalars['DateTime']>;
-  duration?: Maybe<Scalars['Int']>;
+  error?: Maybe<Scalars['JSON']>;
+  started: Scalars['DateTime'];
+  settled?: Maybe<Scalars['DateTime']>;
+  isSettled: Scalars['Boolean'];
+  duration: Scalars['Int'];
+};
+
+export type JobFilterParameter = {
+  name?: Maybe<StringOperators>;
+  state?: Maybe<StringOperators>;
+  progress?: Maybe<NumberOperators>;
+  started?: Maybe<DateOperators>;
+  settled?: Maybe<DateOperators>;
+  isSettled?: Maybe<BooleanOperators>;
+  duration?: Maybe<NumberOperators>;
+};
+
+export type JobList = PaginatedList & {
+   __typename?: 'JobList';
+  items: Array<Job>;
+  totalItems: Scalars['Int'];
 };
 
-export type JobListInput = {
-  state?: Maybe<JobState>;
-  ids?: Maybe<Array<Scalars['String']>>;
+export type JobListOptions = {
+  skip?: Maybe<Scalars['Int']>;
+  take?: Maybe<Scalars['Int']>;
+  sort?: Maybe<JobSortParameter>;
+  filter?: Maybe<JobFilterParameter>;
+};
+
+export type JobSortParameter = {
+  id?: Maybe<SortOrder>;
+  name?: Maybe<SortOrder>;
+  progress?: Maybe<SortOrder>;
+  started?: Maybe<SortOrder>;
+  settled?: Maybe<SortOrder>;
+  duration?: Maybe<SortOrder>;
 };
 
 export enum JobState {
   PENDING = 'PENDING',
   RUNNING = 'RUNNING',
   COMPLETED = 'COMPLETED',
+  RETRYING = 'RETRYING',
   FAILED = 'FAILED'
 }
 
@@ -1786,7 +1816,7 @@ export type Mutation = {
   /** Move a Collection to a different parent or index */
   moveCollection: Collection;
   refundOrder: Refund;
-  reindex: JobInfo;
+  reindex: Job;
   /** Remove Customers from a CustomerGroup */
   removeCustomersFromGroup: CustomerGroup;
   /** Remove members from a Zone */
@@ -2808,8 +2838,9 @@ export type Query = {
   facet?: Maybe<Facet>;
   facets: FacetList;
   globalSettings: GlobalSettings;
-  job?: Maybe<JobInfo>;
-  jobs: Array<JobInfo>;
+  job?: Maybe<Job>;
+  jobs: JobList;
+  jobsById: Array<Job>;
   me?: Maybe<CurrentUser>;
   networkStatus: NetworkStatus;
   order?: Maybe<Order>;
@@ -2916,12 +2947,17 @@ export type QueryFacetsArgs = {
 
 
 export type QueryJobArgs = {
-  jobId: Scalars['String'];
+  jobId: Scalars['ID'];
 };
 
 
 export type QueryJobsArgs = {
-  input?: Maybe<JobListInput>;
+  input?: Maybe<JobListOptions>;
+};
+
+
+export type QueryJobsByIdArgs = {
+  jobIds: Array<Scalars['ID']>;
 };
 
 
@@ -6057,34 +6093,37 @@ export type GetServerConfigQuery = (
 );
 
 export type JobInfoFragment = (
-  { __typename?: 'JobInfo' }
-  & Pick<JobInfo, 'id' | 'name' | 'state' | 'progress' | 'duration' | 'result'>
+  { __typename?: 'Job' }
+  & Pick<Job, 'id' | 'name' | 'state' | 'progress' | 'duration' | 'result'>
 );
 
 export type GetJobInfoQueryVariables = {
-  id: Scalars['String'];
+  id: Scalars['ID'];
 };
 
 
 export type GetJobInfoQuery = (
   { __typename?: 'Query' }
   & { job?: Maybe<(
-    { __typename?: 'JobInfo' }
+    { __typename?: 'Job' }
     & JobInfoFragment
   )> }
 );
 
 export type GetAllJobsQueryVariables = {
-  input?: Maybe<JobListInput>;
+  input?: Maybe<JobListOptions>;
 };
 
 
 export type GetAllJobsQuery = (
   { __typename?: 'Query' }
-  & { jobs: Array<(
-    { __typename?: 'JobInfo' }
-    & JobInfoFragment
-  )> }
+  & { jobs: (
+    { __typename?: 'JobList' }
+    & { items: Array<(
+      { __typename?: 'Job' }
+      & JobInfoFragment
+    )> }
+  ) }
 );
 
 export type ReindexMutationVariables = {};
@@ -6093,7 +6132,7 @@ export type ReindexMutationVariables = {};
 export type ReindexMutation = (
   { __typename?: 'Mutation' }
   & { reindex: (
-    { __typename?: 'JobInfo' }
+    { __typename?: 'Job' }
     & JobInfoFragment
   ) }
 );
@@ -7260,7 +7299,8 @@ export namespace GetJobInfo {
 export namespace GetAllJobs {
   export type Variables = GetAllJobsQueryVariables;
   export type Query = GetAllJobsQuery;
-  export type Jobs = JobInfoFragment;
+  export type Jobs = GetAllJobsQuery['jobs'];
+  export type Items = JobInfoFragment;
 }
 
 export namespace Reindex {

+ 6 - 0
packages/admin-ui/src/lib/core/src/common/introspection-result.ts

@@ -108,6 +108,9 @@ const result: IntrospectionResultData = {
                     {
                         name: 'Facet',
                     },
+                    {
+                        name: 'Job',
+                    },
                     {
                         name: 'PaymentMethod',
                     },
@@ -150,6 +153,9 @@ const result: IntrospectionResultData = {
                     {
                         name: 'FacetList',
                     },
+                    {
+                        name: 'JobList',
+                    },
                     {
                         name: 'PaymentMethodList',
                     },

+ 6 - 4
packages/admin-ui/src/lib/core/src/data/definitions/settings-definitions.ts

@@ -571,7 +571,7 @@ export const GET_SERVER_CONFIG = gql`
 `;
 
 export const JOB_INFO_FRAGMENT = gql`
-    fragment JobInfo on JobInfo {
+    fragment JobInfo on Job {
         id
         name
         state
@@ -582,7 +582,7 @@ export const JOB_INFO_FRAGMENT = gql`
 `;
 
 export const GET_JOB_INFO = gql`
-    query GetJobInfo($id: String!) {
+    query GetJobInfo($id: ID!) {
         job(jobId: $id) {
             ...JobInfo
         }
@@ -591,9 +591,11 @@ export const GET_JOB_INFO = gql`
 `;
 
 export const GET_ALL_JOBS = gql`
-    query GetAllJobs($input: JobListInput) {
+    query GetAllJobs($input: JobListOptions) {
         jobs(input: $input) {
-            ...JobInfo
+            items {
+                ...JobInfo
+            }
         }
     }
     ${JOB_INFO_FRAGMENT}

+ 49 - 14
packages/asset-server-plugin/e2e/graphql/generated-e2e-asset-server-plugin-types.ts

@@ -1262,28 +1262,58 @@ export type IntCustomFieldConfig = CustomField & {
     step?: Maybe<Scalars['Int']>;
 };
 
-export type JobInfo = {
-    __typename?: 'JobInfo';
-    id: Scalars['String'];
+export type Job = Node & {
+    __typename?: 'Job';
+    id: Scalars['ID'];
     name: Scalars['String'];
     state: JobState;
     progress: Scalars['Float'];
     metadata?: Maybe<Scalars['JSON']>;
     result?: Maybe<Scalars['JSON']>;
-    started?: Maybe<Scalars['DateTime']>;
-    ended?: Maybe<Scalars['DateTime']>;
-    duration?: Maybe<Scalars['Int']>;
+    error?: Maybe<Scalars['JSON']>;
+    started: Scalars['DateTime'];
+    settled?: Maybe<Scalars['DateTime']>;
+    isSettled: Scalars['Boolean'];
+    duration: Scalars['Int'];
+};
+
+export type JobFilterParameter = {
+    name?: Maybe<StringOperators>;
+    state?: Maybe<StringOperators>;
+    progress?: Maybe<NumberOperators>;
+    started?: Maybe<DateOperators>;
+    settled?: Maybe<DateOperators>;
+    isSettled?: Maybe<BooleanOperators>;
+    duration?: Maybe<NumberOperators>;
+};
+
+export type JobList = PaginatedList & {
+    __typename?: 'JobList';
+    items: Array<Job>;
+    totalItems: Scalars['Int'];
 };
 
-export type JobListInput = {
-    state?: Maybe<JobState>;
-    ids?: Maybe<Array<Scalars['String']>>;
+export type JobListOptions = {
+    skip?: Maybe<Scalars['Int']>;
+    take?: Maybe<Scalars['Int']>;
+    sort?: Maybe<JobSortParameter>;
+    filter?: Maybe<JobFilterParameter>;
+};
+
+export type JobSortParameter = {
+    id?: Maybe<SortOrder>;
+    name?: Maybe<SortOrder>;
+    progress?: Maybe<SortOrder>;
+    started?: Maybe<SortOrder>;
+    settled?: Maybe<SortOrder>;
+    duration?: Maybe<SortOrder>;
 };
 
 export enum JobState {
     PENDING = 'PENDING',
     RUNNING = 'RUNNING',
     COMPLETED = 'COMPLETED',
+    RETRYING = 'RETRYING',
     FAILED = 'FAILED',
 }
 
@@ -1776,7 +1806,7 @@ export type Mutation = {
     createProductOption: ProductOption;
     /** Create a new ProductOption within a ProductOptionGroup */
     updateProductOption: ProductOption;
-    reindex: JobInfo;
+    reindex: Job;
     /** Create a new Product */
     createProduct: Product;
     /** Update an existing Product */
@@ -2695,8 +2725,9 @@ export type Query = {
     facets: FacetList;
     facet?: Maybe<Facet>;
     globalSettings: GlobalSettings;
-    job?: Maybe<JobInfo>;
-    jobs: Array<JobInfo>;
+    job?: Maybe<Job>;
+    jobs: JobList;
+    jobsById: Array<Job>;
     order?: Maybe<Order>;
     orders: OrderList;
     paymentMethods: PaymentMethodList;
@@ -2784,11 +2815,15 @@ export type QueryFacetArgs = {
 };
 
 export type QueryJobArgs = {
-    jobId: Scalars['String'];
+    jobId: Scalars['ID'];
 };
 
 export type QueryJobsArgs = {
-    input?: Maybe<JobListInput>;
+    input?: Maybe<JobListOptions>;
+};
+
+export type QueryJobsByIdArgs = {
+    jobIds: Array<Scalars['ID']>;
 };
 
 export type QueryOrderArgs = {

+ 50 - 14
packages/common/src/generated-types.ts

@@ -1258,28 +1258,58 @@ export type IntCustomFieldConfig = CustomField & {
   step?: Maybe<Scalars['Int']>;
 };
 
-export type JobInfo = {
-   __typename?: 'JobInfo';
-  id: Scalars['String'];
+export type Job = Node & {
+   __typename?: 'Job';
+  id: Scalars['ID'];
   name: Scalars['String'];
   state: JobState;
   progress: Scalars['Float'];
   metadata?: Maybe<Scalars['JSON']>;
   result?: Maybe<Scalars['JSON']>;
-  started?: Maybe<Scalars['DateTime']>;
-  ended?: Maybe<Scalars['DateTime']>;
-  duration?: Maybe<Scalars['Int']>;
+  error?: Maybe<Scalars['JSON']>;
+  started: Scalars['DateTime'];
+  settled?: Maybe<Scalars['DateTime']>;
+  isSettled: Scalars['Boolean'];
+  duration: Scalars['Int'];
 };
 
-export type JobListInput = {
-  state?: Maybe<JobState>;
-  ids?: Maybe<Array<Scalars['String']>>;
+export type JobFilterParameter = {
+  name?: Maybe<StringOperators>;
+  state?: Maybe<StringOperators>;
+  progress?: Maybe<NumberOperators>;
+  started?: Maybe<DateOperators>;
+  settled?: Maybe<DateOperators>;
+  isSettled?: Maybe<BooleanOperators>;
+  duration?: Maybe<NumberOperators>;
+};
+
+export type JobList = PaginatedList & {
+   __typename?: 'JobList';
+  items: Array<Job>;
+  totalItems: Scalars['Int'];
+};
+
+export type JobListOptions = {
+  skip?: Maybe<Scalars['Int']>;
+  take?: Maybe<Scalars['Int']>;
+  sort?: Maybe<JobSortParameter>;
+  filter?: Maybe<JobFilterParameter>;
+};
+
+export type JobSortParameter = {
+  id?: Maybe<SortOrder>;
+  name?: Maybe<SortOrder>;
+  progress?: Maybe<SortOrder>;
+  started?: Maybe<SortOrder>;
+  settled?: Maybe<SortOrder>;
+  duration?: Maybe<SortOrder>;
 };
 
 export enum JobState {
   PENDING = 'PENDING',
   RUNNING = 'RUNNING',
   COMPLETED = 'COMPLETED',
+  RETRYING = 'RETRYING',
   FAILED = 'FAILED'
 }
 
@@ -1773,7 +1803,7 @@ export type Mutation = {
   createProductOption: ProductOption;
   /** Create a new ProductOption within a ProductOptionGroup */
   updateProductOption: ProductOption;
-  reindex: JobInfo;
+  reindex: Job;
   /** Create a new Product */
   createProduct: Product;
   /** Update an existing Product */
@@ -2769,8 +2799,9 @@ export type Query = {
   facets: FacetList;
   facet?: Maybe<Facet>;
   globalSettings: GlobalSettings;
-  job?: Maybe<JobInfo>;
-  jobs: Array<JobInfo>;
+  job?: Maybe<Job>;
+  jobs: JobList;
+  jobsById: Array<Job>;
   order?: Maybe<Order>;
   orders: OrderList;
   paymentMethods: PaymentMethodList;
@@ -2873,12 +2904,17 @@ export type QueryFacetArgs = {
 
 
 export type QueryJobArgs = {
-  jobId: Scalars['String'];
+  jobId: Scalars['ID'];
 };
 
 
 export type QueryJobsArgs = {
-  input?: Maybe<JobListInput>;
+  input?: Maybe<JobListOptions>;
+};
+
+
+export type QueryJobsByIdArgs = {
+  jobIds: Array<Scalars['ID']>;
 };
 
 

+ 19 - 2
packages/common/src/shared-types.ts

@@ -13,7 +13,7 @@ export type DeepPartial<T> = {
               ? Array<DeepPartial<U>>
               : T[P] extends ReadonlyArray<infer U>
               ? ReadonlyArray<DeepPartial<U>>
-              : DeepPartial<T[P]>)
+              : DeepPartial<T[P]>);
 };
 // tslint:enable:no-shadowed-variable
 
@@ -26,7 +26,7 @@ export type DeepRequired<T, U extends object | undefined = undefined> = T extend
     ? {
           [P in keyof T]-?: NonNullable<T[P]> extends NonNullable<U | Function | Type<any>>
               ? NonNullable<T[P]>
-              : DeepRequired<NonNullable<T[P]>, U>
+              : DeepRequired<NonNullable<T[P]>, U>;
       }
     : T;
 // tslint:enable:ban-types
@@ -39,6 +39,23 @@ export interface Type<T> extends Function {
     new (...args: any[]): T;
 }
 
+export type Json = null | boolean | number | string | Json[] | { [prop: string]: Json };
+
+/**
+ * @description
+ * A type representing JSON-compatible values.
+ * From https://github.com/microsoft/TypeScript/issues/1897#issuecomment-580962081
+ *
+ * @docsCategory common
+ */
+export type JsonCompatible<T> = {
+    [P in keyof T]: T[P] extends Json
+        ? T[P]
+        : Pick<T, P> extends Required<Pick<T, P>>
+        ? never
+        : JsonCompatible<T[P]>;
+};
+
 /**
  * A type describing the shape of a paginated list response
  */

+ 28 - 27
packages/core/e2e/collection.e2e-spec.ts

@@ -224,7 +224,7 @@ describe('Collection resolver', () => {
                 },
             });
 
-            expect(updateCollection.assets.map(a => a.id)).toEqual([assets[3].id, assets[0].id]);
+            expect(updateCollection.assets.map((a) => a.id)).toEqual([assets[3].id, assets[0].id]);
         });
 
         it('removes all assets', async () => {
@@ -342,7 +342,7 @@ describe('Collection resolver', () => {
             expect(result.moveCollection.parent!.id).toBe(electronicsCollection.id);
 
             const positions = await getChildrenOf(electronicsCollection.id);
-            expect(positions.map(i => i.id)).toEqual([pearCollection.id, computersCollection.id]);
+            expect(positions.map((i) => i.id)).toEqual([pearCollection.id, computersCollection.id]);
         });
 
         it('re-evaluates Collection contents on move', async () => {
@@ -350,7 +350,7 @@ describe('Collection resolver', () => {
                 GetCollectionProducts.Query,
                 GetCollectionProducts.Variables
             >(GET_COLLECTION_PRODUCT_VARIANTS, { id: pearCollection.id });
-            expect(result.collection!.productVariants.items.map(i => i.name)).toEqual([
+            expect(result.collection!.productVariants.items.map((i) => i.name)).toEqual([
                 'Laptop 13 inch 8GB',
                 'Laptop 15 inch 8GB',
                 'Laptop 13 inch 16GB',
@@ -369,7 +369,7 @@ describe('Collection resolver', () => {
             });
 
             const afterResult = await getChildrenOf(electronicsCollection.id);
-            expect(afterResult.map(i => i.id)).toEqual([computersCollection.id, pearCollection.id]);
+            expect(afterResult.map((i) => i.id)).toEqual([computersCollection.id, pearCollection.id]);
         });
 
         it('alters the position in the current parent 2', async () => {
@@ -382,7 +382,7 @@ describe('Collection resolver', () => {
             });
 
             const afterResult = await getChildrenOf(electronicsCollection.id);
-            expect(afterResult.map(i => i.id)).toEqual([pearCollection.id, computersCollection.id]);
+            expect(afterResult.map((i) => i.id)).toEqual([pearCollection.id, computersCollection.id]);
         });
 
         it('corrects an out-of-bounds negative index value', async () => {
@@ -395,7 +395,7 @@ describe('Collection resolver', () => {
             });
 
             const afterResult = await getChildrenOf(electronicsCollection.id);
-            expect(afterResult.map(i => i.id)).toEqual([pearCollection.id, computersCollection.id]);
+            expect(afterResult.map((i) => i.id)).toEqual([pearCollection.id, computersCollection.id]);
         });
 
         it('corrects an out-of-bounds positive index value', async () => {
@@ -408,7 +408,7 @@ describe('Collection resolver', () => {
             });
 
             const afterResult = await getChildrenOf(electronicsCollection.id);
-            expect(afterResult.map(i => i.id)).toEqual([computersCollection.id, pearCollection.id]);
+            expect(afterResult.map((i) => i.id)).toEqual([computersCollection.id, pearCollection.id]);
         });
 
         it(
@@ -443,7 +443,7 @@ describe('Collection resolver', () => {
 
         async function getChildrenOf(parentId: string): Promise<Array<{ name: string; id: string }>> {
             const result = await adminClient.query<GetCollections.Query>(GET_COLLECTIONS);
-            return result.collections.items.filter(i => i.parent!.id === parentId);
+            return result.collections.items.filter((i) => i.parent!.id === parentId);
         }
     });
 
@@ -495,6 +495,7 @@ describe('Collection resolver', () => {
                 },
             );
             collectionToDeleteChild = result2.createCollection;
+            await awaitRunningJobs(adminClient);
         });
 
         it(
@@ -610,7 +611,7 @@ describe('Collection resolver', () => {
                 >(GET_COLLECTION_PRODUCT_VARIANTS, {
                     id: electronicsCollection.id,
                 });
-                expect(result.collection!.productVariants.items.map(i => i.name)).toEqual([
+                expect(result.collection!.productVariants.items.map((i) => i.name)).toEqual([
                     'Laptop 13 inch 8GB',
                     'Laptop 15 inch 8GB',
                     'Laptop 13 inch 16GB',
@@ -642,7 +643,7 @@ describe('Collection resolver', () => {
                 >(GET_COLLECTION_PRODUCT_VARIANTS, {
                     id: computersCollection.id,
                 });
-                expect(result.collection!.productVariants.items.map(i => i.name)).toEqual([
+                expect(result.collection!.productVariants.items.map((i) => i.name)).toEqual([
                     'Laptop 13 inch 8GB',
                     'Laptop 15 inch 8GB',
                     'Laptop 13 inch 16GB',
@@ -702,7 +703,7 @@ describe('Collection resolver', () => {
                     },
                 );
 
-                expect(collection!.productVariants.items.map(i => i.name)).toEqual(['Instant Camera']);
+                expect(collection!.productVariants.items.map((i) => i.name)).toEqual(['Instant Camera']);
             });
 
             it('photo OR pear', async () => {
@@ -744,7 +745,7 @@ describe('Collection resolver', () => {
                     },
                 );
 
-                expect(collection!.productVariants.items.map(i => i.name)).toEqual([
+                expect(collection!.productVariants.items.map((i) => i.name)).toEqual([
                     'Laptop 13 inch 8GB',
                     'Laptop 15 inch 8GB',
                     'Laptop 13 inch 16GB',
@@ -799,7 +800,7 @@ describe('Collection resolver', () => {
                     },
                 );
 
-                expect(collection!.productVariants.items.map(i => i.name)).toEqual([
+                expect(collection!.productVariants.items.map((i) => i.name)).toEqual([
                     'Laptop 13 inch 8GB',
                     'Laptop 15 inch 8GB',
                     'Laptop 13 inch 16GB',
@@ -855,7 +856,7 @@ describe('Collection resolver', () => {
                 >(GET_COLLECTION_PRODUCT_VARIANTS, {
                     id: collection.id,
                 });
-                expect(result.collection!.productVariants.items.map(i => i.name)).toEqual([
+                expect(result.collection!.productVariants.items.map((i) => i.name)).toEqual([
                     'Instant Camera',
                     'Camera Lens',
                     'SLR Camera',
@@ -871,7 +872,7 @@ describe('Collection resolver', () => {
                 >(GET_COLLECTION_PRODUCT_VARIANTS, {
                     id: collection.id,
                 });
-                expect(result.collection!.productVariants.items.map(i => i.name)).toEqual(['Camera Lens']);
+                expect(result.collection!.productVariants.items.map((i) => i.name)).toEqual(['Camera Lens']);
             });
 
             it('endsWith operator', async () => {
@@ -883,7 +884,7 @@ describe('Collection resolver', () => {
                 >(GET_COLLECTION_PRODUCT_VARIANTS, {
                     id: collection.id,
                 });
-                expect(result.collection!.productVariants.items.map(i => i.name)).toEqual([
+                expect(result.collection!.productVariants.items.map((i) => i.name)).toEqual([
                     'Instant Camera',
                     'SLR Camera',
                 ]);
@@ -898,7 +899,7 @@ describe('Collection resolver', () => {
                 >(GET_COLLECTION_PRODUCT_VARIANTS, {
                     id: collection.id,
                 });
-                expect(result.collection!.productVariants.items.map(i => i.name)).toEqual([
+                expect(result.collection!.productVariants.items.map((i) => i.name)).toEqual([
                     'Laptop 13 inch 8GB',
                     'Laptop 15 inch 8GB',
                     'Laptop 13 inch 16GB',
@@ -961,7 +962,7 @@ describe('Collection resolver', () => {
                     GetCollectionProducts.Query,
                     GetCollectionProducts.Variables
                 >(GET_COLLECTION_PRODUCT_VARIANTS, { id: pearCollection.id });
-                expect(result.collection!.productVariants.items.map(i => i.name)).toEqual([
+                expect(result.collection!.productVariants.items.map((i) => i.name)).toEqual([
                     'Laptop 13 inch 8GB',
                     'Laptop 15 inch 8GB',
                     'Laptop 13 inch 16GB',
@@ -974,8 +975,8 @@ describe('Collection resolver', () => {
 
             it('updates contents when ProductVariant is updated', async () => {
                 const gamingPc240GB = products
-                    .find(p => p.name === 'Gaming PC')!
-                    .variants.find(v => v.name.includes('240GB'))!;
+                    .find((p) => p.name === 'Gaming PC')!
+                    .variants.find((v) => v.name.includes('240GB'))!;
                 await adminClient.query<UpdateProductVariants.Mutation, UpdateProductVariants.Variables>(
                     UPDATE_PRODUCT_VARIANTS,
                     {
@@ -994,7 +995,7 @@ describe('Collection resolver', () => {
                     GetCollectionProducts.Query,
                     GetCollectionProducts.Variables
                 >(GET_COLLECTION_PRODUCT_VARIANTS, { id: pearCollection.id });
-                expect(result.collection!.productVariants.items.map(i => i.name)).toEqual([
+                expect(result.collection!.productVariants.items.map((i) => i.name)).toEqual([
                     'Laptop 13 inch 8GB',
                     'Laptop 15 inch 8GB',
                     'Laptop 13 inch 16GB',
@@ -1008,8 +1009,8 @@ describe('Collection resolver', () => {
 
             it('correctly filters when ProductVariant and Product both have matching FacetValue', async () => {
                 const gamingPc240GB = products
-                    .find(p => p.name === 'Gaming PC')!
-                    .variants.find(v => v.name.includes('240GB'))!;
+                    .find((p) => p.name === 'Gaming PC')!
+                    .variants.find((v) => v.name.includes('240GB'))!;
                 await adminClient.query<UpdateProductVariants.Mutation, UpdateProductVariants.Variables>(
                     UPDATE_PRODUCT_VARIANTS,
                     {
@@ -1028,7 +1029,7 @@ describe('Collection resolver', () => {
                     GetCollectionProducts.Query,
                     GetCollectionProducts.Variables
                 >(GET_COLLECTION_PRODUCT_VARIANTS, { id: pearCollection.id });
-                expect(result.collection!.productVariants.items.map(i => i.name)).toEqual([
+                expect(result.collection!.productVariants.items.map((i) => i.name)).toEqual([
                     'Laptop 13 inch 8GB',
                     'Laptop 15 inch 8GB',
                     'Laptop 13 inch 16GB',
@@ -1078,7 +1079,7 @@ describe('Collection resolver', () => {
                 GetCollectionProducts.Query,
                 GetCollectionProducts.Variables
             >(GET_COLLECTION_PRODUCT_VARIANTS, { id: pearElectronics.id });
-            expect(result.collection!.productVariants.items.map(i => i.name)).toEqual([
+            expect(result.collection!.productVariants.items.map((i) => i.name)).toEqual([
                 'Laptop 13 inch 8GB',
                 'Laptop 15 inch 8GB',
                 'Laptop 13 inch 16GB',
@@ -1120,7 +1121,7 @@ describe('Collection resolver', () => {
         >(GET_COLLECTION_PRODUCT_VARIANTS, {
             id: pearCollection.id,
         });
-        expect(collection!.productVariants.items.map(i => i.name)).toEqual([
+        expect(collection!.productVariants.items.map((i) => i.name)).toEqual([
             'Laptop 13 inch 8GB',
             'Laptop 15 inch 8GB',
             'Laptop 13 inch 16GB',
@@ -1131,7 +1132,7 @@ describe('Collection resolver', () => {
     });
 
     function getFacetValueId(code: string): string {
-        const match = facetValues.find(fv => fv.code === code);
+        const match = facetValues.find((fv) => fv.code === code);
         if (!match) {
             throw new Error(`Could not find a FacetValue with the code "${code}"`);
         }

+ 54 - 16
packages/core/e2e/graphql/generated-e2e-admin-types.ts

@@ -1262,28 +1262,58 @@ export type IntCustomFieldConfig = CustomField & {
     step?: Maybe<Scalars['Int']>;
 };
 
-export type JobInfo = {
-    __typename?: 'JobInfo';
-    id: Scalars['String'];
+export type Job = Node & {
+    __typename?: 'Job';
+    id: Scalars['ID'];
     name: Scalars['String'];
     state: JobState;
     progress: Scalars['Float'];
     metadata?: Maybe<Scalars['JSON']>;
     result?: Maybe<Scalars['JSON']>;
-    started?: Maybe<Scalars['DateTime']>;
-    ended?: Maybe<Scalars['DateTime']>;
-    duration?: Maybe<Scalars['Int']>;
+    error?: Maybe<Scalars['JSON']>;
+    started: Scalars['DateTime'];
+    settled?: Maybe<Scalars['DateTime']>;
+    isSettled: Scalars['Boolean'];
+    duration: Scalars['Int'];
 };
 
-export type JobListInput = {
-    state?: Maybe<JobState>;
-    ids?: Maybe<Array<Scalars['String']>>;
+export type JobFilterParameter = {
+    name?: Maybe<StringOperators>;
+    state?: Maybe<StringOperators>;
+    progress?: Maybe<NumberOperators>;
+    started?: Maybe<DateOperators>;
+    settled?: Maybe<DateOperators>;
+    isSettled?: Maybe<BooleanOperators>;
+    duration?: Maybe<NumberOperators>;
+};
+
+export type JobList = PaginatedList & {
+    __typename?: 'JobList';
+    items: Array<Job>;
+    totalItems: Scalars['Int'];
+};
+
+export type JobListOptions = {
+    skip?: Maybe<Scalars['Int']>;
+    take?: Maybe<Scalars['Int']>;
+    sort?: Maybe<JobSortParameter>;
+    filter?: Maybe<JobFilterParameter>;
+};
+
+export type JobSortParameter = {
+    id?: Maybe<SortOrder>;
+    name?: Maybe<SortOrder>;
+    progress?: Maybe<SortOrder>;
+    started?: Maybe<SortOrder>;
+    settled?: Maybe<SortOrder>;
+    duration?: Maybe<SortOrder>;
 };
 
 export enum JobState {
     PENDING = 'PENDING',
     RUNNING = 'RUNNING',
     COMPLETED = 'COMPLETED',
+    RETRYING = 'RETRYING',
     FAILED = 'FAILED',
 }
 
@@ -1776,7 +1806,7 @@ export type Mutation = {
     createProductOption: ProductOption;
     /** Create a new ProductOption within a ProductOptionGroup */
     updateProductOption: ProductOption;
-    reindex: JobInfo;
+    reindex: Job;
     /** Create a new Product */
     createProduct: Product;
     /** Update an existing Product */
@@ -2695,8 +2725,9 @@ export type Query = {
     facets: FacetList;
     facet?: Maybe<Facet>;
     globalSettings: GlobalSettings;
-    job?: Maybe<JobInfo>;
-    jobs: Array<JobInfo>;
+    job?: Maybe<Job>;
+    jobs: JobList;
+    jobsById: Array<Job>;
     order?: Maybe<Order>;
     orders: OrderList;
     paymentMethods: PaymentMethodList;
@@ -2784,11 +2815,15 @@ export type QueryFacetArgs = {
 };
 
 export type QueryJobArgs = {
-    jobId: Scalars['String'];
+    jobId: Scalars['ID'];
 };
 
 export type QueryJobsArgs = {
-    input?: Maybe<JobListInput>;
+    input?: Maybe<JobListOptions>;
+};
+
+export type QueryJobsByIdArgs = {
+    jobIds: Array<Scalars['ID']>;
 };
 
 export type QueryOrderArgs = {
@@ -4541,7 +4576,9 @@ export type GetStockMovementQuery = { __typename?: 'Query' } & {
 export type GetRunningJobsQueryVariables = {};
 
 export type GetRunningJobsQuery = { __typename?: 'Query' } & {
-    jobs: Array<{ __typename?: 'JobInfo' } & Pick<JobInfo, 'name' | 'state'>>;
+    jobs: { __typename?: 'JobList' } & {
+        items: Array<{ __typename?: 'Job' } & Pick<Job, 'id' | 'name' | 'state' | 'isSettled' | 'duration'>>;
+    };
 };
 
 export type CreatePromotionMutationVariables = {
@@ -6053,7 +6090,8 @@ export namespace GetStockMovement {
 export namespace GetRunningJobs {
     export type Variables = GetRunningJobsQueryVariables;
     export type Query = GetRunningJobsQuery;
-    export type Jobs = NonNullable<GetRunningJobsQuery['jobs'][0]>;
+    export type Jobs = GetRunningJobsQuery['jobs'];
+    export type Items = NonNullable<GetRunningJobsQuery['jobs']['items'][0]>;
 }
 
 export namespace CreatePromotion {

+ 7 - 2
packages/core/e2e/graphql/shared-definitions.ts

@@ -270,8 +270,13 @@ export const GET_STOCK_MOVEMENT = gql`
 export const GET_RUNNING_JOBS = gql`
     query GetRunningJobs {
         jobs {
-            name
-            state
+            items {
+                id
+                name
+                state
+                isSettled
+                duration
+            }
         }
     }
 `;

+ 2 - 2
packages/core/e2e/utils/await-running-jobs.ts

@@ -13,10 +13,10 @@ export async function awaitRunningJobs(adminClient: SimpleGraphQLClient, timeout
     let timedOut = false;
     // Allow a brief period for the jobs to start in the case that
     // e.g. event debouncing is used before triggering the job.
-    await new Promise(resolve => setTimeout(resolve, 100));
+    await new Promise((resolve) => setTimeout(resolve, 100));
     do {
         const { jobs } = await adminClient.query<GetRunningJobs.Query>(GET_RUNNING_JOBS);
-        runningJobs = jobs.filter(job => job.state !== JobState.COMPLETED).length;
+        runningJobs = jobs.items.filter((job) => !job.isSettled).length;
         timedOut = timeout < +new Date() - startTime;
     } while (runningJobs > 0 && !timedOut);
 }

+ 2 - 0
packages/core/src/api/api-internal-modules.ts

@@ -2,6 +2,7 @@ import { Module } from '@nestjs/common';
 
 import { ConfigModule } from '../config/config.module';
 import { DataImportModule } from '../data-import/data-import.module';
+import { JobQueueModule } from '../job-queue/job-queue.module';
 import { createDynamicGraphQlModulesForPlugins } from '../plugin/dynamic-plugin-api.module';
 import { ServiceModule } from '../service/service.module';
 
@@ -119,6 +120,7 @@ export class ApiSharedModule {}
 @Module({
     imports: [
         ApiSharedModule,
+        JobQueueModule,
         ServiceModule.forRoot(),
         DataImportModule,
         ...createDynamicGraphQlModulesForPlugins('admin'),

+ 8 - 9
packages/core/src/api/common/request-context.ts

@@ -1,6 +1,6 @@
 import { LanguageCode } from '@vendure/common/lib/generated-types';
-import { ID, Type } from '@vendure/common/lib/shared-types';
-import i18next, { TFunction } from 'i18next';
+import { ID, JsonCompatible } from '@vendure/common/lib/shared-types';
+import { TFunction } from 'i18next';
 
 import { DEFAULT_LANGUAGE_CODE } from '../../common/constants';
 import { Channel } from '../../entity/channel/channel.entity';
@@ -8,21 +8,20 @@ import { AnonymousSession } from '../../entity/session/anonymous-session.entity'
 import { AuthenticatedSession } from '../../entity/session/authenticated-session.entity';
 import { Session } from '../../entity/session/session.entity';
 import { User } from '../../entity/user/user.entity';
+import { JobData } from '../../job-queue/types';
 
 import { ApiType } from './get-api-type';
 
-export type ObjectOf<T> = { [K in keyof T]: T[K] };
-
-export interface SerializedRequestContext {
-    _session?: ObjectOf<Session> & {
-        user?: ObjectOf<User>;
+export type SerializedRequestContext = {
+    _session: JsonCompatible<Session> & {
+        user: JsonCompatible<User>;
     };
     _apiType: ApiType;
-    _channel: ObjectOf<Channel>;
+    _channel: JsonCompatible<Channel>;
     _languageCode: LanguageCode;
     _isAuthorized: boolean;
     _authorizedAsOwnerOnly: boolean;
-}
+};
 
 /**
  * @description

+ 16 - 5
packages/core/src/api/resolvers/admin/job.resolver.ts

@@ -1,22 +1,33 @@
 import { Args, Query, Resolver } from '@nestjs/graphql';
-import { Permission, QueryJobArgs, QueryJobsArgs } from '@vendure/common/lib/generated-types';
+import {
+    Permission,
+    QueryJobArgs,
+    QueryJobsArgs,
+    QueryJobsByIdArgs,
+} from '@vendure/common/lib/generated-types';
 
-import { JobService } from '../../../service/services/job.service';
+import { JobQueueService } from '../../../job-queue/job-queue.service';
 import { Allow } from '../../decorators/allow.decorator';
 
 @Resolver()
 export class JobResolver {
-    constructor(private jobService: JobService) {}
+    constructor(private jobService: JobQueueService) {}
 
     @Query()
     @Allow(Permission.Authenticated)
     job(@Args() args: QueryJobArgs) {
-        return this.jobService.getOne(args.jobId);
+        return this.jobService.getJob(args.jobId);
     }
 
     @Query()
     @Allow(Permission.Authenticated)
     jobs(@Args() args: QueryJobsArgs) {
-        return this.jobService.getAll(args.input || undefined);
+        return this.jobService.getJobs(args.input || undefined);
+    }
+
+    @Query()
+    @Allow(Permission.Authenticated)
+    jobsById(@Args() args: QueryJobsByIdArgs) {
+        return this.jobService.getJobsById(args.jobIds || undefined);
     }
 }

+ 16 - 10
packages/core/src/api/schema/admin-api/job.api.graphql

@@ -1,28 +1,34 @@
 type Query {
-    job(jobId: String!): JobInfo
-    jobs(input: JobListInput): [JobInfo!]!
+    job(jobId: ID!): Job
+    jobs(input: JobListOptions): JobList!
+    jobsById(jobIds: [ID!]!): [Job!]!
 }
 
 enum JobState {
     PENDING
     RUNNING
     COMPLETED
+    RETRYING
     FAILED
 }
 
-input JobListInput {
-    state: JobState
-    ids: [String!]
+input JobListOptions
+
+type JobList implements PaginatedList {
+    items: [Job!]!
+    totalItems: Int!
 }
 
-type JobInfo {
-    id: String!
+type Job implements Node {
+    id: ID!
     name: String!
     state: JobState!
     progress: Float!
     metadata: JSON
     result: JSON
-    started: DateTime
-    ended: DateTime
-    duration: Int
+    error: JSON
+    started: DateTime!
+    settled: DateTime
+    isSettled: Boolean!
+    duration: Int!
 }

+ 1 - 1
packages/core/src/api/schema/admin-api/product-search.api.graphql

@@ -3,7 +3,7 @@ type Query {
 }
 
 type Mutation {
-    reindex: JobInfo!
+    reindex: Job!
 }
 
 type SearchResult {

+ 1 - 0
packages/core/src/config/config.service.mock.ts

@@ -38,6 +38,7 @@ export class MockConfigService implements MockClass<ConfigService> {
     logger = {} as any;
     apolloServerPlugins = [];
     plugins = [];
+    jobQueueOptions = {};
 }
 
 export const ENCODED = 'encoded';

+ 5 - 0
packages/core/src/config/config.service.ts

@@ -13,6 +13,7 @@ import {
     AssetOptions,
     AuthOptions,
     ImportExportOptions,
+    JobQueueOptions,
     OrderOptions,
     PaymentOptions,
     PromotionOptions,
@@ -126,4 +127,8 @@ export class ConfigService implements VendureConfig {
     get workerOptions(): WorkerOptions {
         return this.activeConfig.workerOptions;
     }
+
+    get jobQueueOptions(): Required<JobQueueOptions> {
+        return this.activeConfig.jobQueueOptions;
+    }
 }

+ 6 - 0
packages/core/src/config/default-config.ts

@@ -3,6 +3,8 @@ import { LanguageCode } from '@vendure/common/lib/generated-types';
 import { DEFAULT_AUTH_TOKEN_HEADER_KEY } from '@vendure/common/lib/shared-constants';
 
 import { generatePublicId } from '../common/generate-public-id';
+import { SqlJobQueueStrategy } from '../job-queue/sql-job-queue-strategy';
+import { TestingJobQueueStrategy } from '../job-queue/testing-job-queue-strategy';
 
 import { DefaultAssetNamingStrategy } from './asset-naming-strategy/default-asset-naming-strategy';
 import { NoAssetPreviewStrategy } from './asset-preview-strategy/no-asset-preview-strategy';
@@ -92,6 +94,10 @@ export const defaultConfig: RuntimeVendureConfig = {
             port: 3020,
         },
     },
+    jobQueueOptions: {
+        jobQueueStrategy: new TestingJobQueueStrategy(),
+        pollInterval: 100,
+    },
     customFields: {
         Address: [],
         Collection: [],

+ 24 - 0
packages/core/src/config/job-queue/job-queue-strategy.ts

@@ -0,0 +1,24 @@
+import { ModuleRef } from '@nestjs/core';
+import { PaginatedList } from '@vendure/common/lib/shared-types';
+import { JobListOptions } from '@vendure/common/src/generated-types';
+import { Connection } from 'typeorm';
+
+import { Job } from '../../job-queue/job';
+
+/**
+ * @description
+ * Defines how the jobs in the {@link JobQueueService} are persisted and
+ * accessed. Custom strategies can be defined to make use of external
+ * services such as Redis.
+ *
+ * @docsCateogry JobQueue
+ */
+export interface JobQueueStrategy {
+    init?(moduleRef: ModuleRef): void | Promise<void>;
+    add(job: Job): Promise<Job>;
+    next(queueName: string): Promise<Job | undefined>;
+    update(job: Job): Promise<void>;
+    findOne(id: string): Promise<Job | undefined>;
+    findMany(options?: JobListOptions): Promise<PaginatedList<Job>>;
+    findManyById(ids: string[]): Promise<Job[]>;
+}

+ 27 - 0
packages/core/src/config/vendure-config.ts

@@ -17,6 +17,7 @@ import { AssetPreviewStrategy } from './asset-preview-strategy/asset-preview-str
 import { AssetStorageStrategy } from './asset-storage-strategy/asset-storage-strategy';
 import { CustomFields } from './custom-field/custom-field-types';
 import { EntityIdStrategy } from './entity-id-strategy/entity-id-strategy';
+import { JobQueueStrategy } from './job-queue/job-queue-strategy';
 import { VendureLogger } from './logger/vendure-logger';
 import { OrderMergeStrategy } from './order-merge-strategy/order-merge-strategy';
 import { PaymentMethodHandler } from './payment-method/payment-method-handler';
@@ -380,6 +381,26 @@ export interface WorkerOptions {
     options?: ClientOptions['options'];
 }
 
+/**
+ * @description
+ * Options related to the built-in job queue.
+ */
+export interface JobQueueOptions {
+    /**
+     * @description
+     * Defines how the jobs in the queue are persisted and accessed.
+     */
+    jobQueueStrategy?: JobQueueStrategy;
+    /**
+     * @description
+     * Defines the interval in ms used by the JobService to poll for new
+     * jobs in the queue to process.
+     *
+     * @default 100
+     */
+    pollInterval?: number;
+}
+
 /**
  * @description
  * All possible configuration options are defined by the
@@ -546,6 +567,11 @@ export interface VendureConfig {
      * Configures the Vendure Worker, which is used for long-running background tasks.
      */
     workerOptions?: WorkerOptions;
+    /**
+     * @description
+     * Configures how the job queue is persisted and processed.
+     */
+    jobQueueOptions?: JobQueueOptions;
 }
 
 /**
@@ -562,6 +588,7 @@ export interface RuntimeVendureConfig extends Required<VendureConfig> {
     importExportOptions: Required<ImportExportOptions>;
     orderOptions: Required<OrderOptions>;
     workerOptions: Required<WorkerOptions>;
+    jobQueueOptions: Required<JobQueueOptions>;
 }
 
 type DeepPartialSimple<T> = {

+ 1 - 0
packages/core/src/index.ts

@@ -4,6 +4,7 @@ export * from './api/index';
 export * from './common/index';
 export * from './config/index';
 export * from './event-bus/index';
+export * from './job-queue/index';
 export * from './plugin/index';
 export * from './entity/index';
 export * from './data-import/index';

+ 4 - 0
packages/core/src/job-queue/index.ts

@@ -0,0 +1,4 @@
+export * from './job';
+export * from './job-queue';
+export * from './job-queue.service';
+export * from './types';

+ 12 - 0
packages/core/src/job-queue/job-queue.module.ts

@@ -0,0 +1,12 @@
+import { Module } from '@nestjs/common';
+
+import { ConfigModule } from '../config/config.module';
+
+import { JobQueueService } from './job-queue.service';
+
+@Module({
+    imports: [ConfigModule],
+    providers: [JobQueueService],
+    exports: [JobQueueService],
+})
+export class JobQueueModule {}

+ 285 - 0
packages/core/src/job-queue/job-queue.service.spec.ts

@@ -0,0 +1,285 @@
+import { Test, TestingModule } from '@nestjs/testing';
+import { JobState } from '@vendure/common/lib/generated-types';
+import { Subject } from 'rxjs';
+
+import { ConfigService } from '../config/config.service';
+
+import { Job } from './job';
+import { JobQueueService } from './job-queue.service';
+import { TestingJobQueueStrategy } from './testing-job-queue-strategy';
+
+const queuePollInterval = 10;
+
+describe('JobQueueService', () => {
+    let jobQueueService: JobQueueService;
+    let module: TestingModule;
+
+    beforeEach(async () => {
+        module = await Test.createTestingModule({
+            providers: [{ provide: ConfigService, useClass: MockConfigService }, JobQueueService],
+        }).compile();
+
+        jobQueueService = module.get(JobQueueService);
+        await module.init();
+    });
+
+    afterEach(async () => {
+        await module.close();
+    });
+
+    it('data is passed into job', (cb) => {
+        const testQueue = jobQueueService.createQueue<string>({
+            name: 'test',
+            concurrency: 1,
+            process: (job) => {
+                job.complete();
+                expect(job.data).toBe('hello');
+                cb();
+            },
+        });
+
+        testQueue.add('hello');
+    });
+
+    it('job marked as complete', async () => {
+        const subject = new Subject();
+        const testQueue = jobQueueService.createQueue<string>({
+            name: 'test',
+            concurrency: 1,
+            process: (job) => {
+                subject.subscribe(() => {
+                    job.complete('yay');
+                });
+            },
+        });
+
+        const testJob = await testQueue.add('hello');
+        expect(testJob.state).toBe(JobState.PENDING);
+
+        await tick(queuePollInterval);
+        expect(testJob.state).toBe(JobState.RUNNING);
+
+        subject.next();
+        expect(testJob.state).toBe(JobState.COMPLETED);
+        expect(testJob.result).toBe('yay');
+
+        subject.complete();
+    });
+
+    it('job marked as failed when .fail() called', async () => {
+        const subject = new Subject();
+        const testQueue = jobQueueService.createQueue<string>({
+            name: 'test',
+            concurrency: 1,
+            process: (job) => {
+                subject.subscribe(() => {
+                    job.fail('uh oh');
+                });
+            },
+        });
+
+        const testJob = await testQueue.add('hello');
+        expect(testJob.state).toBe(JobState.PENDING);
+
+        await tick(queuePollInterval);
+        expect(testJob.state).toBe(JobState.RUNNING);
+
+        subject.next();
+        expect(testJob.state).toBe(JobState.FAILED);
+        expect(testJob.error).toBe('uh oh');
+
+        subject.complete();
+    });
+
+    it('job marked as failed when sync error thrown', async () => {
+        const subject = new Subject();
+        const err = new Error('something bad happened');
+        const testQueue = jobQueueService.createQueue<string>({
+            name: 'test',
+            concurrency: 1,
+            process: (job) => {
+                throw err;
+            },
+        });
+
+        const testJob = await testQueue.add('hello');
+        expect(testJob.state).toBe(JobState.PENDING);
+
+        await tick(queuePollInterval);
+        expect(testJob.state).toBe(JobState.FAILED);
+        expect(testJob.error).toBe(err.toString());
+
+        subject.complete();
+    });
+
+    it('job marked as failed when async error thrown', async () => {
+        const subject = new Subject();
+        const err = new Error('something bad happened');
+        const testQueue = jobQueueService.createQueue<string>({
+            name: 'test',
+            concurrency: 1,
+            process: async (job) => {
+                throw err;
+            },
+        });
+
+        const testJob = await testQueue.add('hello');
+        expect(testJob.state).toBe(JobState.PENDING);
+
+        await tick(queuePollInterval);
+        expect(testJob.state).toBe(JobState.FAILED);
+        expect(testJob.error).toBe(err.toString());
+
+        subject.complete();
+    });
+
+    it('jobs processed in FIFO queue', async () => {
+        const subject = new Subject();
+        const testQueue = jobQueueService.createQueue<string>({
+            name: 'test',
+            concurrency: 1,
+            process: (job) => {
+                subject.subscribe(() => {
+                    job.complete();
+                });
+            },
+        });
+
+        const testJob1 = await testQueue.add('1');
+        const testJob2 = await testQueue.add('2');
+        const testJob3 = await testQueue.add('3');
+
+        const getStates = () => [testJob1.state, testJob2.state, testJob3.state];
+
+        await tick(queuePollInterval);
+
+        expect(getStates()).toEqual([JobState.RUNNING, JobState.PENDING, JobState.PENDING]);
+
+        subject.next();
+        expect(getStates()).toEqual([JobState.COMPLETED, JobState.PENDING, JobState.PENDING]);
+
+        await tick(queuePollInterval);
+        expect(getStates()).toEqual([JobState.COMPLETED, JobState.RUNNING, JobState.PENDING]);
+
+        subject.next();
+        expect(getStates()).toEqual([JobState.COMPLETED, JobState.COMPLETED, JobState.PENDING]);
+
+        await tick(queuePollInterval);
+        expect(getStates()).toEqual([JobState.COMPLETED, JobState.COMPLETED, JobState.RUNNING]);
+
+        subject.next();
+        expect(getStates()).toEqual([JobState.COMPLETED, JobState.COMPLETED, JobState.COMPLETED]);
+
+        subject.complete();
+    });
+
+    it('with concurrency', async () => {
+        const subject = new Subject();
+        const testQueue = jobQueueService.createQueue<string>({
+            name: 'test',
+            concurrency: 2,
+            process: (job) => {
+                subject.subscribe(() => {
+                    job.complete();
+                });
+            },
+        });
+
+        const testJob1 = await testQueue.add('1');
+        const testJob2 = await testQueue.add('2');
+        const testJob3 = await testQueue.add('3');
+
+        const getStates = () => [testJob1.state, testJob2.state, testJob3.state];
+
+        await tick(queuePollInterval);
+
+        expect(getStates()).toEqual([JobState.RUNNING, JobState.RUNNING, JobState.PENDING]);
+
+        subject.next();
+        expect(getStates()).toEqual([JobState.COMPLETED, JobState.COMPLETED, JobState.PENDING]);
+
+        await tick(queuePollInterval);
+        expect(getStates()).toEqual([JobState.COMPLETED, JobState.COMPLETED, JobState.RUNNING]);
+
+        subject.next();
+        expect(getStates()).toEqual([JobState.COMPLETED, JobState.COMPLETED, JobState.COMPLETED]);
+
+        subject.complete();
+    });
+
+    it('processes existing jobs on start', async () => {
+        const testingJobQueueStrategy = module.get(ConfigService).jobQueueOptions
+            .jobQueueStrategy as TestingJobQueueStrategy;
+
+        testingJobQueueStrategy.prePopulate([
+            new Job<any>({
+                queueName: 'test',
+                data: {},
+                id: 'job-1',
+            }),
+            new Job<any>({
+                queueName: 'test',
+                data: {},
+                id: 'job-2',
+            }),
+        ]);
+
+        const testQueue = jobQueueService.createQueue<string>({
+            name: 'test',
+            concurrency: 1,
+            process: (job) => {
+                job.complete();
+            },
+        });
+
+        const job1 = await jobQueueService.getJob('job-1');
+        const job2 = await jobQueueService.getJob('job-2');
+        expect(job1?.state).toBe(JobState.COMPLETED);
+        expect(job2?.state).toBe(JobState.PENDING);
+
+        await tick(queuePollInterval);
+        expect(job2?.state).toBe(JobState.COMPLETED);
+    });
+
+    it('retries', async () => {
+        const subject = new Subject<boolean>();
+        const testQueue = jobQueueService.createQueue<string>({
+            name: 'test',
+            concurrency: 1,
+            process: (job) => {
+                subject.subscribe((success) => (success ? job.complete() : job.fail()));
+            },
+        });
+
+        const testJob = await testQueue.add('hello', { retries: 2 });
+
+        await tick(queuePollInterval);
+        expect(testJob.state).toBe(JobState.RUNNING);
+        expect(testJob.isSettled).toBe(false);
+
+        subject.next(false);
+        expect(testJob.state).toBe(JobState.RETRYING);
+        expect(testJob.isSettled).toBe(false);
+
+        await tick(queuePollInterval);
+        subject.next(false);
+        expect(testJob.state).toBe(JobState.RETRYING);
+        expect(testJob.isSettled).toBe(false);
+
+        await tick(queuePollInterval);
+        subject.next(false);
+        expect(testJob.state).toBe(JobState.FAILED);
+        expect(testJob.isSettled).toBe(true);
+    });
+});
+
+function tick(ms: number): Promise<void> {
+    return new Promise<void>((resolve) => setTimeout(resolve, ms));
+}
+
+class MockConfigService {
+    jobQueueOptions = {
+        jobQueueStrategy: new TestingJobQueueStrategy(),
+        pollInterval: queuePollInterval,
+    };
+}

+ 84 - 0
packages/core/src/job-queue/job-queue.service.ts

@@ -0,0 +1,84 @@
+import { Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
+import { ModuleRef } from '@nestjs/core';
+import { JobListOptions } from '@vendure/common/lib/generated-types';
+
+import { PaginatedList } from '../../../common/src/shared-types';
+import { ConfigService } from '../config/config.service';
+import { JobQueueStrategy } from '../config/job-queue/job-queue-strategy';
+
+import { Job } from './job';
+import { JobQueue } from './job-queue';
+import { CreateQueueOptions, JobData } from './types';
+
+/**
+ * @description
+ * The JobQueueService is used to create new {@link JobQueue} instances and access
+ * existing jobs.
+ *
+ * @docsCateogory JobQueue
+ */
+@Injectable()
+export class JobQueueService implements OnModuleInit, OnModuleDestroy {
+    private cleanJobsTimer: NodeJS.Timeout;
+    private queues: Array<JobQueue<any>> = [];
+
+    private get jobQueueStrategy(): JobQueueStrategy {
+        return this.configService.jobQueueOptions.jobQueueStrategy;
+    }
+
+    constructor(private configService: ConfigService, private moduleRef: ModuleRef) {}
+
+    /** @internal */
+    async onModuleInit() {
+        const { jobQueueStrategy } = this.configService.jobQueueOptions;
+        if (typeof jobQueueStrategy.init === 'function') {
+            await jobQueueStrategy.init(this.moduleRef);
+        }
+    }
+
+    /** @internal */
+    onModuleDestroy() {
+        for (const queue of this.queues) {
+            queue.destroy();
+        }
+    }
+
+    /**
+     * @description
+     * Configures and creates a new {@link JobQueue} instance.
+     */
+    createQueue<Data extends JobData<Data>>(options: CreateQueueOptions<Data>): JobQueue<Data> {
+        const { jobQueueStrategy, pollInterval } = this.configService.jobQueueOptions;
+        const queue = new JobQueue(options, jobQueueStrategy, pollInterval);
+        queue.start();
+        this.queues.push(queue);
+        return queue;
+    }
+
+    /**
+     * @description
+     * Gets a job by id. The implementation is handled by the configured
+     * {@link JobQueueStrategy}.
+     */
+    getJob(id: string): Promise<Job | undefined> {
+        return this.jobQueueStrategy.findOne(id);
+    }
+
+    /**
+     * @description
+     * Gets jobs according to the supplied options. The implementation is handled by the configured
+     * {@link JobQueueStrategy}.
+     */
+    getJobs(options?: JobListOptions): Promise<PaginatedList<Job>> {
+        return this.jobQueueStrategy.findMany(options);
+    }
+
+    /**
+     * @description
+     * Gets jobs by ids. The implementation is handled by the configured
+     * {@link JobQueueStrategy}.
+     */
+    getJobsById(ids: string[]): Promise<Job[]> {
+        return this.jobQueueStrategy.findManyById(ids);
+    }
+}

+ 99 - 0
packages/core/src/job-queue/job-queue.ts

@@ -0,0 +1,99 @@
+import { JobState } from '@vendure/common/lib/generated-types';
+
+import { JobQueueStrategy } from '../config/job-queue/job-queue-strategy';
+
+import { Job } from './job';
+import { CreateQueueOptions, JobConfig, JobData } from './types';
+
+/**
+ * @description
+ * A JobQueue is used to process {@link Job}s. A job is added to the queue via the
+ * `.add()` method, and the queue will then poll for new jobs and process each
+ * according to the process
+ *
+ * @docsCateogory JobQueue
+ */
+export class JobQueue<Data extends JobData<Data> = {}> {
+    private activeJobs: Array<Job<Data>> = [];
+    private timer: any;
+    private fooId: number;
+    get concurrency(): number {
+        return this.options.concurrency;
+    }
+
+    get name(): string {
+        return this.options.name;
+    }
+
+    constructor(
+        private options: CreateQueueOptions<Data>,
+        private jobQueueStrategy: JobQueueStrategy,
+        private pollInterval: number,
+    ) {}
+
+    /** @internal */
+    start() {
+        const runNextJobs = async () => {
+            const concurrency = this.options.concurrency;
+            const runningJobsCount = this.activeJobs.length;
+            for (let i = runningJobsCount; i < concurrency; i++) {
+                const nextJob: Job<Data> | undefined = await this.jobQueueStrategy.next(this.options.name);
+                if (nextJob) {
+                    this.activeJobs.push(nextJob);
+                    nextJob.on('complete', (job) => this.onFailOrComplete(job));
+                    nextJob.on('fail', (job) => this.onFailOrComplete(job));
+                    try {
+                        const returnVal = this.options.process(nextJob);
+                        if (returnVal instanceof Promise) {
+                            returnVal.catch((err) => nextJob.fail(err));
+                        }
+                    } catch (err) {
+                        nextJob.fail(err);
+                    }
+                    await this.jobQueueStrategy.update(nextJob);
+                }
+            }
+            this.timer = setTimeout(runNextJobs, this.pollInterval);
+        };
+
+        runNextJobs();
+    }
+
+    /** @internal */
+    pause() {
+        clearTimeout(this.timer);
+    }
+
+    /** @internal */
+    destroy() {
+        clearTimeout(this.timer);
+    }
+
+    /** @internal */
+    _process(job: Job<Data>) {
+        this.options.process(job);
+    }
+
+    /**
+     * @description
+     * Adds a new {@link Job} to the queue.
+     */
+    add(data: Data, options?: Pick<JobConfig<Data>, 'retries'>) {
+        const job = new Job<any>({
+            data,
+            queueName: this.options.name,
+            retries: options?.retries ?? 0,
+        });
+        return this.jobQueueStrategy.add(job);
+    }
+
+    private async onFailOrComplete(job: Job<Data>) {
+        await this.jobQueueStrategy.update(job);
+        this.removeJobFromActive(job);
+    }
+
+    private removeJobFromActive(job: Job<Data>) {
+        const index = this.activeJobs.indexOf(job);
+        this.activeJobs.splice(index, 1);
+    }
+}

+ 173 - 0
packages/core/src/job-queue/job.ts

@@ -0,0 +1,173 @@
+import { JobState } from '@vendure/common/lib/generated-types';
+
+import { generatePublicId } from '../common/generate-public-id';
+
+import { JobConfig, JobData } from './types';
+
+/**
+ * @description
+ * An event raised by a Job.
+ *
+ * @docsCategory JobQueue
+ * @docsPage Job
+ */
+export type JobEventType = 'start' | 'complete' | 'fail';
+
+/**
+ * @description
+ * The signature of the event handler expected by the `Job.on()` method.
+ *
+ * @docsCategory JobQueue
+ * @docsPage Job
+ */
+export type JobEventListener<T extends JobData<T>> = (job: Job<T>) => void;
+
+/**
+ * @description
+ * A Job represents a piece of work to be run in the background, i.e. outside the request-response cycle.
+ * It is intended to be used for long-running work triggered by API requests. Jobs should now generally
+ * be directly instantiated. Rather, the {@link JobQueue} `add()` method should be used to create and
+ * add a new Job to a queue.
+ *
+ * @docsCategory JobQueue
+ * @docsPage Job
+ */
+export class Job<T extends JobData<T> = any> {
+    readonly id: string;
+    readonly queueName: string;
+    private readonly _data: T;
+    private readonly created: Date;
+    private readonly retries: number;
+    private _state: JobState;
+    private _progress: number;
+    private _result?: any;
+    private _error?: any;
+    private _attempts: number;
+    private _started?: Date;
+    private _settled?: Date;
+    private readonly eventListeners: { [type in JobEventType]: Array<JobEventListener<T>> } = {
+        start: [],
+        complete: [],
+        fail: [],
+    };
+
+    get name(): string {
+        return this.queueName;
+    }
+
+    get data(): T {
+        return this._data;
+    }
+
+    get state(): JobState {
+        return this._state;
+    }
+
+    get progress(): number {
+        return this._progress;
+    }
+
+    get result(): any {
+        return this._result;
+    }
+
+    get error(): any {
+        return this._error;
+    }
+
+    get isSettled(): boolean {
+        return !!this._settled;
+    }
+
+    get started(): Date | undefined {
+        return this._started;
+    }
+
+    get settled(): Date | undefined {
+        return this._settled;
+    }
+
+    get duration(): number {
+        const end = this._settled || new Date();
+        return +end - +(this._started || end);
+    }
+
+    constructor(config: JobConfig<T>) {
+        this.queueName = config.queueName;
+        this._data = config.data;
+        this.id = config.id || generatePublicId();
+        this._state = config.state || JobState.PENDING;
+        this.retries = config.retries || 0;
+        this._attempts = config.attempts || 0;
+        this._progress = config.progress || 0;
+        this.created = config.created || new Date();
+        this._result = config.result;
+        this._started = config.started;
+        this._settled = config.settled;
+    }
+
+    /**
+     * @description
+     * Calling this signifies that the job work has started. This method should be
+     * called in the {@link JobQueueStrategy} `next()` method.
+     */
+    start() {
+        if (this._state === JobState.PENDING || this._state === JobState.RETRYING) {
+            this._state = JobState.RUNNING;
+            this._started = new Date();
+            this._attempts++;
+            this.fireEvent('start');
+        }
+    }
+
+    /**
+     * @description
+     * Sets the progress (0 - 100) of the job.
+     */
+    setProgress(percent: number) {
+        this._progress = Math.min(percent, 100);
+    }
+
+    /**
+     * @description
+     * Calling this method signifies that the job succeeded. The result
+     * will be stored in the `Job.result` property.
+     */
+    complete(result?: any) {
+        this._result = result;
+        this._progress = 100;
+        this._state = JobState.COMPLETED;
+        this._settled = new Date();
+        this.fireEvent('complete');
+    }
+
+    /**
+     * @description
+     * Calling this method signifies that the job failed.
+     */
+    fail(err?: any) {
+        this._error = String(err);
+        this._progress = 0;
+        if (this.retries >= this._attempts) {
+            this._state = JobState.RETRYING;
+        } else {
+            this._state = JobState.FAILED;
+            this._settled = new Date();
+        }
+        this.fireEvent('fail');
+    }
+
+    /**
+     * @description
+     * Used to register event handlers for job events
+     */
+    on(eventType: JobEventType, listener: JobEventListener<T>) {
+        this.eventListeners[eventType].push(listener);
+    }
+
+    private fireEvent(eventType: JobEventType) {
+        for (const listener of this.eventListeners[eventType]) {
+            listener(this);
+        }
+    }
+}

+ 29 - 0
packages/core/src/job-queue/sql-job-queue-strategy.ts

@@ -0,0 +1,29 @@
+import { Connection } from 'typeorm';
+
+import { JobQueueStrategy } from '../config/job-queue/job-queue-strategy';
+
+import { Job } from './job';
+
+export class SqlJobQueueStrategy implements JobQueueStrategy {
+    private connection: Connection;
+
+    init(connection: Connection) {
+        this.connection = connection;
+    }
+    add(job: Job): Promise<Job> {
+        throw new Error('Method not implemented.');
+    }
+    next(queueName: string): Promise<Job> {
+        return {} as any;
+    }
+    update(job: Job<{}>): Promise<void> {
+        return {} as any;
+    }
+    findMany(): Promise<Job[]> {
+        return {} as any;
+    }
+
+    findOne(id: string): Promise<Job> {
+        return {} as any;
+    }
+}

+ 62 - 0
packages/core/src/job-queue/testing-job-queue-strategy.ts

@@ -0,0 +1,62 @@
+import { JobListOptions, JobState } from '@vendure/common/lib/generated-types';
+import { PaginatedList } from '@vendure/common/lib/shared-types';
+
+import { JobQueueStrategy } from '../config/job-queue/job-queue-strategy';
+
+import { Job } from './job';
+
+/**
+ * @description
+ * An in-memory {@link JobQueueStrategy} design for testing purposes. Not to be used in production
+ * since all jobs are lost when the server stops.
+ */
+export class TestingJobQueueStrategy implements JobQueueStrategy {
+    private jobs: Job[] = [];
+
+    prePopulate(jobs: Job[]) {
+        this.jobs.push(...jobs);
+    }
+
+    async add(job: Job): Promise<Job> {
+        this.jobs.push(job);
+        return job;
+    }
+
+    async findOne(id: string): Promise<Job | undefined> {
+        return this.jobs.find((j) => j.id === id);
+    }
+
+    async findMany(options?: JobListOptions): Promise<PaginatedList<Job>> {
+        // The sort, filter, paginate logic is not implemented because
+        // it is not needed for testing purposes.
+        const items = this.jobs;
+        return {
+            items,
+            totalItems: items.length,
+        };
+    }
+
+    async findManyById(ids: string[]): Promise<Job[]> {
+        return this.jobs.filter((job) => ids.includes(job.id));
+    }
+
+    async next(queueName: string): Promise<Job | undefined> {
+        const next = this.jobs.find((job) => {
+            return (
+                (job.state === JobState.PENDING || job.state === JobState.RETRYING) &&
+                job.queueName === queueName
+            );
+        });
+        if (next) {
+            next.start();
+            return next;
+        }
+    }
+
+    async update(job: Job): Promise<void> {
+        const index = this.jobs.findIndex((j) => j.id === job.id);
+        if (-1 < index) {
+            this.jobs.splice(index, 1, job);
+        }
+    }
+}

+ 63 - 0
packages/core/src/job-queue/types.ts

@@ -0,0 +1,63 @@
+import { JobState } from '@vendure/common/lib/generated-types';
+import { JsonCompatible } from '@vendure/common/lib/shared-types';
+
+import { Job } from './job';
+
+/**
+ * @description
+ * Used to configure a new {@link JobQueue} instance.
+ *
+ * @docsCategory JobQueue
+ * @docsPage types
+ */
+export interface CreateQueueOptions<T extends JobData<T>> {
+    /**
+     * @description
+     * The name of the queue, e.g. "image processing", "re-indexing" etc.
+     */
+    name: string;
+    /**
+     * @description
+     * How many jobs of this type may be run concurrently.
+     */
+    concurrency: number;
+    /**
+     * @description
+     * Defines the work to be done for each job in the queue. When the work is complete,
+     * `job.complete()` should be called, and for any errors, `job.fail()` should be called.
+     * Unhandled exceptions will automatically call `job.fail()`.
+     */
+    process: (job: Job<T>) => any | Promise<any>;
+}
+
+/**
+ * @description
+ * A JSON-serializable data type which provides a {@link Job}
+ * with the data it needs to be processed.
+ *
+ * @docsCategory JobQueue
+ * @docsPage types
+ */
+export type JobData<T> = JsonCompatible<T>;
+
+/**
+ * @description
+ * Used to instantiate a new {@link Job}
+ *
+ * @docsCategory JobQueue
+ * @docsPage types
+ */
+export interface JobConfig<T extends JobData<T>> {
+    queueName: string;
+    data: T;
+    retries?: number;
+    attempts?: number;
+    id?: string;
+    state?: JobState;
+    progress?: number;
+    result?: any;
+    error?: any;
+    created?: Date;
+    started?: Date;
+    settled?: Date;
+}

+ 28 - 22
packages/core/src/plugin/default-search-plugin/default-search-plugin.ts

@@ -67,34 +67,40 @@ export class DefaultSearchPlugin implements OnVendureBootstrap {
 
     /** @internal */
     async onVendureBootstrap() {
-        this.eventBus.ofType(ProductEvent).subscribe(event => {
+        this.searchIndexService.initJobQueue();
+
+        this.eventBus.ofType(ProductEvent).subscribe((event) => {
             if (event.type === 'deleted') {
-                return this.searchIndexService.deleteProduct(event.ctx, event.product).start();
+                return this.searchIndexService.deleteProduct(event.ctx, event.product);
             } else {
-                return this.searchIndexService.updateProduct(event.ctx, event.product).start();
+                return this.searchIndexService.updateProduct(event.ctx, event.product);
             }
         });
-        this.eventBus.ofType(ProductVariantEvent).subscribe(event => {
+        this.eventBus.ofType(ProductVariantEvent).subscribe((event) => {
             if (event.type === 'deleted') {
-                return this.searchIndexService.deleteVariant(event.ctx, event.variants).start();
+                return this.searchIndexService.deleteVariant(event.ctx, event.variants);
             } else {
-                return this.searchIndexService.updateVariants(event.ctx, event.variants).start();
+                return this.searchIndexService.updateVariants(event.ctx, event.variants);
             }
         });
-        this.eventBus.ofType(AssetEvent).subscribe(event => {
+        this.eventBus.ofType(AssetEvent).subscribe((event) => {
             if (event.type === 'updated') {
-                return this.searchIndexService.updateAsset(event.ctx, event.asset).start();
+                return this.searchIndexService.updateAsset(event.ctx, event.asset);
             }
         });
-        this.eventBus.ofType(ProductChannelEvent).subscribe(event => {
+        this.eventBus.ofType(ProductChannelEvent).subscribe((event) => {
             if (event.type === 'assigned') {
-                return this.searchIndexService
-                    .assignProductToChannel(event.ctx, event.product.id, event.channelId)
-                    .start();
+                return this.searchIndexService.assignProductToChannel(
+                    event.ctx,
+                    event.product.id,
+                    event.channelId,
+                );
             } else {
-                return this.searchIndexService
-                    .removeProductFromChannel(event.ctx, event.product.id, event.channelId)
-                    .start();
+                return this.searchIndexService.removeProductFromChannel(
+                    event.ctx,
+                    event.product.id,
+                    event.channelId,
+                );
             }
         });
 
@@ -103,21 +109,21 @@ export class DefaultSearchPlugin implements OnVendureBootstrap {
         collectionModification$
             .pipe(
                 buffer(closingNotifier$),
-                filter(events => 0 < events.length),
-                map(events => ({
+                filter((events) => 0 < events.length),
+                map((events) => ({
                     ctx: events[0].ctx,
                     ids: events.reduce((ids, e) => [...ids, ...e.productVariantIds], [] as ID[]),
                 })),
-                filter(e => 0 < e.ids.length),
+                filter((e) => 0 < e.ids.length),
             )
-            .subscribe(events => {
-                return this.searchIndexService.updateVariantsById(events.ctx, events.ids).start();
+            .subscribe((events) => {
+                return this.searchIndexService.updateVariantsById(events.ctx, events.ids);
             });
 
-        this.eventBus.ofType(TaxRateModificationEvent).subscribe(event => {
+        this.eventBus.ofType(TaxRateModificationEvent).subscribe((event) => {
             const defaultTaxZone = event.ctx.channel.defaultTaxZone;
             if (defaultTaxZone && idsAreEqual(defaultTaxZone.id, event.taxRate.zone.id)) {
-                return this.searchIndexService.reindex(event.ctx).start();
+                return this.searchIndexService.reindex(event.ctx);
             }
         });
     }

+ 2 - 3
packages/core/src/plugin/default-search-plugin/fulltext-search.resolver.ts

@@ -1,6 +1,5 @@
 import { Args, Mutation, Parent, Query, ResolveField, Resolver } from '@nestjs/graphql';
 import {
-    JobInfo,
     Permission,
     QuerySearchArgs,
     SearchInput,
@@ -38,7 +37,7 @@ export class ShopFulltextSearchResolver implements Omit<BaseSearchResolver, 'rei
         @Parent() parent: { input: SearchInput },
     ): Promise<Array<{ facetValue: FacetValue; count: number }>> {
         const facetValues = await this.fulltextSearchService.facetValues(ctx, parent.input, true);
-        return facetValues.filter(i => !i.facetValue.facet.isPrivate);
+        return facetValues.filter((i) => !i.facetValue.facet.isPrivate);
     }
 }
 
@@ -68,7 +67,7 @@ export class AdminFulltextSearchResolver implements BaseSearchResolver {
 
     @Mutation()
     @Allow(Permission.UpdateCatalog)
-    async reindex(@Ctx() ctx: RequestContext): Promise<JobInfo> {
+    async reindex(@Ctx() ctx: RequestContext) {
         return this.fulltextSearchService.reindex(ctx);
     }
 }

+ 5 - 7
packages/core/src/plugin/default-search-plugin/fulltext-search.service.ts

@@ -1,6 +1,6 @@
 import { Injectable } from '@nestjs/common';
 import { InjectConnection } from '@nestjs/typeorm';
-import { JobInfo, SearchInput, SearchResponse } from '@vendure/common/lib/generated-types';
+import { SearchInput, SearchResponse } from '@vendure/common/lib/generated-types';
 import { Omit } from '@vendure/common/lib/omit';
 import { Connection } from 'typeorm';
 
@@ -8,8 +8,8 @@ import { RequestContext } from '../../api/common/request-context';
 import { InternalServerError } from '../../common/error/errors';
 import { FacetValue } from '../../entity';
 import { EventBus } from '../../event-bus/event-bus';
+import { Job } from '../../job-queue/job';
 import { FacetValueService } from '../../service/services/facet-value.service';
-import { JobService } from '../../service/services/job.service';
 import { ProductVariantService } from '../../service/services/product-variant.service';
 import { SearchService } from '../../service/services/search.service';
 
@@ -30,7 +30,6 @@ export class FulltextSearchService {
 
     constructor(
         @InjectConnection() private connection: Connection,
-        private jobService: JobService,
         private eventBus: EventBus,
         private facetValueService: FacetValueService,
         private productVariantService: ProductVariantService,
@@ -81,10 +80,9 @@ export class FulltextSearchService {
     /**
      * Rebuilds the full search index.
      */
-    async reindex(ctx: RequestContext): Promise<JobInfo> {
-        const job = this.searchIndexService.reindex(ctx);
-        job.start();
-        return job;
+    async reindex(ctx: RequestContext): Promise<Job> {
+        const job = await this.searchIndexService.reindex(ctx);
+        return job as any;
     }
 
     /**

+ 82 - 82
packages/core/src/plugin/default-search-plugin/indexer/search-index.service.ts

@@ -6,8 +6,9 @@ import { Logger } from '../../../config/logger/vendure-logger';
 import { Asset } from '../../../entity/asset/asset.entity';
 import { ProductVariant } from '../../../entity/product-variant/product-variant.entity';
 import { Product } from '../../../entity/product/product.entity';
-import { Job } from '../../../service/helpers/job-manager/job';
-import { JobReporter, JobService } from '../../../service/services/job.service';
+import { Job } from '../../../job-queue/job';
+import { JobQueue } from '../../../job-queue/job-queue';
+import { JobQueueService } from '../../../job-queue/job-queue.service';
 import { WorkerMessage } from '../../../worker/types';
 import { WorkerService } from '../../../worker/worker.service';
 import {
@@ -18,127 +19,130 @@ import {
     ReindexMessageResponse,
     RemoveProductFromChannelMessage,
     UpdateAssetMessage,
+    UpdateIndexQueueJobData,
     UpdateProductMessage,
     UpdateVariantMessage,
     UpdateVariantsByIdMessage,
 } from '../types';
 
+let updateIndexQueue: JobQueue<UpdateIndexQueueJobData> | undefined;
+
 /**
  * This service is responsible for messaging the {@link IndexerController} with search index updates.
  */
 @Injectable()
 export class SearchIndexService {
-    constructor(private workerService: WorkerService, private jobService: JobService) {}
-
-    reindex(ctx: RequestContext): Job {
-        return this.jobService.createJob({
-            name: 'reindex',
-            singleInstance: true,
-            work: async (reporter) => {
-                Logger.verbose(`sending ReindexMessage`);
-                this.workerService
-                    .send(new ReindexMessage({ ctx: ctx.serialize() }))
-                    .subscribe(this.createObserver(reporter));
+    constructor(private workerService: WorkerService, private jobService: JobQueueService) {}
+
+    initJobQueue() {
+        updateIndexQueue = this.jobService.createQueue({
+            name: 'update-search-index',
+            concurrency: 1,
+            process: (job) => {
+                const data = job.data;
+                switch (data.type) {
+                    case 'reindex':
+                        Logger.verbose(`sending ReindexMessage`);
+                        this.sendMessageWithProgress(job, new ReindexMessage(data));
+                        break;
+                    case 'update-product':
+                        this.sendMessage(job, new UpdateProductMessage(data));
+                        break;
+                    case 'update-variants':
+                        this.sendMessage(job, new UpdateVariantMessage(data));
+                        break;
+                    case 'delete-product':
+                        this.sendMessage(job, new DeleteProductMessage(data));
+                        break;
+                    case 'delete-variant':
+                        this.sendMessage(job, new DeleteVariantMessage(data));
+                        break;
+                    case 'update-variants-by-id':
+                        this.sendMessageWithProgress(job, new UpdateVariantsByIdMessage(data));
+                        break;
+                    case 'update-asset':
+                        this.sendMessage(job, new UpdateAssetMessage(data));
+                        break;
+                    case 'assign-product-to-channel':
+                        this.sendMessage(job, new AssignProductToChannelMessage(data));
+                        break;
+                    case 'remove-product-from-channel':
+                        this.sendMessage(job, new RemoveProductFromChannelMessage(data));
+                        break;
+                }
             },
         });
     }
 
+    reindex(ctx: RequestContext) {
+        return this.addJobToQueue({ type: 'reindex', ctx: ctx.serialize() });
+    }
+
     updateProduct(ctx: RequestContext, product: Product) {
-        const data = { ctx: ctx.serialize(), productId: product.id };
-        return this.createShortWorkerJob(new UpdateProductMessage(data), {
-            entity: 'Product',
-            id: product.id,
-        });
+        this.addJobToQueue({ type: 'update-product', ctx: ctx.serialize(), productId: product.id });
     }
 
     updateVariants(ctx: RequestContext, variants: ProductVariant[]) {
         const variantIds = variants.map((v) => v.id);
-        const data = { ctx: ctx.serialize(), variantIds };
-        return this.createShortWorkerJob(new UpdateVariantMessage(data), {
-            entity: 'ProductVariant',
-            ids: variantIds,
-        });
+        this.addJobToQueue({ type: 'update-variants', ctx: ctx.serialize(), variantIds });
     }
 
     deleteProduct(ctx: RequestContext, product: Product) {
-        const data = { ctx: ctx.serialize(), productId: product.id };
-        return this.createShortWorkerJob(new DeleteProductMessage(data), {
-            entity: 'Product',
-            id: product.id,
-        });
+        this.addJobToQueue({ type: 'delete-product', ctx: ctx.serialize(), productId: product.id });
     }
 
     deleteVariant(ctx: RequestContext, variants: ProductVariant[]) {
         const variantIds = variants.map((v) => v.id);
-        const data = { ctx: ctx.serialize(), variantIds };
-        return this.createShortWorkerJob(new DeleteVariantMessage(data), {
-            entity: 'ProductVariant',
-            id: variantIds,
-        });
+        this.addJobToQueue({ type: 'delete-variant', ctx: ctx.serialize(), variantIds });
     }
 
     updateVariantsById(ctx: RequestContext, ids: ID[]) {
-        return this.jobService.createJob({
-            name: 'update-variants',
-            metadata: {
-                variantIds: ids,
-            },
-            work: (reporter) => {
-                Logger.verbose(`sending UpdateVariantsByIdMessage`);
-                this.workerService
-                    .send(new UpdateVariantsByIdMessage({ ctx: ctx.serialize(), ids }))
-                    .subscribe(this.createObserver(reporter));
-            },
-        });
+        this.addJobToQueue({ type: 'update-variants-by-id', ctx: ctx.serialize(), ids });
     }
 
     updateAsset(ctx: RequestContext, asset: Asset) {
-        return this.createShortWorkerJob(new UpdateAssetMessage({ ctx: ctx.serialize(), asset }), {
-            entity: 'Asset',
-            id: asset.id,
-        });
+        this.addJobToQueue({ type: 'update-asset', ctx: ctx.serialize(), asset: asset as any });
     }
 
     assignProductToChannel(ctx: RequestContext, productId: ID, channelId: ID) {
-        const data = { ctx: ctx.serialize(), productId, channelId };
-        return this.createShortWorkerJob(new AssignProductToChannelMessage(data), {
-            entity: 'Product',
-            id: productId,
+        this.addJobToQueue({
+            type: 'assign-product-to-channel',
+            ctx: ctx.serialize(),
+            productId,
+            channelId,
         });
     }
 
     removeProductFromChannel(ctx: RequestContext, productId: ID, channelId: ID) {
-        const data = { ctx: ctx.serialize(), productId, channelId };
-        return this.createShortWorkerJob(new RemoveProductFromChannelMessage(data), {
-            entity: 'Product',
-            id: productId,
+        this.addJobToQueue({
+            type: 'remove-product-from-channel',
+            ctx: ctx.serialize(),
+            productId,
+            channelId,
         });
     }
 
-    /**
-     * Creates a short-running job that does not expect progress updates.
-     */
-    private createShortWorkerJob<T extends WorkerMessage<any, any>>(message: T, metadata: any) {
-        return this.jobService.createJob({
-            name: 'update-index',
-            metadata,
-            work: (reporter) => {
-                this.workerService.send(message).subscribe({
-                    complete: () => reporter.complete(true),
-                    error: (err) => {
-                        Logger.error(err);
-                        reporter.complete(false);
-                    },
-                });
+    private addJobToQueue(data: UpdateIndexQueueJobData) {
+        if (updateIndexQueue) {
+            return updateIndexQueue.add(data);
+        }
+    }
+
+    private sendMessage(job: Job<any>, message: WorkerMessage<any, any>) {
+        this.workerService.send(message).subscribe({
+            complete: () => job.complete(true),
+            error: (err) => {
+                Logger.error(err);
+                job.fail(err);
             },
         });
     }
 
-    private createObserver(reporter: JobReporter) {
+    private sendMessageWithProgress(job: Job<any>, message: ReindexMessage | UpdateVariantsByIdMessage) {
         let total: number | undefined;
         let duration = 0;
         let completed = 0;
-        return {
+        this.workerService.send(message).subscribe({
             next: (response: ReindexMessageResponse) => {
                 if (!total) {
                     total = response.total;
@@ -146,10 +150,10 @@ export class SearchIndexService {
                 duration = response.duration;
                 completed = response.completed;
                 const progress = Math.ceil((completed / total) * 100);
-                reporter.setProgress(progress);
+                job.setProgress(progress);
             },
             complete: () => {
-                reporter.complete({
+                job.complete({
                     success: true,
                     indexedItemCount: total,
                     timeTaken: duration,
@@ -157,12 +161,8 @@ export class SearchIndexService {
             },
             error: (err: any) => {
                 Logger.error(JSON.stringify(err));
-                reporter.complete({
-                    success: false,
-                    indexedItemCount: 0,
-                    timeTaken: 0,
-                });
+                job.fail();
             },
-        };
+        });
     }
 }

+ 38 - 11
packages/core/src/plugin/default-search-plugin/types.ts

@@ -1,14 +1,18 @@
-import { ID } from '@vendure/common/lib/shared-types';
+import { ID, JsonCompatible } from '@vendure/common/lib/shared-types';
 
 import { SerializedRequestContext } from '../../api/common/request-context';
 import { Asset } from '../../entity/asset/asset.entity';
 import { WorkerMessage } from '../../worker/types';
 
-export interface ReindexMessageResponse {
+export type ReindexMessageResponse = {
     total: number;
     completed: number;
     duration: number;
-}
+};
+
+export type ReindexMessageData = {
+    ctx: SerializedRequestContext;
+};
 
 export type UpdateProductMessageData = {
     ctx: SerializedRequestContext;
@@ -20,22 +24,23 @@ export type UpdateVariantMessageData = {
     variantIds: ID[];
 };
 
-export interface UpdateVariantsByIdMessageData {
+export type UpdateVariantsByIdMessageData = {
     ctx: SerializedRequestContext;
     ids: ID[];
-}
-export interface UpdateAssetMessageData {
+};
+
+export type UpdateAssetMessageData = {
     ctx: SerializedRequestContext;
-    asset: Asset;
-}
+    asset: JsonCompatible<Required<Asset>>;
+};
 
-export interface ProductChannelMessageData {
+export type ProductChannelMessageData = {
     ctx: SerializedRequestContext;
     productId: ID;
     channelId: ID;
-}
+};
 
-export class ReindexMessage extends WorkerMessage<{ ctx: SerializedRequestContext }, ReindexMessageResponse> {
+export class ReindexMessage extends WorkerMessage<ReindexMessageData, ReindexMessageResponse> {
     static readonly pattern = 'Reindex';
 }
 export class UpdateVariantMessage extends WorkerMessage<UpdateVariantMessageData, boolean> {
@@ -65,3 +70,25 @@ export class RemoveProductFromChannelMessage extends WorkerMessage<ProductChanne
 export class UpdateAssetMessage extends WorkerMessage<UpdateAssetMessageData, boolean> {
     static readonly pattern = 'UpdateAsset';
 }
+
+type NamedJobData<Type extends string, MessageData> = { type: Type } & MessageData;
+
+export type ReindexJobData = NamedJobData<'reindex', ReindexMessageData>;
+type UpdateProductJobData = NamedJobData<'update-product', UpdateProductMessageData>;
+type UpdateVariantsJobData = NamedJobData<'update-variants', UpdateVariantMessageData>;
+type DeleteProductJobData = NamedJobData<'delete-product', UpdateProductMessageData>;
+type DeleteVariantJobData = NamedJobData<'delete-variant', UpdateVariantMessageData>;
+type UpdateVariantsByIdJobData = NamedJobData<'update-variants-by-id', UpdateVariantsByIdMessageData>;
+type UpdateAssetJobData = NamedJobData<'update-asset', UpdateAssetMessageData>;
+type AssignProductToChannelJobData = NamedJobData<'assign-product-to-channel', ProductChannelMessageData>;
+type RemoveProductFromChannelJobData = NamedJobData<'remove-product-from-channel', ProductChannelMessageData>;
+export type UpdateIndexQueueJobData =
+    | ReindexJobData
+    | UpdateProductJobData
+    | UpdateVariantsJobData
+    | DeleteProductJobData
+    | DeleteVariantJobData
+    | UpdateVariantsByIdJobData
+    | UpdateAssetJobData
+    | AssignProductToChannelJobData
+    | RemoveProductFromChannelJobData;

+ 3 - 2
packages/core/src/plugin/plugin-common.module.ts

@@ -2,6 +2,7 @@ import { Module } from '@nestjs/common';
 
 import { ConfigModule } from '../config/config.module';
 import { EventBusModule } from '../event-bus/event-bus.module';
+import { JobQueueModule } from '../job-queue/job-queue.module';
 import { ServiceModule } from '../service/service.module';
 import { WorkerServiceModule } from '../worker/worker-service.module';
 
@@ -21,10 +22,10 @@ import { WorkerServiceModule } from '../worker/worker-service.module';
  * @docsCategory plugin
  */
 @Module({
-    imports: [EventBusModule, ConfigModule, ServiceModule.forPlugin(), WorkerServiceModule],
+    imports: [EventBusModule, ConfigModule, ServiceModule.forPlugin(), WorkerServiceModule, JobQueueModule],
     providers: [
         // TODO: Provide an injectable which defines whether in main or worker context
     ],
-    exports: [EventBusModule, ConfigModule, ServiceModule.forPlugin(), WorkerServiceModule],
+    exports: [EventBusModule, ConfigModule, ServiceModule.forPlugin(), WorkerServiceModule, JobQueueModule],
 })
 export class PluginCommonModule {}

+ 0 - 185
packages/core/src/service/helpers/job-manager/job-manager.spec.ts

@@ -1,185 +0,0 @@
-/* tslint:disable:no-non-null-assertion no-empty */
-import { JobState } from '@vendure/common/lib/generated-types';
-import { pick } from '@vendure/common/lib/pick';
-import { Subject } from 'rxjs';
-
-import { JobReporter, PartialJobReporter } from '../../services/job.service';
-
-import { JobManager } from './job-manager';
-
-describe('JobManager', () => {
-    const noop = () => Promise.resolve();
-    const mockReporter: PartialJobReporter = {
-        complete(result?: any): void {},
-    };
-
-    it('getOne() returns null for invalid id', () => {
-        const jm = new JobManager();
-        expect(jm.getOne('invalid')).toBeNull();
-    });
-
-    it('createJob() returns a job', () => {
-        const jm = new JobManager();
-        const job = jm.createJob('test', noop, mockReporter);
-        expect(job.name).toBe('test');
-    });
-
-    it('getOne() returns job by id', () => {
-        const jm = new JobManager();
-        const job1 = jm.createJob('test', noop, mockReporter);
-        const job2 = jm.getOne(job1.id);
-
-        expect(job1.id).toBe(job2!.id);
-    });
-
-    it('job completes once work fn returns', async () => {
-        const jm = new JobManager();
-        const subject = new Subject();
-        const job = jm.createJob('test', () => subject.toPromise(), mockReporter);
-        job.start();
-        await tick();
-
-        expect(jm.getOne(job.id)!.state).toBe(JobState.RUNNING);
-
-        subject.next('result');
-        subject.complete();
-        await tick();
-
-        const result = jm.getOne(job.id)!;
-        expect(result.state).toBe(JobState.COMPLETED);
-        expect(result.result).toBe('result');
-    });
-
-    it('job fails if work fn throws', async () => {
-        const jm = new JobManager();
-        const subject = new Subject();
-        const job = jm.createJob('test', () => subject.toPromise(), mockReporter);
-        job.start();
-        await tick();
-
-        expect(jm.getOne(job.id)!.state).toBe(JobState.RUNNING);
-
-        subject.error('oh no');
-        await tick();
-
-        const result = jm.getOne(job.id)!;
-        expect(result.state).toBe(JobState.FAILED);
-        expect(result.result).toBe('oh no');
-    });
-
-    it('reporter.setProgress updates job progress', async () => {
-        const jm = new JobManager();
-        const progressSubject = new Subject<number>();
-        const testReporter: PartialJobReporter = {
-            complete(r?: any): void {},
-        };
-        const wrappedWork = () => {
-            return new Promise(async (resolve, reject) => {
-                testReporter.complete = res => resolve(res);
-                progressSubject.subscribe(
-                    val => testReporter.setProgress!(val),
-                    () => testReporter.complete(),
-                    () => testReporter.complete(),
-                );
-                const r = await progressSubject.toPromise();
-            });
-        };
-        const job = jm.createJob('test', wrappedWork, testReporter);
-        job.start();
-        await tick();
-        expect(jm.getOne(job.id)!.progress).toBe(0);
-
-        progressSubject.next(10);
-        expect(jm.getOne(job.id)!.progress).toBe(10);
-
-        progressSubject.next(42);
-        expect(jm.getOne(job.id)!.progress).toBe(42);
-
-        progressSubject.next(500);
-        expect(jm.getOne(job.id)!.progress).toBe(100);
-
-        progressSubject.next(88);
-        expect(jm.getOne(job.id)!.progress).toBe(88);
-
-        progressSubject.complete();
-        await tick();
-
-        const result = jm.getOne(job.id)!;
-        expect(jm.getOne(job.id)!.progress).toBe(100);
-    });
-
-    it('getAll() returns all jobs', () => {
-        const jm = new JobManager();
-        const job1 = jm.createJob('job1', noop, mockReporter);
-        const job2 = jm.createJob('job2', noop, mockReporter);
-        const job3 = jm.createJob('job3', noop, mockReporter);
-
-        expect(jm.getAll().map(j => j.id)).toEqual([job1.id, job2.id, job3.id]);
-    });
-
-    it('getAll() filters by id', () => {
-        const jm = new JobManager();
-        const job1 = jm.createJob('job1', noop, mockReporter);
-        const job2 = jm.createJob('job2', noop, mockReporter);
-        const job3 = jm.createJob('job3', noop, mockReporter);
-
-        expect(jm.getAll({ ids: [job1.id, job3.id] }).map(j => j.id)).toEqual([job1.id, job3.id]);
-    });
-
-    it('getAll() filters by state', async () => {
-        const jm = new JobManager();
-        const subject = new Subject();
-        const job1 = jm.createJob('job1', noop, mockReporter);
-        const job2 = jm.createJob('job2', noop, mockReporter);
-        const job3 = jm.createJob('job3', () => subject.toPromise(), mockReporter);
-        job1.start();
-        job2.start();
-        job3.start();
-
-        await tick();
-
-        expect(jm.getAll({ state: JobState.COMPLETED }).map(j => j.id)).toEqual([job1.id, job2.id]);
-        expect(jm.getAll({ state: JobState.RUNNING }).map(j => j.id)).toEqual([job3.id]);
-    });
-
-    it('clean() removes completed jobs older than maxAge', async () => {
-        const jm = new JobManager('50ms');
-        const subject1 = new Subject();
-        const subject2 = new Subject();
-
-        const job1 = jm.createJob('job1', () => subject1.toPromise(), mockReporter);
-        const job2 = jm.createJob('job2', () => subject2.toPromise(), mockReporter);
-        job1.start();
-        job2.start();
-
-        subject1.complete();
-        await tick();
-
-        jm.clean();
-
-        expect(jm.getAll().map(pick(['name', 'state']))).toEqual([
-            { name: 'job1', state: JobState.COMPLETED },
-            { name: 'job2', state: JobState.RUNNING },
-        ]);
-
-        await tick(75);
-
-        jm.clean();
-
-        expect(jm.getAll().map(pick(['name', 'state']))).toEqual([{ name: 'job2', state: JobState.RUNNING }]);
-    });
-
-    it('findRunningJob() works', async () => {
-        const jm = new JobManager();
-        const subject1 = new Subject();
-
-        const job1 = jm.createJob('job1', () => subject1.toPromise(), mockReporter);
-        job1.start();
-
-        expect(jm.findRunningJob('job1')).toBe(job1);
-    });
-});
-
-function tick(duration: number = 0) {
-    return new Promise(resolve => global.setTimeout(resolve, duration));
-}

+ 0 - 96
packages/core/src/service/helpers/job-manager/job-manager.ts

@@ -1,96 +0,0 @@
-import { JobInfo, JobListInput, JobState } from '@vendure/common/lib/generated-types';
-import { pick } from '@vendure/common/lib/pick';
-import ms = require('ms');
-
-import { PartialJobReporter } from '../../services/job.service';
-
-import { Job } from './job';
-
-/**
- * The JobManager is responsible for creating and monitoring {@link Job} instances.
- */
-export class JobManager {
-    private jobs = new Map<string, Job>();
-    /**
-     * Defines the maximum age of a completed/failed Job before it
-     * is removd when `clean()` is called.
-     */
-    private readonly maxAgeInMs = ms('1d');
-
-    constructor(maxAge?: string) {
-        if (maxAge) {
-            this.maxAgeInMs = ms(maxAge);
-        }
-    }
-
-    /**
-     * Creates a new {@link Job} instance with the given work function. When the function
-     * returns, the job will be completed. The return value is then available as the `result`
-     * property of the job. If the function throws, the job will fail and the `result` property
-     * will be the error thrown.
-     */
-    createJob(
-        name: string,
-        work: () => Promise<any>,
-        reporter: PartialJobReporter,
-        metadata?: Record<string, any>,
-    ): Job {
-        const job = new Job(name, work, reporter, metadata);
-        this.jobs.set(job.id, job);
-        return job;
-    }
-
-    getAll(input?: JobListInput): JobInfo[] {
-        return Array.from(this.jobs.values())
-            .map(this.toJobInfo)
-            .filter(job => {
-                if (input) {
-                    let match = false;
-                    if (input.state) {
-                        match = job.state === input.state;
-                    }
-                    if (input.ids) {
-                        match = input.ids.includes(job.id);
-                    }
-                    return match;
-                } else {
-                    return true;
-                }
-            });
-    }
-
-    getOne(jobId: string): JobInfo | null {
-        const job = this.jobs.get(jobId);
-        if (!job) {
-            return null;
-        }
-        return this.toJobInfo(job);
-    }
-
-    /**
-     * Removes all completed jobs which are older than the maxAge.
-     */
-    clean() {
-        const nowMs = +new Date();
-        Array.from(this.jobs.values()).forEach(job => {
-            if (job.ended) {
-                const delta = nowMs - +job.ended;
-                if (this.maxAgeInMs < delta) {
-                    this.jobs.delete(job.id);
-                }
-            }
-        });
-    }
-
-    findRunningJob(name: string): Job | undefined {
-        return Array.from(this.jobs.values()).find(
-            job => job.name === name && job.state === JobState.RUNNING,
-        );
-    }
-
-    private toJobInfo(job: Job): JobInfo {
-        const info = pick(job, ['id', 'name', 'state', 'progress', 'result', 'started', 'ended', 'metadata']);
-        const duration = job.ended ? +job.ended - +info.started : Date.now() - +info.started;
-        return { ...info, duration };
-    }
-}

+ 0 - 31
packages/core/src/service/helpers/job-manager/job.spec.ts

@@ -1,31 +0,0 @@
-import { PartialJobReporter } from '../../services/job.service';
-
-import { Job } from './job';
-
-describe('Job', () => {
-    it('does not run work more than once', () => {
-        let counter = 0;
-        const mockReporter: PartialJobReporter = {
-            complete: (result?: any) => {
-                /**/
-            },
-        };
-        const job = new Job(
-            'test',
-            () => {
-                counter++;
-                return new Promise(() => {
-                    /**/
-                });
-            },
-            mockReporter,
-        );
-        job.start();
-
-        expect(counter).toBe(1);
-
-        job.start();
-
-        expect(counter).toBe(1);
-    });
-});

+ 0 - 50
packages/core/src/service/helpers/job-manager/job.ts

@@ -1,50 +0,0 @@
-import { JobState } from '@vendure/common/lib/generated-types';
-
-import { generatePublicId } from '../../../common/generate-public-id';
-import { PartialJobReporter } from '../../services/job.service';
-
-/**
- * A Job represents a piece of work to be run in the background, i.e. outside the request-response cycle.
- * It is intended to be used for long-running work triggered by API requests (e.g. a mutation which
- * kicks off a re-build of the search index).
- */
-export class Job {
-    id: string;
-    state: JobState = JobState.PENDING;
-    progress = 0;
-    result: any = null;
-    started: Date;
-    ended: Date;
-
-    constructor(
-        public name: string,
-        public work: () => any | Promise<any>,
-        private reporter: PartialJobReporter,
-        public metadata: Record<string, any> = {},
-    ) {
-        this.id = generatePublicId();
-        this.started = new Date();
-    }
-
-    async start() {
-        if (this.state !== JobState.PENDING) {
-            return;
-        }
-        this.reporter.setProgress = (percentage: number) => {
-            this.progress = Math.max(Math.min(percentage, 100), 0);
-        };
-        let result: any;
-        try {
-            this.state = JobState.RUNNING;
-            result = await this.work();
-            this.progress = 100;
-            this.result = result;
-            this.state = JobState.COMPLETED;
-        } catch (e) {
-            this.state = JobState.FAILED;
-            this.result = e;
-        }
-        this.ended = new Date();
-        return this.result;
-    }
-}

+ 0 - 2
packages/core/src/service/index.ts

@@ -1,4 +1,3 @@
-export * from './helpers/job-manager/job';
 export * from './helpers/utils/translate-entity';
 export * from './helpers/utils/patch-entity';
 export * from './helpers/utils/channel-aware-orm-utils';
@@ -17,7 +16,6 @@ export * from './services/customer-group.service';
 export * from './services/facet.service';
 export * from './services/facet-value.service';
 export * from './services/global-settings.service';
-export * from './services/job.service';
 export * from './services/order.service';
 export * from './services/payment-method.service';
 export * from './services/product.service';

+ 2 - 3
packages/core/src/service/service.module.ts

@@ -4,6 +4,7 @@ import { TypeOrmModule } from '@nestjs/typeorm';
 import { ConfigModule } from '../config/config.module';
 import { ConfigService } from '../config/config.service';
 import { EventBusModule } from '../event-bus/event-bus.module';
+import { JobQueueModule } from '../job-queue/job-queue.module';
 import { WorkerServiceModule } from '../worker/worker-service.module';
 
 import { CollectionController } from './controllers/collection.controller';
@@ -32,7 +33,6 @@ import { FacetValueService } from './services/facet-value.service';
 import { FacetService } from './services/facet.service';
 import { GlobalSettingsService } from './services/global-settings.service';
 import { HistoryService } from './services/history.service';
-import { JobService } from './services/job.service';
 import { OrderTestingService } from './services/order-testing.service';
 import { OrderService } from './services/order.service';
 import { PaymentMethodService } from './services/payment-method.service';
@@ -63,7 +63,6 @@ const services = [
     FacetValueService,
     GlobalSettingsService,
     HistoryService,
-    JobService,
     OrderService,
     OrderTestingService,
     PaymentMethodService,
@@ -108,7 +107,7 @@ let workerTypeOrmModule: DynamicModule;
  * only run a single time.
  */
 @Module({
-    imports: [ConfigModule, EventBusModule, WorkerServiceModule],
+    imports: [ConfigModule, EventBusModule, WorkerServiceModule, JobQueueModule],
     providers: [...services, ...helpers],
     exports: [...services, ...helpers],
 })

+ 74 - 54
packages/core/src/service/services/collection.service.ts

@@ -16,7 +16,7 @@ import { merge } from 'rxjs';
 import { debounceTime } from 'rxjs/operators';
 import { Connection } from 'typeorm';
 
-import { RequestContext } from '../../api/common/request-context';
+import { RequestContext, SerializedRequestContext } from '../../api/common/request-context';
 import { configurableDefToOperation } from '../../common/configurable-operation';
 import { DEFAULT_LANGUAGE_CODE } from '../../common/constants';
 import { IllegalOperationError, UserInputError } from '../../common/error/errors';
@@ -36,6 +36,9 @@ import { EventBus } from '../../event-bus/event-bus';
 import { CollectionModificationEvent } from '../../event-bus/events/collection-modification-event';
 import { ProductEvent } from '../../event-bus/events/product-event';
 import { ProductVariantEvent } from '../../event-bus/events/product-variant-event';
+import { Job } from '../../job-queue/job';
+import { JobQueue } from '../../job-queue/job-queue';
+import { JobQueueService } from '../../job-queue/job-queue.service';
 import { WorkerService } from '../../worker/worker.service';
 import { ListQueryBuilder } from '../helpers/list-query-builder/list-query-builder';
 import { TranslatableSaver } from '../helpers/translatable-saver/translatable-saver';
@@ -43,12 +46,11 @@ import { findOneInChannel } from '../helpers/utils/channel-aware-orm-utils';
 import { getEntityOrThrow } from '../helpers/utils/get-entity-or-throw';
 import { moveToIndex } from '../helpers/utils/move-to-index';
 import { translateDeep } from '../helpers/utils/translate-entity';
-import { ApplyCollectionFiltersMessage } from '../types/collection-messages';
+import { ApplyCollectionFiletersJobData, ApplyCollectionFiltersMessage } from '../types/collection-messages';
 
 import { AssetService } from './asset.service';
 import { ChannelService } from './channel.service';
 import { FacetValueService } from './facet-value.service';
-import { JobService } from './job.service';
 
 export class CollectionService implements OnModuleInit {
     private rootCollection: Collection | undefined;
@@ -56,6 +58,7 @@ export class CollectionService implements OnModuleInit {
         facetValueCollectionFilter,
         variantNameCollectionFilter,
     ];
+    private applyFiltersQueue: JobQueue<ApplyCollectionFiletersJobData>;
 
     constructor(
         @InjectConnection() private connection: Connection,
@@ -66,7 +69,7 @@ export class CollectionService implements OnModuleInit {
         private translatableSaver: TranslatableSaver,
         private eventBus: EventBus,
         private workerService: WorkerService,
-        private jobService: JobService,
+        private jobQueueService: JobQueueService,
     ) {}
 
     onModuleInit() {
@@ -75,12 +78,24 @@ export class CollectionService implements OnModuleInit {
 
         merge(productEvents$, variantEvents$)
             .pipe(debounceTime(50))
-            .subscribe(async event => {
-                const collections = await this.connection.getRepository(Collection).find({
-                    relations: ['productVariants'],
+            .subscribe(async (event) => {
+                const collections = await this.connection.getRepository(Collection).find();
+                this.applyFiltersQueue.add({
+                    ctx: event.ctx.serialize(),
+                    collectionIds: collections.map((c) => c.id),
                 });
-                this.applyCollectionFilters(event.ctx, collections);
             });
+
+        this.applyFiltersQueue = this.jobQueueService.createQueue({
+            name: 'apply-collection-filters',
+            concurrency: 1,
+            process: async (job) => {
+                const collections = await this.connection
+                    .getRepository(Collection)
+                    .findByIds(job.data.collectionIds);
+                this.applyCollectionFilters(job.data.ctx, collections, job);
+            },
+        });
     }
 
     async findAll(
@@ -98,7 +113,7 @@ export class CollectionService implements OnModuleInit {
             })
             .getManyAndCount()
             .then(async ([collections, totalItems]) => {
-                const items = collections.map(collection =>
+                const items = collections.map((collection) =>
                     translateDeep(collection, ctx.languageCode, ['parent']),
                 );
                 return {
@@ -120,7 +135,7 @@ export class CollectionService implements OnModuleInit {
     }
 
     getAvailableFilters(ctx: RequestContext): ConfigurableOperationDefinition[] {
-        return this.availableFilters.map(x => configurableDefToOperation(ctx, x));
+        return this.availableFilters.map((x) => configurableDefToOperation(ctx, x));
     }
 
     async getParent(ctx: RequestContext, collectionId: ID): Promise<Collection | undefined> {
@@ -129,7 +144,7 @@ export class CollectionService implements OnModuleInit {
             .createQueryBuilder('collection')
             .leftJoinAndSelect('collection.translations', 'translation')
             .where(
-                qb =>
+                (qb) =>
                     `collection.id = ${qb
                         .subQuery()
                         .select('child.parentId')
@@ -178,7 +193,7 @@ export class CollectionService implements OnModuleInit {
         }
         const result = await qb.getMany();
 
-        return result.map(collection => translateDeep(collection, ctx.languageCode));
+        return result.map((collection) => translateDeep(collection, ctx.languageCode));
     }
 
     /**
@@ -204,7 +219,7 @@ export class CollectionService implements OnModuleInit {
         };
 
         const descendants = await getChildren(rootId);
-        return descendants.map(c => translateDeep(c, ctx.languageCode));
+        return descendants.map((c) => translateDeep(c, ctx.languageCode));
     }
 
     /**
@@ -236,9 +251,9 @@ export class CollectionService implements OnModuleInit {
 
         return this.connection
             .getRepository(Collection)
-            .findByIds(ancestors.map(c => c.id))
-            .then(categories => {
-                return ctx ? categories.map(c => translateDeep(c, ctx.languageCode)) : categories;
+            .findByIds(ancestors.map((c) => c.id))
+            .then((categories) => {
+                return ctx ? categories.map((c) => translateDeep(c, ctx.languageCode)) : categories;
             });
     }
 
@@ -247,7 +262,7 @@ export class CollectionService implements OnModuleInit {
             input,
             entityType: Collection,
             translationType: CollectionTranslation,
-            beforeSave: async coll => {
+            beforeSave: async (coll) => {
                 await this.channelService.assignToCurrentChannel(coll, ctx);
                 const parent = await this.getParentCollection(ctx, input.parentId);
                 if (parent) {
@@ -259,7 +274,10 @@ export class CollectionService implements OnModuleInit {
             },
         });
         await this.assetService.updateEntityAssets(collection, input);
-        this.applyCollectionFilters(ctx, [collection]);
+        this.applyFiltersQueue.add({
+            ctx: ctx.serialize(),
+            collectionIds: [collection.id],
+        });
         return assertFound(this.findOne(ctx, collection.id));
     }
 
@@ -268,7 +286,7 @@ export class CollectionService implements OnModuleInit {
             input,
             entityType: Collection,
             translationType: CollectionTranslation,
-            beforeSave: async coll => {
+            beforeSave: async (coll) => {
                 if (input.filters) {
                     coll.filters = this.getCollectionFiltersFromInput(input);
                 }
@@ -277,7 +295,10 @@ export class CollectionService implements OnModuleInit {
             },
         });
         if (input.filters) {
-            this.applyCollectionFilters(ctx, [collection]);
+            this.applyFiltersQueue.add({
+                ctx: ctx.serialize(),
+                collectionIds: [collection.id],
+            });
         }
         return assertFound(this.findOne(ctx, collection.id));
     }
@@ -309,7 +330,7 @@ export class CollectionService implements OnModuleInit {
 
         if (
             idsAreEqual(input.parentId, target.id) ||
-            descendants.some(cat => idsAreEqual(input.parentId, cat.id))
+            descendants.some((cat) => idsAreEqual(input.parentId, cat.id))
         ) {
             throw new IllegalOperationError(`error.cannot-move-collection-into-self`);
         }
@@ -328,7 +349,10 @@ export class CollectionService implements OnModuleInit {
         siblings = moveToIndex(input.index, target, siblings);
 
         await this.connection.getRepository(Collection).save(siblings);
-        await this.applyCollectionFilters(ctx, [target]);
+        this.applyFiltersQueue.add({
+            ctx: ctx.serialize(),
+            collectionIds: [target.id],
+        });
         return assertFound(this.findOne(ctx, input.collectionId));
     }
 
@@ -359,37 +383,33 @@ export class CollectionService implements OnModuleInit {
     /**
      * Applies the CollectionFilters and returns an array of all affected ProductVariant ids.
      */
-    private async applyCollectionFilters(ctx: RequestContext, collections: Collection[]): Promise<void> {
-        const collectionIds = collections.map(c => c.id);
-
-        const job = this.jobService.createJob({
-            name: 'apply-collection-filters',
-            metadata: { collectionIds },
-            singleInstance: false,
-            work: async reporter => {
-                Logger.verbose(`sending ApplyCollectionFiltersMessage message`);
-                this.workerService.send(new ApplyCollectionFiltersMessage({ collectionIds })).subscribe({
-                    next: ({ total, completed, duration, collectionId, affectedVariantIds }) => {
-                        const progress = Math.ceil((completed / total) * 100);
-                        const collection = collections.find(c => idsAreEqual(c.id, collectionId));
-                        if (collection) {
-                            this.eventBus.publish(
-                                new CollectionModificationEvent(ctx, collection, affectedVariantIds),
-                            );
-                        }
-                        reporter.setProgress(progress);
-                    },
-                    complete: () => {
-                        reporter.complete();
-                    },
-                    error: err => {
-                        Logger.error(err);
-                        reporter.complete();
-                    },
-                });
+    private async applyCollectionFilters(
+        ctx: SerializedRequestContext,
+        collections: Collection[],
+        job: Job<ApplyCollectionFiletersJobData>,
+    ): Promise<void> {
+        const collectionIds = collections.map((c) => c.id);
+        const requestContext = RequestContext.deserialize(ctx);
+
+        this.workerService.send(new ApplyCollectionFiltersMessage({ collectionIds })).subscribe({
+            next: ({ total, completed, duration, collectionId, affectedVariantIds }) => {
+                const progress = Math.ceil((completed / total) * 100);
+                const collection = collections.find((c) => idsAreEqual(c.id, collectionId));
+                if (collection) {
+                    this.eventBus.publish(
+                        new CollectionModificationEvent(requestContext, collection, affectedVariantIds),
+                    );
+                }
+                job.setProgress(progress);
+            },
+            complete: () => {
+                job.complete();
+            },
+            error: (err) => {
+                Logger.error(err);
+                job.fail(err);
             },
         });
-        await job.start();
     }
 
     /**
@@ -397,14 +417,14 @@ export class CollectionService implements OnModuleInit {
      */
     async getCollectionProductVariantIds(collection: Collection): Promise<ID[]> {
         if (collection.productVariants) {
-            return collection.productVariants.map(v => v.id);
+            return collection.productVariants.map((v) => v.id);
         } else {
             const productVariants = await this.connection
                 .getRepository(ProductVariant)
                 .createQueryBuilder('variant')
                 .innerJoin('variant.collections', 'collection', 'collection.id = :id', { id: collection.id })
                 .getMany();
-            return productVariants.map(v => v.id);
+            return productVariants.map((v) => v.id);
         }
     }
 
@@ -483,7 +503,7 @@ export class CollectionService implements OnModuleInit {
     }
 
     private getFilterByCode(code: string): CollectionFilter<any> {
-        const match = this.availableFilters.find(a => a.code === code);
+        const match = this.availableFilters.find((a) => a.code === code);
         if (!match) {
             throw new UserInputError(`error.adjustment-operation-with-code-not-found`, { code });
         }

+ 0 - 74
packages/core/src/service/services/job.service.ts

@@ -1,74 +0,0 @@
-import { Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
-import { JobInfo, JobListInput } from '@vendure/common/lib/generated-types';
-import ms = require('ms');
-
-import { Job } from '../helpers/job-manager/job';
-import { JobManager } from '../helpers/job-manager/job-manager';
-
-/**
- * The JobReporter allows a long-running job to update its progress during the
- * duration of the work function. This can then be used in the client application
- * to display a progress indication to the user.
- */
-export interface JobReporter {
-    setProgress(percentage: number): void;
-    complete(result?: any): void;
-}
-
-export type PartialJobReporter = Omit<JobReporter, 'setProgress'> & Partial<Pick<JobReporter, 'setProgress'>>;
-
-export interface CreateJobOptions {
-    name: string;
-    metadata?: any;
-    work: (reporter: JobReporter) => any | Promise<any>;
-    /** Limit this job to a single instance at a time */
-    singleInstance?: boolean;
-}
-
-@Injectable()
-export class JobService implements OnModuleInit, OnModuleDestroy {
-    private manager = new JobManager('1d');
-    private readonly cleanJobsInterval = ms('1d');
-    private cleanJobsTimer: NodeJS.Timeout;
-
-    onModuleInit() {
-        this.cleanJobsTimer = global.setInterval(() => this.manager.clean(), this.cleanJobsInterval);
-    }
-
-    onModuleDestroy() {
-        global.clearInterval(this.cleanJobsTimer);
-    }
-
-    createJob(options: CreateJobOptions): Job {
-        if (options.singleInstance === true) {
-            const runningInstance = this.manager.findRunningJob(options.name);
-            if (runningInstance) {
-                return runningInstance;
-            }
-        }
-        const reporter: PartialJobReporter = {
-            complete: (result: any) => {
-                /* empty */
-            },
-        };
-        const wrappedWork = () => {
-            return new Promise(async (resolve, reject) => {
-                reporter.complete = result => resolve(result);
-                try {
-                    const result = await options.work(reporter as JobReporter);
-                } catch (e) {
-                    reject(e);
-                }
-            });
-        };
-        return this.manager.createJob(options.name, wrappedWork, reporter, options.metadata);
-    }
-
-    getAll(input?: JobListInput): JobInfo[] {
-        return this.manager.getAll(input);
-    }
-
-    getOne(jobId: string): JobInfo | null {
-        return this.manager.getOne(jobId);
-    }
-}

+ 9 - 14
packages/core/src/service/services/search.service.ts

@@ -1,19 +1,13 @@
 import { Injectable } from '@nestjs/common';
-import { JobInfo, JobState } from '@vendure/common/lib/generated-types';
+import { JobState } from '@vendure/common/lib/generated-types';
 
 import { RequestContext } from '../../api/common/request-context';
 import { Logger } from '../../config/logger/vendure-logger';
+import { Job } from '../../job-queue/job';
 
 /**
- * This service should be overridden by a VendurePlugin which implements search.
- *
- * ```
- * defineProviders(): Provider[] {
- *     return [
- *         { provide: SearchService, useClass: MySearchService },
- *     ];
- * }
- * ```
+ * This service allows a concrete search service to override its behaviour
+ * by passing itself to the `adopt()` method.
  */
 @Injectable()
 export class SearchService {
@@ -27,18 +21,19 @@ export class SearchService {
         this.override = override;
     }
 
-    async reindex(ctx: RequestContext): Promise<JobInfo> {
+    async reindex(ctx: RequestContext): Promise<Job> {
         if (this.override) {
             return this.override.reindex(ctx);
         }
         if (!process.env.CI) {
             Logger.warn(`The SearchService should be overridden by an appropriate search plugin.`);
         }
-        return {
+        return new Job({
+            queueName: 'error',
+            data: {},
             id: 'error',
-            name: '',
             state: JobState.FAILED,
             progress: 0,
-        };
+        });
     }
 }

+ 3 - 0
packages/core/src/service/types/collection-messages.ts

@@ -1,5 +1,6 @@
 import { ID } from '@vendure/common/lib/shared-types';
 
+import { SerializedRequestContext } from '../../api/common/request-context';
 import { WorkerMessage } from '../../worker/types';
 
 export interface ProcessCollectionsResponse {
@@ -16,3 +17,5 @@ export class ApplyCollectionFiltersMessage extends WorkerMessage<
 > {
     static readonly pattern = 'ApplyCollectionFilters';
 }
+
+export type ApplyCollectionFiletersJobData = { ctx: SerializedRequestContext; collectionIds: ID[] };

+ 22 - 19
packages/elasticsearch-plugin/e2e/elasticsearch-plugin.e2e-spec.ts

@@ -128,7 +128,7 @@ describe('Elasticsearch plugin', () => {
                 },
             },
         );
-        expect(result.search.items.map(i => i.productName)).toEqual([
+        expect(result.search.items.map((i) => i.productName)).toEqual([
             'Instant Camera',
             'Camera Lens',
             'SLR Camera',
@@ -148,7 +148,7 @@ describe('Elasticsearch plugin', () => {
                 },
             },
         );
-        expect(result.search.items.map(i => i.productName)).toEqual([
+        expect(result.search.items.map((i) => i.productName)).toEqual([
             'Clacky Keyboard',
             'Curvy Monitor',
             'Gaming PC',
@@ -168,7 +168,7 @@ describe('Elasticsearch plugin', () => {
                 },
             },
         );
-        expect(result.search.items.map(i => i.productName)).toEqual([
+        expect(result.search.items.map((i) => i.productName)).toEqual([
             'Spiky Cactus',
             'Orchid',
             'Bonsai Tree',
@@ -354,7 +354,7 @@ describe('Elasticsearch plugin', () => {
                     },
                 },
             );
-            expect(result.search.items.map(i => i.productVariantId)).toEqual(['T_1', 'T_2', 'T_4']);
+            expect(result.search.items.map((i) => i.productVariantId)).toEqual(['T_1', 'T_2', 'T_4']);
         });
 
         it('encodes collectionIds', async () => {
@@ -392,7 +392,7 @@ describe('Elasticsearch plugin', () => {
             it('updates index when ProductVariants are changed', async () => {
                 await awaitRunningJobs(adminClient);
                 const { search } = await doAdminSearchQuery({ term: 'drive', groupByProduct: false });
-                expect(search.items.map(i => i.sku)).toEqual([
+                expect(search.items.map((i) => i.sku)).toEqual([
                     'IHD455T1',
                     'IHD455T2',
                     'IHD455T3',
@@ -403,7 +403,7 @@ describe('Elasticsearch plugin', () => {
                 await adminClient.query<UpdateProductVariants.Mutation, UpdateProductVariants.Variables>(
                     UPDATE_PRODUCT_VARIANTS,
                     {
-                        input: search.items.map(i => ({
+                        input: search.items.map((i) => ({
                             id: i.productVariantId,
                             sku: i.sku + '_updated',
                         })),
@@ -416,7 +416,7 @@ describe('Elasticsearch plugin', () => {
                     groupByProduct: false,
                 });
 
-                expect(search2.items.map(i => i.sku)).toEqual([
+                expect(search2.items.map((i) => i.sku)).toEqual([
                     'IHD455T1_updated',
                     'IHD455T2_updated',
                     'IHD455T3_updated',
@@ -442,7 +442,7 @@ describe('Elasticsearch plugin', () => {
                     groupByProduct: false,
                 });
 
-                expect(search2.items.map(i => i.sku).sort()).toEqual([
+                expect(search2.items.map((i) => i.sku).sort()).toEqual([
                     'IHD455T2_updated',
                     'IHD455T3_updated',
                     'IHD455T4_updated',
@@ -459,7 +459,7 @@ describe('Elasticsearch plugin', () => {
                 });
                 await awaitRunningJobs(adminClient);
                 const result = await doAdminSearchQuery({ facetValueIds: ['T_2'], groupByProduct: true });
-                expect(result.search.items.map(i => i.productName).sort()).toEqual([
+                expect(result.search.items.map((i) => i.productName).sort()).toEqual([
                     'Clacky Keyboard',
                     'Curvy Monitor',
                     'Gaming PC',
@@ -470,7 +470,7 @@ describe('Elasticsearch plugin', () => {
 
             it('updates index when a Product is deleted', async () => {
                 const { search } = await doAdminSearchQuery({ facetValueIds: ['T_2'], groupByProduct: true });
-                expect(search.items.map(i => i.productId).sort()).toEqual([
+                expect(search.items.map((i) => i.productId).sort()).toEqual([
                     'T_2',
                     'T_3',
                     'T_4',
@@ -485,7 +485,7 @@ describe('Elasticsearch plugin', () => {
                     facetValueIds: ['T_2'],
                     groupByProduct: true,
                 });
-                expect(search2.items.map(i => i.productId).sort()).toEqual(['T_2', 'T_3', 'T_4', 'T_6']);
+                expect(search2.items.map((i) => i.productId).sort()).toEqual(['T_2', 'T_3', 'T_4', 'T_6']);
             });
 
             it('updates index when a Collection is changed', async () => {
@@ -517,7 +517,7 @@ describe('Elasticsearch plugin', () => {
                 await awaitRunningJobs(adminClient);
                 const result = await doAdminSearchQuery({ collectionId: 'T_2', groupByProduct: true });
 
-                expect(result.search.items.map(i => i.productName)).toEqual([
+                expect(result.search.items.map((i) => i.productName)).toEqual([
                     'Road Bike',
                     'Skipping Rope',
                     'Boxing Gloves',
@@ -565,7 +565,7 @@ describe('Elasticsearch plugin', () => {
                     collectionId: createCollection.id,
                     groupByProduct: true,
                 });
-                expect(result.search.items.map(i => i.productName)).toEqual([
+                expect(result.search.items.map((i) => i.productName)).toEqual([
                     'Instant Camera',
                     'Camera Lens',
                     'Tripod',
@@ -644,7 +644,7 @@ describe('Elasticsearch plugin', () => {
                     groupByProduct: false,
                 });
 
-                const variantToDelete = s1.items.find(i => i.sku === 'IHD455T2_updated')!;
+                const variantToDelete = s1.items.find((i) => i.sku === 'IHD455T2_updated')!;
 
                 const { deleteProductVariant } = await adminClient.query<
                     DeleteProductVariant.Mutation,
@@ -677,7 +677,10 @@ describe('Elasticsearch plugin', () => {
                 await adminClient.query<UpdateProductVariants.Mutation, UpdateProductVariants.Variables>(
                     UPDATE_PRODUCT_VARIANTS,
                     {
-                        input: [{ id: 'T_1', enabled: false }, { id: 'T_2', enabled: false }],
+                        input: [
+                            { id: 'T_1', enabled: false },
+                            { id: 'T_2', enabled: false },
+                        ],
                     },
                 );
                 await awaitRunningJobs(adminClient);
@@ -750,7 +753,7 @@ describe('Elasticsearch plugin', () => {
 
                 adminClient.setChannelToken(SECOND_CHANNEL_TOKEN);
                 const { search } = await doAdminSearchQuery({ groupByProduct: true });
-                expect(search.items.map(i => i.productId).sort()).toEqual(['T_1', 'T_2']);
+                expect(search.items.map((i) => i.productId).sort()).toEqual(['T_1', 'T_2']);
             });
 
             it('removing product from channel', async () => {
@@ -768,7 +771,7 @@ describe('Elasticsearch plugin', () => {
 
                 adminClient.setChannelToken(SECOND_CHANNEL_TOKEN);
                 const { search } = await doAdminSearchQuery({ groupByProduct: true });
-                expect(search.items.map(i => i.productId)).toEqual(['T_1']);
+                expect(search.items.map((i) => i.productId)).toEqual(['T_1']);
             });
 
             it('reindexes in channel', async () => {
@@ -784,7 +787,7 @@ describe('Elasticsearch plugin', () => {
                 expect(job!.state).toBe(JobState.COMPLETED);
 
                 const { search } = await doAdminSearchQuery({ groupByProduct: true });
-                expect(search.items.map(i => i.productId).sort()).toEqual(['T_1']);
+                expect(search.items.map((i) => i.productId).sort()).toEqual(['T_1']);
             });
         });
     });
@@ -880,7 +883,7 @@ const REINDEX = gql`
 `;
 
 const GET_JOB_INFO = gql`
-    query GetJobInfo($id: String!) {
+    query GetJobInfo($id: ID!) {
         job(jobId: $id) {
             id
             name

+ 52 - 23
packages/elasticsearch-plugin/e2e/graphql/generated-e2e-elasticsearch-plugin-types.ts

@@ -1262,28 +1262,58 @@ export type IntCustomFieldConfig = CustomField & {
     step?: Maybe<Scalars['Int']>;
 };
 
-export type JobInfo = {
-    __typename?: 'JobInfo';
-    id: Scalars['String'];
+export type Job = Node & {
+    __typename?: 'Job';
+    id: Scalars['ID'];
     name: Scalars['String'];
     state: JobState;
     progress: Scalars['Float'];
     metadata?: Maybe<Scalars['JSON']>;
     result?: Maybe<Scalars['JSON']>;
-    started?: Maybe<Scalars['DateTime']>;
-    ended?: Maybe<Scalars['DateTime']>;
-    duration?: Maybe<Scalars['Int']>;
+    error?: Maybe<Scalars['JSON']>;
+    started: Scalars['DateTime'];
+    settled?: Maybe<Scalars['DateTime']>;
+    isSettled: Scalars['Boolean'];
+    duration: Scalars['Int'];
+};
+
+export type JobFilterParameter = {
+    name?: Maybe<StringOperators>;
+    state?: Maybe<StringOperators>;
+    progress?: Maybe<NumberOperators>;
+    started?: Maybe<DateOperators>;
+    settled?: Maybe<DateOperators>;
+    isSettled?: Maybe<BooleanOperators>;
+    duration?: Maybe<NumberOperators>;
+};
+
+export type JobList = PaginatedList & {
+    __typename?: 'JobList';
+    items: Array<Job>;
+    totalItems: Scalars['Int'];
+};
+
+export type JobListOptions = {
+    skip?: Maybe<Scalars['Int']>;
+    take?: Maybe<Scalars['Int']>;
+    sort?: Maybe<JobSortParameter>;
+    filter?: Maybe<JobFilterParameter>;
 };
 
-export type JobListInput = {
-    state?: Maybe<JobState>;
-    ids?: Maybe<Array<Scalars['String']>>;
+export type JobSortParameter = {
+    id?: Maybe<SortOrder>;
+    name?: Maybe<SortOrder>;
+    progress?: Maybe<SortOrder>;
+    started?: Maybe<SortOrder>;
+    settled?: Maybe<SortOrder>;
+    duration?: Maybe<SortOrder>;
 };
 
 export enum JobState {
     PENDING = 'PENDING',
     RUNNING = 'RUNNING',
     COMPLETED = 'COMPLETED',
+    RETRYING = 'RETRYING',
     FAILED = 'FAILED',
 }
 
@@ -1776,7 +1806,7 @@ export type Mutation = {
     createProductOption: ProductOption;
     /** Create a new ProductOption within a ProductOptionGroup */
     updateProductOption: ProductOption;
-    reindex: JobInfo;
+    reindex: Job;
     /** Create a new Product */
     createProduct: Product;
     /** Update an existing Product */
@@ -2695,8 +2725,9 @@ export type Query = {
     facets: FacetList;
     facet?: Maybe<Facet>;
     globalSettings: GlobalSettings;
-    job?: Maybe<JobInfo>;
-    jobs: Array<JobInfo>;
+    job?: Maybe<Job>;
+    jobs: JobList;
+    jobsById: Array<Job>;
     order?: Maybe<Order>;
     orders: OrderList;
     paymentMethods: PaymentMethodList;
@@ -2784,11 +2815,15 @@ export type QueryFacetArgs = {
 };
 
 export type QueryJobArgs = {
-    jobId: Scalars['String'];
+    jobId: Scalars['ID'];
 };
 
 export type QueryJobsArgs = {
-    input?: Maybe<JobListInput>;
+    input?: Maybe<JobListOptions>;
+};
+
+export type QueryJobsByIdArgs = {
+    jobIds: Array<Scalars['ID']>;
 };
 
 export type QueryOrderArgs = {
@@ -3541,22 +3576,16 @@ export type SearchGetPricesQuery = { __typename?: 'Query' } & {
 export type ReindexMutationVariables = {};
 
 export type ReindexMutation = { __typename?: 'Mutation' } & {
-    reindex: { __typename?: 'JobInfo' } & Pick<
-        JobInfo,
-        'id' | 'name' | 'state' | 'progress' | 'duration' | 'result'
-    >;
+    reindex: { __typename?: 'Job' } & Pick<Job, 'id' | 'name' | 'state' | 'progress' | 'duration' | 'result'>;
 };
 
 export type GetJobInfoQueryVariables = {
-    id: Scalars['String'];
+    id: Scalars['ID'];
 };
 
 export type GetJobInfoQuery = { __typename?: 'Query' } & {
     job?: Maybe<
-        { __typename?: 'JobInfo' } & Pick<
-            JobInfo,
-            'id' | 'name' | 'state' | 'progress' | 'duration' | 'result'
-        >
+        { __typename?: 'Job' } & Pick<Job, 'id' | 'name' | 'state' | 'progress' | 'duration' | 'result'>
     >;
 };
 

Разница между файлами не показана из-за своего большого размера
+ 0 - 0
schema-admin.json


Некоторые файлы не были показаны из-за большого количества измененных файлов