Bladeren bron

feat(core): Add DB-based persistence for JobQueue

Relates to #282

BREAKING CHANGE: A new JobRecord entity has been added, so a DB migration will be needed.
Michael Bromley 5 jaren geleden
bovenliggende
commit
a61df9332b
31 gewijzigde bestanden met toevoegingen van 510 en 131 verwijderingen
  1. 8 5
      e2e-common/test-config.ts
  2. 9 6
      packages/admin-ui/src/lib/core/src/common/generated-types.ts
  3. 2 2
      packages/admin-ui/src/lib/core/src/data/definitions/settings-definitions.ts
  4. 8 5
      packages/asset-server-plugin/e2e/graphql/generated-e2e-asset-server-plugin-types.ts
  5. 8 5
      packages/common/src/generated-types.ts
  6. 2 0
      packages/core/e2e/collection.e2e-spec.ts
  7. 23 16
      packages/core/e2e/default-search-plugin.e2e-spec.ts
  8. 16 9
      packages/core/e2e/graphql/generated-e2e-admin-types.ts
  9. 4 3
      packages/core/e2e/graphql/shared-definitions.ts
  10. 13 2
      packages/core/e2e/utils/await-running-jobs.ts
  11. 1 1
      packages/core/src/api/resolvers/admin/job.resolver.ts
  12. 4 3
      packages/core/src/api/schema/admin-api/job.api.graphql
  13. 29 5
      packages/core/src/app.module.ts
  14. 2 3
      packages/core/src/config/default-config.ts
  15. 54 2
      packages/core/src/config/job-queue/job-queue-strategy.ts
  16. 2 0
      packages/core/src/entity/entities.ts
  17. 54 0
      packages/core/src/entity/job-record/job-record.entity.ts
  18. 8 3
      packages/core/src/job-queue/job-queue.service.spec.ts
  19. 26 14
      packages/core/src/job-queue/job-queue.service.ts
  20. 27 7
      packages/core/src/job-queue/job-queue.ts
  21. 6 2
      packages/core/src/job-queue/job.ts
  22. 114 14
      packages/core/src/job-queue/sql-job-queue-strategy.ts
  23. 0 3
      packages/core/src/plugin/plugin-common.module.ts
  24. 22 0
      packages/core/src/process-context/process-context.module.ts
  25. 30 0
      packages/core/src/process-context/process-context.ts
  26. 14 10
      packages/core/src/service/controllers/collection.controller.ts
  27. 8 1
      packages/core/src/worker/worker.module.ts
  28. 1 1
      packages/dev-server/dev-config.ts
  29. 2 2
      packages/elasticsearch-plugin/e2e/elasticsearch-plugin.e2e-spec.ts
  30. 13 7
      packages/elasticsearch-plugin/e2e/graphql/generated-e2e-elasticsearch-plugin-types.ts
  31. 0 0
      schema-admin.json

+ 8 - 5
e2e-common/test-config.ts

@@ -1,10 +1,14 @@
 import { mergeConfig } from '@vendure/core';
-import { MysqlInitializer, PostgresInitializer, registerInitializer, SqljsInitializer, testConfig as defaultTestConfig } from '@vendure/testing';
+import {
+    MysqlInitializer,
+    PostgresInitializer,
+    registerInitializer,
+    SqljsInitializer,
+    testConfig as defaultTestConfig,
+} from '@vendure/testing';
 import path from 'path';
 import { ConnectionOptions } from 'typeorm';
 
-import { TestingJobQueueStrategy } from '../packages/core/src/job-queue/testing-job-queue-strategy';
-
 import { getPackageDir } from './get-package-dir';
 
 /**
@@ -35,8 +39,7 @@ export const testConfig = mergeConfig(defaultTestConfig, {
         importAssetsDir: path.join(packageDir, 'fixtures/assets'),
     },
     jobQueueOptions: {
-        jobQueueStrategy: new TestingJobQueueStrategy(),
-        pollInterval: 10,
+        pollInterval: 100,
     },
     dbConnectionOptions: getDbConfig(),
 });

+ 9 - 6
packages/admin-ui/src/lib/core/src/common/generated-types.ts

@@ -1269,10 +1269,11 @@ export type IntCustomFieldConfig = CustomField & {
 export type Job = Node & {
    __typename?: 'Job';
   id: Scalars['ID'];
-  name: Scalars['String'];
+  createdAt: Scalars['DateTime'];
+  queueName: Scalars['String'];
   state: JobState;
   progress: Scalars['Float'];
-  metadata?: Maybe<Scalars['JSON']>;
+  data?: Maybe<Scalars['JSON']>;
   result?: Maybe<Scalars['JSON']>;
   error?: Maybe<Scalars['JSON']>;
   started: Scalars['DateTime'];
@@ -1282,7 +1283,8 @@ export type Job = Node & {
 };
 
 export type JobFilterParameter = {
-  name?: Maybe<StringOperators>;
+  createdAt?: Maybe<DateOperators>;
+  queueName?: Maybe<StringOperators>;
   state?: Maybe<StringOperators>;
   progress?: Maybe<NumberOperators>;
   started?: Maybe<DateOperators>;
@@ -1306,7 +1308,8 @@ export type JobListOptions = {
 
 export type JobSortParameter = {
   id?: Maybe<SortOrder>;
-  name?: Maybe<SortOrder>;
+  createdAt?: Maybe<SortOrder>;
+  queueName?: Maybe<SortOrder>;
   progress?: Maybe<SortOrder>;
   started?: Maybe<SortOrder>;
   settled?: Maybe<SortOrder>;
@@ -2952,7 +2955,7 @@ export type QueryJobArgs = {
 
 
 export type QueryJobsArgs = {
-  input?: Maybe<JobListOptions>;
+  options?: Maybe<JobListOptions>;
 };
 
 
@@ -6094,7 +6097,7 @@ export type GetServerConfigQuery = (
 
 export type JobInfoFragment = (
   { __typename?: 'Job' }
-  & Pick<Job, 'id' | 'name' | 'state' | 'progress' | 'duration' | 'result'>
+  & Pick<Job, 'id' | 'queueName' | 'state' | 'progress' | 'duration' | 'result'>
 );
 
 export type GetJobInfoQueryVariables = {

+ 2 - 2
packages/admin-ui/src/lib/core/src/data/definitions/settings-definitions.ts

@@ -573,7 +573,7 @@ export const GET_SERVER_CONFIG = gql`
 export const JOB_INFO_FRAGMENT = gql`
     fragment JobInfo on Job {
         id
-        name
+        queueName
         state
         progress
         duration
@@ -592,7 +592,7 @@ export const GET_JOB_INFO = gql`
 
 export const GET_ALL_JOBS = gql`
     query GetAllJobs($input: JobListOptions) {
-        jobs(input: $input) {
+        jobs(options: $input) {
             items {
                 ...JobInfo
             }

+ 8 - 5
packages/asset-server-plugin/e2e/graphql/generated-e2e-asset-server-plugin-types.ts

@@ -1265,10 +1265,11 @@ export type IntCustomFieldConfig = CustomField & {
 export type Job = Node & {
     __typename?: 'Job';
     id: Scalars['ID'];
-    name: Scalars['String'];
+    createdAt: Scalars['DateTime'];
+    queueName: Scalars['String'];
     state: JobState;
     progress: Scalars['Float'];
-    metadata?: Maybe<Scalars['JSON']>;
+    data?: Maybe<Scalars['JSON']>;
     result?: Maybe<Scalars['JSON']>;
     error?: Maybe<Scalars['JSON']>;
     started: Scalars['DateTime'];
@@ -1278,7 +1279,8 @@ export type Job = Node & {
 };
 
 export type JobFilterParameter = {
-    name?: Maybe<StringOperators>;
+    createdAt?: Maybe<DateOperators>;
+    queueName?: Maybe<StringOperators>;
     state?: Maybe<StringOperators>;
     progress?: Maybe<NumberOperators>;
     started?: Maybe<DateOperators>;
@@ -1302,7 +1304,8 @@ export type JobListOptions = {
 
 export type JobSortParameter = {
     id?: Maybe<SortOrder>;
-    name?: Maybe<SortOrder>;
+    createdAt?: Maybe<SortOrder>;
+    queueName?: Maybe<SortOrder>;
     progress?: Maybe<SortOrder>;
     started?: Maybe<SortOrder>;
     settled?: Maybe<SortOrder>;
@@ -2819,7 +2822,7 @@ export type QueryJobArgs = {
 };
 
 export type QueryJobsArgs = {
-    input?: Maybe<JobListOptions>;
+    options?: Maybe<JobListOptions>;
 };
 
 export type QueryJobsByIdArgs = {

+ 8 - 5
packages/common/src/generated-types.ts

@@ -1261,10 +1261,11 @@ export type IntCustomFieldConfig = CustomField & {
 export type Job = Node & {
    __typename?: 'Job';
   id: Scalars['ID'];
-  name: Scalars['String'];
+  createdAt: Scalars['DateTime'];
+  queueName: Scalars['String'];
   state: JobState;
   progress: Scalars['Float'];
-  metadata?: Maybe<Scalars['JSON']>;
+  data?: Maybe<Scalars['JSON']>;
   result?: Maybe<Scalars['JSON']>;
   error?: Maybe<Scalars['JSON']>;
   started: Scalars['DateTime'];
@@ -1274,7 +1275,8 @@ export type Job = Node & {
 };
 
 export type JobFilterParameter = {
-  name?: Maybe<StringOperators>;
+  createdAt?: Maybe<DateOperators>;
+  queueName?: Maybe<StringOperators>;
   state?: Maybe<StringOperators>;
   progress?: Maybe<NumberOperators>;
   started?: Maybe<DateOperators>;
@@ -1298,7 +1300,8 @@ export type JobListOptions = {
 
 export type JobSortParameter = {
   id?: Maybe<SortOrder>;
-  name?: Maybe<SortOrder>;
+  createdAt?: Maybe<SortOrder>;
+  queueName?: Maybe<SortOrder>;
   progress?: Maybe<SortOrder>;
   started?: Maybe<SortOrder>;
   settled?: Maybe<SortOrder>;
@@ -2909,7 +2912,7 @@ export type QueryJobArgs = {
 
 
 export type QueryJobsArgs = {
-  input?: Maybe<JobListOptions>;
+  options?: Maybe<JobListOptions>;
 };
 
 

+ 2 - 0
packages/core/e2e/collection.e2e-spec.ts

@@ -346,6 +346,8 @@ describe('Collection resolver', () => {
         });
 
         it('re-evaluates Collection contents on move', async () => {
+            await awaitRunningJobs(adminClient);
+
             const result = await adminClient.query<
                 GetCollectionProducts.Query,
                 GetCollectionProducts.Variables

+ 23 - 16
packages/core/e2e/default-search-plugin.e2e-spec.ts

@@ -108,7 +108,7 @@ describe('Default search plugin', () => {
                 },
             },
         );
-        expect(result.search.items.map(i => i.productName)).toEqual([
+        expect(result.search.items.map((i) => i.productName)).toEqual([
             'Camera Lens',
             'Instant Camera',
             'Slr Camera',
@@ -125,7 +125,7 @@ describe('Default search plugin', () => {
                 },
             },
         );
-        expect(result.search.items.map(i => i.productName)).toEqual([
+        expect(result.search.items.map((i) => i.productName)).toEqual([
             'Laptop',
             'Curvy Monitor',
             'Gaming PC',
@@ -145,7 +145,7 @@ describe('Default search plugin', () => {
                 },
             },
         );
-        expect(result.search.items.map(i => i.productName)).toEqual([
+        expect(result.search.items.map((i) => i.productName)).toEqual([
             'Spiky Cactus',
             'Orchid',
             'Bonsai Tree',
@@ -335,7 +335,7 @@ describe('Default search plugin', () => {
                     },
                 },
             );
-            expect(result.search.items.map(i => i.productVariantId)).toEqual(['T_1', 'T_2', 'T_4']);
+            expect(result.search.items.map((i) => i.productVariantId)).toEqual(['T_1', 'T_2', 'T_4']);
         });
 
         it('encodes collectionIds', async () => {
@@ -373,7 +373,7 @@ describe('Default search plugin', () => {
             it('updates index when ProductVariants are changed', async () => {
                 await awaitRunningJobs(adminClient);
                 const { search } = await doAdminSearchQuery({ term: 'drive', groupByProduct: false });
-                expect(search.items.map(i => i.sku)).toEqual([
+                expect(search.items.map((i) => i.sku)).toEqual([
                     'IHD455T1',
                     'IHD455T2',
                     'IHD455T3',
@@ -384,7 +384,7 @@ describe('Default search plugin', () => {
                 await adminClient.query<UpdateProductVariants.Mutation, UpdateProductVariants.Variables>(
                     UPDATE_PRODUCT_VARIANTS,
                     {
-                        input: search.items.map(i => ({
+                        input: search.items.map((i) => ({
                             id: i.productVariantId,
                             sku: i.sku + '_updated',
                         })),
@@ -397,7 +397,7 @@ describe('Default search plugin', () => {
                     groupByProduct: false,
                 });
 
-                expect(search2.items.map(i => i.sku)).toEqual([
+                expect(search2.items.map((i) => i.sku)).toEqual([
                     'IHD455T1_updated',
                     'IHD455T2_updated',
                     'IHD455T3_updated',
@@ -423,7 +423,7 @@ describe('Default search plugin', () => {
                     groupByProduct: false,
                 });
 
-                expect(search2.items.map(i => i.sku)).toEqual([
+                expect(search2.items.map((i) => i.sku)).toEqual([
                     'IHD455T2_updated',
                     'IHD455T3_updated',
                     'IHD455T4_updated',
@@ -440,7 +440,7 @@ describe('Default search plugin', () => {
                 });
                 await awaitRunningJobs(adminClient);
                 const result = await doAdminSearchQuery({ facetValueIds: ['T_2'], groupByProduct: true });
-                expect(result.search.items.map(i => i.productName)).toEqual([
+                expect(result.search.items.map((i) => i.productName)).toEqual([
                     'Curvy Monitor',
                     'Gaming PC',
                     'Hard Drive',
@@ -451,7 +451,7 @@ describe('Default search plugin', () => {
 
             it('updates index when a Product is deleted', async () => {
                 const { search } = await doAdminSearchQuery({ facetValueIds: ['T_2'], groupByProduct: true });
-                expect(search.items.map(i => i.productId)).toEqual(['T_2', 'T_3', 'T_4', 'T_5', 'T_6']);
+                expect(search.items.map((i) => i.productId)).toEqual(['T_2', 'T_3', 'T_4', 'T_5', 'T_6']);
                 await adminClient.query<DeleteProduct.Mutation, DeleteProduct.Variables>(DELETE_PRODUCT, {
                     id: 'T_5',
                 });
@@ -460,7 +460,7 @@ describe('Default search plugin', () => {
                     facetValueIds: ['T_2'],
                     groupByProduct: true,
                 });
-                expect(search2.items.map(i => i.productId)).toEqual(['T_2', 'T_3', 'T_4', 'T_6']);
+                expect(search2.items.map((i) => i.productId)).toEqual(['T_2', 'T_3', 'T_4', 'T_6']);
             });
 
             it('updates index when a Collection is changed', async () => {
@@ -490,9 +490,11 @@ describe('Default search plugin', () => {
                     },
                 );
                 await awaitRunningJobs(adminClient);
+                // add an additional check for the collection filters to update
+                await awaitRunningJobs(adminClient);
                 const result = await doAdminSearchQuery({ collectionId: 'T_2', groupByProduct: true });
 
-                expect(result.search.items.map(i => i.productName)).toEqual([
+                expect(result.search.items.map((i) => i.productName)).toEqual([
                     'Road Bike',
                     'Skipping Rope',
                     'Boxing Gloves',
@@ -536,11 +538,13 @@ describe('Default search plugin', () => {
                     },
                 });
                 await awaitRunningJobs(adminClient);
+                // add an additional check for the collection filters to update
+                await awaitRunningJobs(adminClient);
                 const result = await doAdminSearchQuery({
                     collectionId: createCollection.id,
                     groupByProduct: true,
                 });
-                expect(result.search.items.map(i => i.productName)).toEqual([
+                expect(result.search.items.map((i) => i.productName)).toEqual([
                     'Instant Camera',
                     'Camera Lens',
                     'Tripod',
@@ -646,7 +650,10 @@ describe('Default search plugin', () => {
                 await adminClient.query<UpdateProductVariants.Mutation, UpdateProductVariants.Variables>(
                     UPDATE_PRODUCT_VARIANTS,
                     {
-                        input: [{ id: 'T_1', enabled: false }, { id: 'T_2', enabled: false }],
+                        input: [
+                            { id: 'T_1', enabled: false },
+                            { id: 'T_2', enabled: false },
+                        ],
                     },
                 );
                 await awaitRunningJobs(adminClient);
@@ -725,7 +732,7 @@ describe('Default search plugin', () => {
 
                 adminClient.setChannelToken(SECOND_CHANNEL_TOKEN);
                 const { search } = await doAdminSearchQuery({ groupByProduct: true });
-                expect(search.items.map(i => i.productId)).toEqual(['T_1', 'T_2']);
+                expect(search.items.map((i) => i.productId)).toEqual(['T_1', 'T_2']);
             }, 10000);
 
             it('removing product from channel', async () => {
@@ -743,7 +750,7 @@ describe('Default search plugin', () => {
 
                 adminClient.setChannelToken(SECOND_CHANNEL_TOKEN);
                 const { search } = await doAdminSearchQuery({ groupByProduct: true });
-                expect(search.items.map(i => i.productId)).toEqual(['T_1']);
+                expect(search.items.map((i) => i.productId)).toEqual(['T_1']);
             }, 10000);
         });
     });

+ 16 - 9
packages/core/e2e/graphql/generated-e2e-admin-types.ts

@@ -1265,10 +1265,11 @@ export type IntCustomFieldConfig = CustomField & {
 export type Job = Node & {
     __typename?: 'Job';
     id: Scalars['ID'];
-    name: Scalars['String'];
+    createdAt: Scalars['DateTime'];
+    queueName: Scalars['String'];
     state: JobState;
     progress: Scalars['Float'];
-    metadata?: Maybe<Scalars['JSON']>;
+    data?: Maybe<Scalars['JSON']>;
     result?: Maybe<Scalars['JSON']>;
     error?: Maybe<Scalars['JSON']>;
     started: Scalars['DateTime'];
@@ -1278,7 +1279,8 @@ export type Job = Node & {
 };
 
 export type JobFilterParameter = {
-    name?: Maybe<StringOperators>;
+    createdAt?: Maybe<DateOperators>;
+    queueName?: Maybe<StringOperators>;
     state?: Maybe<StringOperators>;
     progress?: Maybe<NumberOperators>;
     started?: Maybe<DateOperators>;
@@ -1302,7 +1304,8 @@ export type JobListOptions = {
 
 export type JobSortParameter = {
     id?: Maybe<SortOrder>;
-    name?: Maybe<SortOrder>;
+    createdAt?: Maybe<SortOrder>;
+    queueName?: Maybe<SortOrder>;
     progress?: Maybe<SortOrder>;
     started?: Maybe<SortOrder>;
     settled?: Maybe<SortOrder>;
@@ -2819,7 +2822,7 @@ export type QueryJobArgs = {
 };
 
 export type QueryJobsArgs = {
-    input?: Maybe<JobListOptions>;
+    options?: Maybe<JobListOptions>;
 };
 
 export type QueryJobsByIdArgs = {
@@ -4573,12 +4576,16 @@ export type GetStockMovementQuery = { __typename?: 'Query' } & {
     >;
 };
 
-export type GetRunningJobsQueryVariables = {};
+export type GetRunningJobsQueryVariables = {
+    options?: Maybe<JobListOptions>;
+};
 
 export type GetRunningJobsQuery = { __typename?: 'Query' } & {
-    jobs: { __typename?: 'JobList' } & {
-        items: Array<{ __typename?: 'Job' } & Pick<Job, 'id' | 'name' | 'state' | 'isSettled' | 'duration'>>;
-    };
+    jobs: { __typename?: 'JobList' } & Pick<JobList, 'totalItems'> & {
+            items: Array<
+                { __typename?: 'Job' } & Pick<Job, 'id' | 'queueName' | 'state' | 'isSettled' | 'duration'>
+            >;
+        };
 };
 
 export type CreatePromotionMutationVariables = {

+ 4 - 3
packages/core/e2e/graphql/shared-definitions.ts

@@ -268,15 +268,16 @@ export const GET_STOCK_MOVEMENT = gql`
     ${VARIANT_WITH_STOCK_FRAGMENT}
 `;
 export const GET_RUNNING_JOBS = gql`
-    query GetRunningJobs {
-        jobs {
+    query GetRunningJobs($options: JobListOptions) {
+        jobs(options: $options) {
             items {
                 id
-                name
+                queueName
                 state
                 isSettled
                 duration
             }
+            totalItems
         }
     }
 `;

+ 13 - 2
packages/core/e2e/utils/await-running-jobs.ts

@@ -15,8 +15,19 @@ export async function awaitRunningJobs(adminClient: SimpleGraphQLClient, timeout
     // e.g. event debouncing is used before triggering the job.
     await new Promise((resolve) => setTimeout(resolve, 100));
     do {
-        const { jobs } = await adminClient.query<GetRunningJobs.Query>(GET_RUNNING_JOBS);
-        runningJobs = jobs.items.filter((job) => !job.isSettled).length;
+        const { jobs } = await adminClient.query<GetRunningJobs.Query, GetRunningJobs.Variables>(
+            GET_RUNNING_JOBS,
+            {
+                options: {
+                    filter: {
+                        isSettled: {
+                            eq: false,
+                        },
+                    },
+                },
+            },
+        );
+        runningJobs = jobs.totalItems;
         timedOut = timeout < +new Date() - startTime;
     } while (runningJobs > 0 && !timedOut);
 }

+ 1 - 1
packages/core/src/api/resolvers/admin/job.resolver.ts

@@ -22,7 +22,7 @@ export class JobResolver {
     @Query()
     @Allow(Permission.Authenticated)
     jobs(@Args() args: QueryJobsArgs) {
-        return this.jobService.getJobs(args.input || undefined);
+        return this.jobService.getJobs(args.options || undefined);
     }
 
     @Query()

+ 4 - 3
packages/core/src/api/schema/admin-api/job.api.graphql

@@ -1,6 +1,6 @@
 type Query {
     job(jobId: ID!): Job
-    jobs(input: JobListOptions): JobList!
+    jobs(options: JobListOptions): JobList!
     jobsById(jobIds: [ID!]!): [Job!]!
 }
 
@@ -21,10 +21,11 @@ type JobList implements PaginatedList {
 
 type Job implements Node {
     id: ID!
-    name: String!
+    createdAt: DateTime!
+    queueName: String!
     state: JobState!
     progress: Float!
-    metadata: JSON
+    data: JSON
     result: JSON
     error: JSON
     started: DateTime!

+ 29 - 5
packages/core/src/app.module.ts

@@ -1,4 +1,12 @@
-import { MiddlewareConsumer, Module, NestModule, OnApplicationShutdown } from '@nestjs/common';
+import {
+    MiddlewareConsumer,
+    Module,
+    NestModule,
+    OnApplicationBootstrap,
+    OnApplicationShutdown,
+    OnModuleInit,
+} from '@nestjs/common';
+import { ModuleRef } from '@nestjs/core';
 import cookieSession = require('cookie-session');
 import { RequestHandler } from 'express';
 
@@ -9,12 +17,24 @@ import { Logger } from './config/logger/vendure-logger';
 import { I18nModule } from './i18n/i18n.module';
 import { I18nService } from './i18n/i18n.service';
 import { PluginModule } from './plugin/plugin.module';
+import { ProcessContextModule } from './process-context/process-context.module';
 
 @Module({
-    imports: [ConfigModule, I18nModule, ApiModule, PluginModule.forRoot()],
+    imports: [ConfigModule, I18nModule, ApiModule, PluginModule.forRoot(), ProcessContextModule.forRoot()],
 })
-export class AppModule implements NestModule, OnApplicationShutdown {
-    constructor(private configService: ConfigService, private i18nService: I18nService) {}
+export class AppModule implements NestModule, OnApplicationBootstrap, OnApplicationShutdown {
+    constructor(
+        private configService: ConfigService,
+        private i18nService: I18nService,
+        private moduleRef: ModuleRef,
+    ) {}
+
+    async onApplicationBootstrap() {
+        const { jobQueueStrategy } = this.configService.jobQueueOptions;
+        if (typeof jobQueueStrategy.init === 'function') {
+            await jobQueueStrategy.init(this.moduleRef);
+        }
+    }
 
     configure(consumer: MiddlewareConsumer) {
         const { adminApiPath, shopApiPath } = this.configService;
@@ -39,7 +59,11 @@ export class AppModule implements NestModule, OnApplicationShutdown {
         }
     }
 
-    onApplicationShutdown(signal?: string) {
+    async onApplicationShutdown(signal?: string) {
+        const { jobQueueStrategy } = this.configService.jobQueueOptions;
+        if (typeof jobQueueStrategy.destroy === 'function') {
+            await jobQueueStrategy.destroy();
+        }
         if (signal) {
             Logger.info('Received shutdown signal:' + signal);
         }

+ 2 - 3
packages/core/src/config/default-config.ts

@@ -4,7 +4,6 @@ import { DEFAULT_AUTH_TOKEN_HEADER_KEY } from '@vendure/common/lib/shared-consta
 
 import { generatePublicId } from '../common/generate-public-id';
 import { SqlJobQueueStrategy } from '../job-queue/sql-job-queue-strategy';
-import { TestingJobQueueStrategy } from '../job-queue/testing-job-queue-strategy';
 
 import { DefaultAssetNamingStrategy } from './asset-naming-strategy/default-asset-naming-strategy';
 import { NoAssetPreviewStrategy } from './asset-preview-strategy/no-asset-preview-strategy';
@@ -95,8 +94,8 @@ export const defaultConfig: RuntimeVendureConfig = {
         },
     },
     jobQueueOptions: {
-        jobQueueStrategy: new TestingJobQueueStrategy(),
-        pollInterval: 100,
+        jobQueueStrategy: new SqlJobQueueStrategy(),
+        pollInterval: 200,
     },
     customFields: {
         Address: [],

+ 54 - 2
packages/core/src/config/job-queue/job-queue-strategy.ts

@@ -1,7 +1,6 @@
 import { ModuleRef } from '@nestjs/core';
+import { JobListOptions } from '@vendure/common/lib/generated-types';
 import { PaginatedList } from '@vendure/common/lib/shared-types';
-import { JobListOptions } from '@vendure/common/src/generated-types';
-import { Connection } from 'typeorm';
 
 import { Job } from '../../job-queue/job';
 
@@ -14,11 +13,64 @@ import { Job } from '../../job-queue/job';
  * @docsCateogry JobQueue
  */
 export interface JobQueueStrategy {
+    /**
+     * @description
+     * Initialization logic to be run when after the Vendure server has been initialized
+     * (in the Nestjs [onApplicationBootstrap hook](https://docs.nestjs.com/fundamentals/lifecycle-events)).
+     *
+     * Receives an instance of the application's ModuleRef, which can be used to inject
+     * providers:
+     *
+     * @example
+     * ```TypeScript
+     * init(moduleRef: ModuleRef) {
+     *     const myService = moduleRef.get(MyService, { strict: false });
+     * }
+     * ```
+     */
     init?(moduleRef: ModuleRef): void | Promise<void>;
+
+    /**
+     * @description
+     * Teardown logic to be run when the Vendure server shuts down.
+     */
+    destroy?(): void | Promise<void>;
+
+    /**
+     * @description
+     * Add a new job to the queue.
+     */
     add(job: Job): Promise<Job>;
+
+    /**
+     * @description
+     * Should return the next job in the given queue. The implementation is
+     * responsible for returning the correct job according to the time of
+     * creation.
+     */
     next(queueName: string): Promise<Job | undefined>;
+
+    /**
+     * @description
+     * Update the job details in the store.
+     */
     update(job: Job): Promise<void>;
+
+    /**
+     * @description
+     * Returns a job by its id.
+     */
     findOne(id: string): Promise<Job | undefined>;
+
+    /**
+     * @description
+     * Returns a list of jobs according to the specified options.
+     */
     findMany(options?: JobListOptions): Promise<PaginatedList<Job>>;
+
+    /**
+     * @description
+     * Returns an array of jobs for the given ids.
+     */
     findManyById(ids: string[]): Promise<Job[]>;
 }

+ 2 - 0
packages/core/src/entity/entities.ts

@@ -17,6 +17,7 @@ import { Fulfillment } from './fulfillment/fulfillment.entity';
 import { GlobalSettings } from './global-settings/global-settings.entity';
 import { HistoryEntry } from './history-entry/history-entry.entity';
 import { OrderHistoryEntry } from './history-entry/order-history-entry.entity';
+import { JobRecord } from './job-record/job-record.entity';
 import { OrderItem } from './order-item/order-item.entity';
 import { OrderLine } from './order-line/order-line.entity';
 import { Order } from './order/order.entity';
@@ -74,6 +75,7 @@ export const coreEntitiesMap = {
     Fulfillment,
     GlobalSettings,
     HistoryEntry,
+    JobRecord,
     Order,
     OrderHistoryEntry,
     OrderItem,

+ 54 - 0
packages/core/src/entity/job-record/job-record.entity.ts

@@ -0,0 +1,54 @@
+import { JobState } from '@vendure/common/lib/generated-types';
+import { DeepPartial } from '@vendure/common/lib/shared-types';
+import { Column, CreateDateColumn, Entity, PrimaryColumn, UpdateDateColumn } from 'typeorm';
+
+@Entity()
+export class JobRecord {
+    constructor(input: DeepPartial<JobRecord>) {
+        if (input) {
+            for (const [key, value] of Object.entries(input)) {
+                (this as any)[key] = value;
+            }
+        }
+    }
+
+    @PrimaryColumn()
+    id: string;
+
+    @CreateDateColumn() createdAt: Date;
+
+    @UpdateDateColumn() updatedAt: Date;
+
+    @Column()
+    queueName: string;
+
+    @Column('simple-json', { nullable: true })
+    data: any;
+
+    @Column('varchar')
+    state: JobState;
+
+    @Column()
+    progress: number;
+
+    @Column('simple-json', { nullable: true })
+    result: any;
+
+    @Column({ nullable: true })
+    error: string;
+
+    @Column({ nullable: true })
+    started?: Date;
+
+    @Column({ nullable: true })
+    settled?: Date;
+
+    @Column()
+    isSettled: boolean;
+
+    @Column()
+    retries: number;
+
+    @Column()
+    attempts: number;
+}

+ 8 - 3
packages/core/src/job-queue/job-queue.service.spec.ts

@@ -3,6 +3,7 @@ import { JobState } from '@vendure/common/lib/generated-types';
 import { Subject } from 'rxjs';
 
 import { ConfigService } from '../config/config.service';
+import { ProcessContext, ServerProcessContext } from '../process-context/process-context';
 
 import { Job } from './job';
 import { JobQueueService } from './job-queue.service';
@@ -16,7 +17,11 @@ describe('JobQueueService', () => {
 
     beforeEach(async () => {
         module = await Test.createTestingModule({
-            providers: [{ provide: ConfigService, useClass: MockConfigService }, JobQueueService],
+            providers: [
+                { provide: ConfigService, useClass: MockConfigService },
+                { provide: ProcessContext, useClass: ServerProcessContext },
+                JobQueueService,
+            ],
         }).compile();
 
         jobQueueService = module.get(JobQueueService);
@@ -107,7 +112,7 @@ describe('JobQueueService', () => {
 
         await tick(queuePollInterval);
         expect(testJob.state).toBe(JobState.FAILED);
-        expect(testJob.error).toBe(err.toString());
+        expect(testJob.error).toBe(err.message);
 
         subject.complete();
     });
@@ -128,7 +133,7 @@ describe('JobQueueService', () => {
 
         await tick(queuePollInterval);
         expect(testJob.state).toBe(JobState.FAILED);
-        expect(testJob.error).toBe(err.toString());
+        expect(testJob.error).toBe(err.message);
 
         subject.complete();
     });

+ 26 - 14
packages/core/src/job-queue/job-queue.service.ts

@@ -1,10 +1,11 @@
-import { Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
-import { ModuleRef } from '@nestjs/core';
+import { Injectable, OnApplicationBootstrap, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
 import { JobListOptions } from '@vendure/common/lib/generated-types';
+import { PaginatedList } from '@vendure/common/lib/shared-types';
 
-import { PaginatedList } from '../../../common/src/shared-types';
 import { ConfigService } from '../config/config.service';
 import { JobQueueStrategy } from '../config/job-queue/job-queue-strategy';
+import { Logger } from '../config/logger/vendure-logger';
+import { ProcessContext } from '../process-context/process-context';
 
 import { Job } from './job';
 import { JobQueue } from './job-queue';
@@ -18,29 +19,38 @@ import { CreateQueueOptions, JobData } from './types';
  * @docsCateogory JobQueue
  */
 @Injectable()
-export class JobQueueService implements OnModuleInit, OnModuleDestroy {
+export class JobQueueService implements OnApplicationBootstrap, OnModuleDestroy {
     private cleanJobsTimer: NodeJS.Timeout;
     private queues: Array<JobQueue<any>> = [];
+    private hasInitialized = false;
 
     private get jobQueueStrategy(): JobQueueStrategy {
         return this.configService.jobQueueOptions.jobQueueStrategy;
     }
 
-    constructor(private configService: ConfigService, private moduleRef: ModuleRef) {}
+    constructor(private configService: ConfigService, private processContext: ProcessContext) {}
 
-    /** @internal */
-    async onModuleInit() {
-        const { jobQueueStrategy } = this.configService.jobQueueOptions;
-        if (typeof jobQueueStrategy.init === 'function') {
-            await jobQueueStrategy.init(this.moduleRef);
+    async onApplicationBootstrap() {
+        if (this.processContext.isServer) {
+            const { pollInterval } = this.configService.jobQueueOptions;
+            if (pollInterval < 100) {
+                Logger.warn(
+                    `jobQueueOptions.pollInterval is set to ${pollInterval}ms. It is not receommended to set this lower than 100ms`,
+                );
+            }
+            await new Promise((resolve) => setTimeout(resolve, 1000));
+            this.hasInitialized = true;
+            for (const queue of this.queues) {
+                if (!queue.started) {
+                    queue.start();
+                }
+            }
         }
     }
 
     /** @internal */
     onModuleDestroy() {
-        for (const queue of this.queues) {
-            queue.destroy();
-        }
+        return Promise.all(this.queues.map((q) => q.destroy()));
     }
 
     /**
@@ -50,7 +60,9 @@ export class JobQueueService implements OnModuleInit, OnModuleDestroy {
     createQueue<Data extends JobData<Data>>(options: CreateQueueOptions<Data>): JobQueue<Data> {
         const { jobQueueStrategy, pollInterval } = this.configService.jobQueueOptions;
         const queue = new JobQueue(options, jobQueueStrategy, pollInterval);
-        queue.start();
+        if (this.processContext.isServer && this.hasInitialized) {
+            queue.start();
+        }
         this.queues.push(queue);
         return queue;
     }

+ 27 - 7
packages/core/src/job-queue/job-queue.ts

@@ -17,6 +17,8 @@ export class JobQueue<Data extends JobData<Data> = {}> {
     private activeJobs: Array<Job<Data>> = [];
     private timer: any;
     private fooId: number;
+    private running = false;
+
     get concurrency(): number {
         return this.options.concurrency;
     }
@@ -25,6 +27,10 @@ export class JobQueue<Data extends JobData<Data> = {}> {
         return this.options.name;
     }
 
+    get started(): boolean {
+        return this.running;
+    }
+
     constructor(
         private options: CreateQueueOptions<Data>,
         private jobQueueStrategy: JobQueueStrategy,
@@ -33,8 +39,12 @@ export class JobQueue<Data extends JobData<Data> = {}> {
 
     /** @internal */
     start() {
+        if (this.running) {
+            return;
+        }
+        this.running = true;
+        const concurrency = this.options.concurrency;
         const runNextJobs = async () => {
-            const concurrency = this.options.concurrency;
             const runningJobsCount = this.activeJobs.length;
             for (let i = runningJobsCount; i < concurrency; i++) {
                 const nextJob: Job<Data> | undefined = await this.jobQueueStrategy.next(this.options.name);
@@ -61,17 +71,27 @@ export class JobQueue<Data extends JobData<Data> = {}> {
 
     /** @internal */
     pause() {
+        this.running = false;
         clearTimeout(this.timer);
     }
 
     /** @internal */
-    destroy() {
+    async destroy(): Promise<void> {
+        this.running = false;
         clearTimeout(this.timer);
-    }
-
-    /** @internal */
-    _process(job: Job<Data>) {
-        this.options.process(job);
+        const start = +new Date();
+        const maxTimeout = 5000;
+        return new Promise((resolve) => {
+            const pollActiveJobs = () => {
+                const timedOut = +new Date() - start > maxTimeout;
+                if (this.activeJobs.length === 0 || timedOut) {
+                    resolve();
+                } else {
+                    setTimeout(pollActiveJobs, 50);
+                }
+            };
+            pollActiveJobs();
+        });
     }
 
     /**

+ 6 - 2
packages/core/src/job-queue/job.ts

@@ -35,9 +35,9 @@ export type JobEventListener<T extends JobData<T>> = (job: Job<T>) => void;
 export class Job<T extends JobData<T> = any> {
     readonly id: string;
     readonly queueName: string;
+    readonly retries: number;
     private readonly _data: T;
     private readonly created: Date;
-    private readonly retries: number;
     private _state: JobState;
     private _progress: number;
     private _result?: any;
@@ -92,6 +92,10 @@ export class Job<T extends JobData<T> = any> {
         return +end - +(this._started || end);
     }
 
+    get attempts(): number {
+        return this._attempts;
+    }
+
     constructor(config: JobConfig<T>) {
         this.queueName = config.queueName;
         this._data = config.data;
@@ -146,7 +150,7 @@ export class Job<T extends JobData<T> = any> {
      * Calling this method signifies that the job failed.
      */
     fail(err?: any) {
-        this._error = String(err);
+        this._error = err?.message ? err.message : String(err);
         this._progress = 0;
         if (this.retries >= this._attempts) {
             this._state = JobState.RETRYING;

+ 114 - 14
packages/core/src/job-queue/sql-job-queue-strategy.ts

@@ -1,29 +1,129 @@
-import { Connection } from 'typeorm';
+import { ModuleRef } from '@nestjs/core';
+import { getConnectionToken } from '@nestjs/typeorm';
+import { JobListOptions, JobState } from '@vendure/common/lib/generated-types';
+import { PaginatedList } from '@vendure/common/lib/shared-types';
+import { Brackets, Connection } from 'typeorm';
 
 import { JobQueueStrategy } from '../config/job-queue/job-queue-strategy';
+import { JobRecord } from '../entity/job-record/job-record.entity';
+import { ProcessContext } from '../process-context/process-context';
+import { ListQueryBuilder } from '../service/helpers/list-query-builder/list-query-builder';
 
 import { Job } from './job';
 
 export class SqlJobQueueStrategy implements JobQueueStrategy {
-    private connection: Connection;
+    private connection: Connection | undefined;
+    private listQueryBuilder: ListQueryBuilder;
 
-    init(connection: Connection) {
-        this.connection = connection;
+    init(moduleRef: ModuleRef) {
+        const processContext = moduleRef.get(ProcessContext, { strict: false });
+        if (processContext.isServer) {
+            this.connection = moduleRef.get(getConnectionToken() as any, { strict: false });
+            this.listQueryBuilder = moduleRef.get(ListQueryBuilder, { strict: false });
+        }
     }
-    add(job: Job): Promise<Job> {
-        throw new Error('Method not implemented.');
+
+    async add(job: Job): Promise<Job> {
+        if (!this.connectionAvailable(this.connection)) {
+            return job;
+        }
+        const newRecord = this.toRecord(job);
+        const record = await this.connection.getRepository(JobRecord).save(newRecord);
+        return this.fromRecord(record);
+    }
+
+    async next(queueName: string): Promise<Job | undefined> {
+        if (!this.connectionAvailable(this.connection)) {
+            return;
+        }
+        const record = await this.connection
+            .getRepository(JobRecord)
+            .createQueryBuilder('record')
+            .where('record.queueName = :queueName', { queueName })
+            .andWhere(
+                new Brackets((qb) => {
+                    qb.where('record.state = :pending', {
+                        pending: JobState.PENDING,
+                    }).orWhere('record.state = :retrying', { retrying: JobState.RETRYING });
+                }),
+            )
+            .orderBy('record.createdAt', 'ASC')
+            .getOne();
+        if (record) {
+            const job = this.fromRecord(record);
+            job.start();
+            return job;
+        }
+    }
+
+    async update(job: Job<{}>): Promise<void> {
+        if (!this.connectionAvailable(this.connection)) {
+            return;
+        }
+        await this.connection.getRepository(JobRecord).save(this.toRecord(job));
+    }
+
+    async findMany(options?: JobListOptions): Promise<PaginatedList<Job>> {
+        if (!this.connectionAvailable(this.connection)) {
+            return {
+                items: [],
+                totalItems: 0,
+            };
+        }
+        return this.listQueryBuilder
+            .build(JobRecord, options)
+            .getManyAndCount()
+            .then(([items, totalItems]) => ({
+                items: items.map(this.fromRecord),
+                totalItems,
+            }));
+    }
+
+    async findOne(id: string): Promise<Job | undefined> {
+        if (!this.connectionAvailable(this.connection)) {
+            return;
+        }
+        const record = await this.connection.getRepository(JobRecord).findOne(id);
+        if (record) {
+            return this.fromRecord(record);
+        }
     }
-    next(queueName: string): Promise<Job> {
-        return {} as any;
+
+    async findManyById(ids: string[]): Promise<Job[]> {
+        if (!this.connectionAvailable(this.connection)) {
+            return [];
+        }
+        return this.connection
+            .getRepository(JobRecord)
+            .findByIds(ids)
+            .then((records) => records.map(this.fromRecord));
     }
-    update(job: Job<{}>): Promise<void> {
-        return {} as any;
+
+    private connectionAvailable(connection: Connection | undefined): connection is Connection {
+        return !!this.connection && this.connection.isConnected;
     }
-    findMany(): Promise<Job[]> {
-        return {} as any;
+
+    private toRecord(job: Job<any>): JobRecord {
+        return new JobRecord({
+            id: job.id,
+            queueName: job.queueName,
+            data: job.data,
+            state: job.state,
+            progress: job.progress,
+            result: job.result,
+            error: job.error,
+            started: job.started,
+            settled: job.settled,
+            isSettled: job.isSettled,
+            retries: job.retries,
+            attempts: job.attempts,
+        });
     }
 
-    findOne(id: string): Promise<Job> {
-        return {} as any;
+    private fromRecord(jobRecord: JobRecord): Job<any> {
+        return new Job<any>({
+            ...jobRecord,
+            created: jobRecord.createdAt,
+        });
     }
 }

+ 0 - 3
packages/core/src/plugin/plugin-common.module.ts

@@ -23,9 +23,6 @@ import { WorkerServiceModule } from '../worker/worker-service.module';
  */
 @Module({
     imports: [EventBusModule, ConfigModule, ServiceModule.forPlugin(), WorkerServiceModule, JobQueueModule],
-    providers: [
-        // TODO: Provide an injectable which defines whether in main or worker context
-    ],
     exports: [EventBusModule, ConfigModule, ServiceModule.forPlugin(), WorkerServiceModule, JobQueueModule],
 })
 export class PluginCommonModule {}

+ 22 - 0
packages/core/src/process-context/process-context.module.ts

@@ -0,0 +1,22 @@
+import { DynamicModule, Global, Module } from '@nestjs/common';
+
+import { ProcessContext, ServerProcessContext, WorkerProcessContext } from './process-context';
+
+@Global()
+@Module({})
+export class ProcessContextModule {
+    static forRoot(): DynamicModule {
+        return {
+            module: ProcessContextModule,
+            providers: [{ provide: ProcessContext, useClass: ServerProcessContext }],
+            exports: [ProcessContext],
+        };
+    }
+    static forWorker(): DynamicModule {
+        return {
+            module: ProcessContextModule,
+            providers: [{ provide: ProcessContext, useClass: WorkerProcessContext }],
+            exports: [ProcessContext],
+        };
+    }
+}

+ 30 - 0
packages/core/src/process-context/process-context.ts

@@ -0,0 +1,30 @@
+import { Injectable } from '@nestjs/common';
+
+/**
+ * @description
+ * The ProcessContext can be injected into your providers in order to know whether that provider
+ * is being executed in the context of the main Vendure server or the worker.
+ *
+ * @docsCategory common
+ */
+@Injectable()
+export class ProcessContext {
+    protected _isServer: boolean;
+
+    get isServer(): boolean {
+        return this._isServer;
+    }
+    get isWorker(): boolean {
+        return !this._isServer;
+    }
+}
+
+@Injectable()
+export class ServerProcessContext extends ProcessContext {
+    protected _isServer = true;
+}
+
+@Injectable()
+export class WorkerProcessContext extends ProcessContext {
+    protected _isServer = false;
+}

+ 14 - 10
packages/core/src/service/controllers/collection.controller.ts

@@ -32,7 +32,7 @@ export class CollectionController {
     applyCollectionFilters({
         collectionIds,
     }: ApplyCollectionFiltersMessage['data']): Observable<ApplyCollectionFiltersMessage['response']> {
-        return asyncObservable(async observer => {
+        return asyncObservable(async (observer) => {
             Logger.verbose(`Processing ${collectionIds.length} Collections`);
             const timeStart = Date.now();
             const collections = await this.connection.getRepository(Collection).findByIds(collectionIds, {
@@ -59,7 +59,7 @@ export class CollectionController {
     private async applyCollectionFiltersInternal(collection: Collection): Promise<ID[]> {
         const ancestorFilters = await this.collectionService
             .getAncestors(collection.id)
-            .then(ancestors =>
+            .then((ancestors) =>
                 ancestors.reduce(
                     (filters, c) => [...filters, ...(c.filters || [])],
                     [] as ConfigurableOperation[],
@@ -70,16 +70,20 @@ export class CollectionController {
             ...ancestorFilters,
             ...(collection.filters || []),
         ]);
-        const postIds = collection.productVariants.map(v => v.id);
-        await this.connection
-            .getRepository(Collection)
-            .save(collection, { chunk: Math.ceil(collection.productVariants.length / 500) });
+        const postIds = collection.productVariants.map((v) => v.id);
+        try {
+            await this.connection
+                .getRepository(Collection)
+                .save(collection, { chunk: Math.ceil(collection.productVariants.length / 500) });
+        } catch (e) {
+            Logger.error(e);
+        }
 
         const preIdsSet = new Set(preIds);
         const postIdsSet = new Set(postIds);
         const difference = [
-            ...preIds.filter(id => !postIdsSet.has(id)),
-            ...postIds.filter(id => !preIdsSet.has(id)),
+            ...preIds.filter((id) => !postIdsSet.has(id)),
+            ...postIds.filter((id) => !preIdsSet.has(id)),
         ];
         return difference;
     }
@@ -91,8 +95,8 @@ export class CollectionController {
         if (filters.length === 0) {
             return [];
         }
-        const facetFilters = filters.filter(f => f.code === facetValueCollectionFilter.code);
-        const variantNameFilters = filters.filter(f => f.code === variantNameCollectionFilter.code);
+        const facetFilters = filters.filter((f) => f.code === facetValueCollectionFilter.code);
+        const variantNameFilters = filters.filter((f) => f.code === variantNameCollectionFilter.code);
         let qb = this.connection.getRepository(ProductVariant).createQueryBuilder('productVariant');
 
         // Apply any facetValue-based filters

+ 8 - 1
packages/core/src/worker/worker.module.ts

@@ -4,6 +4,7 @@ import { APP_INTERCEPTOR } from '@nestjs/core';
 import { ConfigModule } from '../config/config.module';
 import { Logger } from '../config/logger/vendure-logger';
 import { PluginModule } from '../plugin/plugin.module';
+import { ProcessContextModule } from '../process-context/process-context.module';
 import { ServiceModule } from '../service/service.module';
 
 import { MessageInterceptor } from './message-interceptor';
@@ -11,7 +12,13 @@ import { WorkerMonitor } from './worker-monitor';
 import { WorkerServiceModule } from './worker-service.module';
 
 @Module({
-    imports: [ConfigModule, ServiceModule.forWorker(), PluginModule.forWorker(), WorkerServiceModule],
+    imports: [
+        ConfigModule,
+        ServiceModule.forWorker(),
+        PluginModule.forWorker(),
+        WorkerServiceModule,
+        ProcessContextModule.forWorker(),
+    ],
     providers: [
         WorkerMonitor,
         {

+ 1 - 1
packages/dev-server/dev-config.ts

@@ -106,7 +106,7 @@ function getDbConfig(): ConnectionOptions {
         default:
             console.log('Using mysql connection');
             return {
-                synchronize: false,
+                synchronize: true,
                 type: 'mysql',
                 host: '192.168.99.100',
                 port: 3306,

+ 2 - 2
packages/elasticsearch-plugin/e2e/elasticsearch-plugin.e2e-spec.ts

@@ -873,7 +873,7 @@ const REINDEX = gql`
     mutation Reindex {
         reindex {
             id
-            name
+            queueName
             state
             progress
             duration
@@ -886,7 +886,7 @@ const GET_JOB_INFO = gql`
     query GetJobInfo($id: ID!) {
         job(jobId: $id) {
             id
-            name
+            queueName
             state
             progress
             duration

+ 13 - 7
packages/elasticsearch-plugin/e2e/graphql/generated-e2e-elasticsearch-plugin-types.ts

@@ -1265,10 +1265,11 @@ export type IntCustomFieldConfig = CustomField & {
 export type Job = Node & {
     __typename?: 'Job';
     id: Scalars['ID'];
-    name: Scalars['String'];
+    createdAt: Scalars['DateTime'];
+    queueName: Scalars['String'];
     state: JobState;
     progress: Scalars['Float'];
-    metadata?: Maybe<Scalars['JSON']>;
+    data?: Maybe<Scalars['JSON']>;
     result?: Maybe<Scalars['JSON']>;
     error?: Maybe<Scalars['JSON']>;
     started: Scalars['DateTime'];
@@ -1278,7 +1279,8 @@ export type Job = Node & {
 };
 
 export type JobFilterParameter = {
-    name?: Maybe<StringOperators>;
+    createdAt?: Maybe<DateOperators>;
+    queueName?: Maybe<StringOperators>;
     state?: Maybe<StringOperators>;
     progress?: Maybe<NumberOperators>;
     started?: Maybe<DateOperators>;
@@ -1302,7 +1304,8 @@ export type JobListOptions = {
 
 export type JobSortParameter = {
     id?: Maybe<SortOrder>;
-    name?: Maybe<SortOrder>;
+    createdAt?: Maybe<SortOrder>;
+    queueName?: Maybe<SortOrder>;
     progress?: Maybe<SortOrder>;
     started?: Maybe<SortOrder>;
     settled?: Maybe<SortOrder>;
@@ -2819,7 +2822,7 @@ export type QueryJobArgs = {
 };
 
 export type QueryJobsArgs = {
-    input?: Maybe<JobListOptions>;
+    options?: Maybe<JobListOptions>;
 };
 
 export type QueryJobsByIdArgs = {
@@ -3576,7 +3579,10 @@ export type SearchGetPricesQuery = { __typename?: 'Query' } & {
 export type ReindexMutationVariables = {};
 
 export type ReindexMutation = { __typename?: 'Mutation' } & {
-    reindex: { __typename?: 'Job' } & Pick<Job, 'id' | 'name' | 'state' | 'progress' | 'duration' | 'result'>;
+    reindex: { __typename?: 'Job' } & Pick<
+        Job,
+        'id' | 'queueName' | 'state' | 'progress' | 'duration' | 'result'
+    >;
 };
 
 export type GetJobInfoQueryVariables = {
@@ -3585,7 +3591,7 @@ export type GetJobInfoQueryVariables = {
 
 export type GetJobInfoQuery = { __typename?: 'Query' } & {
     job?: Maybe<
-        { __typename?: 'Job' } & Pick<Job, 'id' | 'name' | 'state' | 'progress' | 'duration' | 'result'>
+        { __typename?: 'Job' } & Pick<Job, 'id' | 'queueName' | 'state' | 'progress' | 'duration' | 'result'>
     >;
 };
 

File diff suppressed because it is too large
+ 0 - 0
schema-admin.json


Some files were not shown because too many files changed in this diff