Browse Source

feat(core): Pass ctx to job queue strategy add (#2759)

Closes #2758
Pieter Doms 1 year ago
parent
commit
3909251338

+ 1 - 1
docs/docs/reference/typescript-api/job-queue/index.md

@@ -26,7 +26,7 @@ class JobQueue<Data extends JobData<Data> = object> {
     name: string
     started: boolean
     constructor(options: CreateQueueOptions<Data>, jobQueueStrategy: JobQueueStrategy, jobBufferService: JobBufferService)
-    add(data: Data, options?: Pick<JobConfig<Data>, 'retries'>) => Promise<SubscribableJob<Data>>;
+    add(data: Data, options?: JobOptions) => Promise<SubscribableJob<Data>>;
 }
 ```
 

+ 3 - 2
packages/core/src/config/job-queue/job-queue-strategy.ts

@@ -2,8 +2,9 @@ import { JobListOptions } from '@vendure/common/lib/generated-types';
 import { ID, PaginatedList } from '@vendure/common/lib/shared-types';
 
 import { InjectableStrategy } from '../../common';
-import { JobData } from '../../job-queue';
+import { JobData, JobQueueStrategyJobOptions } from '../../job-queue';
 import { Job } from '../../job-queue';
+import { JobOptions } from '../../job-queue';
 
 /**
  * @description
@@ -25,7 +26,7 @@ export interface JobQueueStrategy extends InjectableStrategy {
      * @description
      * Add a new job to the queue.
      */
-    add<Data extends JobData<Data> = object>(job: Job<Data>): Promise<Job<Data>>;
+    add<Data extends JobData<Data> = object>(job: Job<Data>, jobOptions?: JobQueueStrategyJobOptions<Data>): Promise<Job<Data>>;
 
     /**
      * @description

+ 4 - 4
packages/core/src/job-queue/job-queue.ts

@@ -8,7 +8,7 @@ import { Logger } from '../config/logger/vendure-logger';
 import { Job } from './job';
 import { JobBufferService } from './job-buffer/job-buffer.service';
 import { SubscribableJob } from './subscribable-job';
-import { CreateQueueOptions, JobConfig, JobData } from './types';
+import { CreateQueueOptions, JobConfig, JobData, JobOptions } from './types';
 
 /**
  * @description
@@ -90,7 +90,7 @@ export class JobQueue<Data extends JobData<Data> = object> {
      *   .catch(err => err.message);
      * ```
      */
-    async add(data: Data, options?: Pick<JobConfig<Data>, 'retries'>): Promise<SubscribableJob<Data>> {
+    async add(data: Data, options?: JobOptions<Data>): Promise<SubscribableJob<Data>> {
         const job = new Job<any>({
             data,
             queueName: this.options.name,
@@ -99,8 +99,8 @@ export class JobQueue<Data extends JobData<Data> = object> {
 
         const isBuffered = await this.jobBufferService.add(job);
         if (!isBuffered) {
-                const addedJob = await this.jobQueueStrategy.add(job);
-                return new SubscribableJob(addedJob, this.jobQueueStrategy);
+            const addedJob = await this.jobQueueStrategy.add(job);
+            return new SubscribableJob(addedJob, this.jobQueueStrategy);
         } else {
             const bufferedJob = new Job({
                 ...job,

+ 8 - 0
packages/core/src/job-queue/types.ts

@@ -1,6 +1,8 @@
 import { JobState } from '@vendure/common/lib/generated-types';
 import { ID, JsonCompatible } from '@vendure/common/lib/shared-types';
 
+import { RequestContext } from '../api/common/request-context';
+
 import { Job } from './job';
 
 /**
@@ -55,3 +57,9 @@ export interface JobConfig<T extends JobData<T>> {
     startedAt?: Date;
     settledAt?: Date;
 }
+
+export type JobOptions<Data extends JsonCompatible<Data>> = Pick<JobConfig<Data>, 'retries'>  & {
+    ctx?: RequestContext
+};
+
+export type JobQueueStrategyJobOptions<Data extends JsonCompatible<Data>> = Omit<JobOptions<Data>, "retries">

+ 27 - 23
packages/core/src/plugin/default-job-queue-plugin/sql-job-queue-strategy.ts

@@ -6,7 +6,7 @@ import { Injector } from '../../common/injector';
 import { InspectableJobQueueStrategy, JobQueueStrategy } from '../../config';
 import { Logger } from '../../config/logger/vendure-logger';
 import { TransactionalConnection } from '../../connection/transactional-connection';
-import { Job, JobData } from '../../job-queue';
+import { Job, JobData, JobQueueStrategyJobOptions } from '../../job-queue';
 import { PollingJobQueueStrategy } from '../../job-queue/polling-job-queue-strategy';
 import { ListQueryBuilder } from '../../service/helpers/list-query-builder/list-query-builder';
 
@@ -20,27 +20,31 @@ import { JobRecord } from './job-record.entity';
  * @docsCategory JobQueue
  */
 export class SqlJobQueueStrategy extends PollingJobQueueStrategy implements InspectableJobQueueStrategy {
-    private connection: Connection | undefined;
+    private rawConnection: Connection | undefined;
+    private connection: TransactionalConnection | undefined;
     private listQueryBuilder: ListQueryBuilder;
 
     init(injector: Injector) {
-        this.connection = injector.get(TransactionalConnection).rawConnection;
+        this.rawConnection = injector.get(TransactionalConnection).rawConnection;
+        this.connection = injector.get(TransactionalConnection);
         this.listQueryBuilder = injector.get(ListQueryBuilder);
         super.init(injector);
     }
 
     destroy() {
-        this.connection = undefined;
+        this.rawConnection = undefined;
         super.destroy();
     }
 
-    async add<Data extends JobData<Data> = object>(job: Job<Data>): Promise<Job<Data>> {
-        if (!this.connectionAvailable(this.connection)) {
+    async add<Data extends JobData<Data> = object>(job: Job<Data>, jobOptions?: JobQueueStrategyJobOptions<Data>): Promise<Job<Data>> {
+        if (!this.connectionAvailable(this.rawConnection)) {
             throw new Error('Connection not available');
         }
+        const jobRecordRepository = jobOptions?.ctx && this.connection ? this.connection.getRepository(jobOptions.ctx, JobRecord) :
+            this.rawConnection.getRepository(JobRecord);
         const constrainedData = this.constrainDataSize(job);
         const newRecord = this.toRecord(job, constrainedData, this.setRetries(job.queueName, job));
-        const record = await this.connection.getRepository(JobRecord).save(newRecord);
+        const record = await jobRecordRepository.save(newRecord);
         return this.fromRecord(record);
     }
 
@@ -49,7 +53,7 @@ export class SqlJobQueueStrategy extends PollingJobQueueStrategy implements Insp
      * In order to try to prevent that, this method will truncate any strings in the `data` object over 2kb in size.
      */
     private constrainDataSize<Data extends JobData<Data> = object>(job: Job<Data>): Data | undefined {
-        const type = this.connection?.options.type;
+        const type = this.rawConnection?.options.type;
         if (type === 'mysql' || type === 'mariadb') {
             const stringified = JSON.stringify(job.data);
             if (64 * 1024 <= stringified.length) {
@@ -76,11 +80,11 @@ export class SqlJobQueueStrategy extends PollingJobQueueStrategy implements Insp
     }
 
     async next(queueName: string): Promise<Job | undefined> {
-        if (!this.connectionAvailable(this.connection)) {
+        if (!this.connectionAvailable(this.rawConnection)) {
             throw new Error('Connection not available');
         }
-        const connection = this.connection;
-        const connectionType = this.connection.options.type;
+        const connection = this.rawConnection;
+        const connectionType = this.rawConnection.options.type;
         const isSQLite =
             connectionType === 'sqlite' || connectionType === 'sqljs' || connectionType === 'better-sqlite3';
 
@@ -157,10 +161,10 @@ export class SqlJobQueueStrategy extends PollingJobQueueStrategy implements Insp
     }
 
     async update(job: Job<any>): Promise<void> {
-        if (!this.connectionAvailable(this.connection)) {
+        if (!this.connectionAvailable(this.rawConnection)) {
             throw new Error('Connection not available');
         }
-        await this.connection
+        await this.rawConnection
             .getRepository(JobRecord)
             .createQueryBuilder('job')
             .update()
@@ -171,7 +175,7 @@ export class SqlJobQueueStrategy extends PollingJobQueueStrategy implements Insp
     }
 
     async findMany(options?: JobListOptions): Promise<PaginatedList<Job>> {
-        if (!this.connectionAvailable(this.connection)) {
+        if (!this.connectionAvailable(this.rawConnection)) {
             throw new Error('Connection not available');
         }
         return this.listQueryBuilder
@@ -184,27 +188,27 @@ export class SqlJobQueueStrategy extends PollingJobQueueStrategy implements Insp
     }
 
     async findOne(id: ID): Promise<Job | undefined> {
-        if (!this.connectionAvailable(this.connection)) {
+        if (!this.connectionAvailable(this.rawConnection)) {
             throw new Error('Connection not available');
         }
-        const record = await this.connection.getRepository(JobRecord).findOne({ where: { id } });
+        const record = await this.rawConnection.getRepository(JobRecord).findOne({ where: { id } });
         if (record) {
             return this.fromRecord(record);
         }
     }
 
     async findManyById(ids: ID[]): Promise<Job[]> {
-        if (!this.connectionAvailable(this.connection)) {
+        if (!this.connectionAvailable(this.rawConnection)) {
             throw new Error('Connection not available');
         }
-        return this.connection
+        return this.rawConnection
             .getRepository(JobRecord)
             .find({ where: { id: In(ids) } })
             .then(records => records.map(this.fromRecord));
     }
 
     async removeSettledJobs(queueNames: string[] = [], olderThan?: Date) {
-        if (!this.connectionAvailable(this.connection)) {
+        if (!this.connectionAvailable(this.rawConnection)) {
             throw new Error('Connection not available');
         }
         const findOptions: FindOptionsWhere<JobRecord> = {
@@ -212,14 +216,14 @@ export class SqlJobQueueStrategy extends PollingJobQueueStrategy implements Insp
             isSettled: true,
             settledAt: LessThan(olderThan || new Date()),
         };
-        const toDelete = await this.connection.getRepository(JobRecord).find({ where: findOptions });
-        const deleteCount = await this.connection.getRepository(JobRecord).count({ where: findOptions });
-        await this.connection.getRepository(JobRecord).delete(findOptions);
+        const toDelete = await this.rawConnection.getRepository(JobRecord).find({ where: findOptions });
+        const deleteCount = await this.rawConnection.getRepository(JobRecord).count({ where: findOptions });
+        await this.rawConnection.getRepository(JobRecord).delete(findOptions);
         return deleteCount;
     }
 
     private connectionAvailable(connection: Connection | undefined): connection is Connection {
-        return !!this.connection && this.connection.isConnected;
+        return !!this.rawConnection && this.rawConnection.isConnected;
     }
 
     private toRecord(job: Job<any>, data?: any, retries?: number): JobRecord {

+ 18 - 12
packages/core/src/plugin/default-search-plugin/indexer/search-index.service.ts

@@ -67,7 +67,7 @@ export class SearchIndexService implements OnApplicationBootstrap {
     }
 
     reindex(ctx: RequestContext) {
-        return this.updateIndexQueue.add({ type: 'reindex', ctx: ctx.serialize() });
+        return this.updateIndexQueue.add({ type: 'reindex', ctx: ctx.serialize() }, { ctx });
     }
 
     updateProduct(ctx: RequestContext, product: Product) {
@@ -75,12 +75,13 @@ export class SearchIndexService implements OnApplicationBootstrap {
             type: 'update-product',
             ctx: ctx.serialize(),
             productId: product.id,
-        });
+        },
+        {   ctx   });
     }
 
     updateVariants(ctx: RequestContext, variants: ProductVariant[]) {
         const variantIds = variants.map(v => v.id);
-        return this.updateIndexQueue.add({ type: 'update-variants', ctx: ctx.serialize(), variantIds });
+        return this.updateIndexQueue.add({ type: 'update-variants', ctx: ctx.serialize(), variantIds }, { ctx });
     }
 
     deleteProduct(ctx: RequestContext, product: Product) {
@@ -88,24 +89,25 @@ export class SearchIndexService implements OnApplicationBootstrap {
             type: 'delete-product',
             ctx: ctx.serialize(),
             productId: product.id,
-        });
+        },
+        {   ctx   });
     }
 
     deleteVariant(ctx: RequestContext, variants: ProductVariant[]) {
         const variantIds = variants.map(v => v.id);
-        return this.updateIndexQueue.add({ type: 'delete-variant', ctx: ctx.serialize(), variantIds });
+        return this.updateIndexQueue.add({ type: 'delete-variant', ctx: ctx.serialize(), variantIds }, { ctx });
     }
 
     updateVariantsById(ctx: RequestContext, ids: ID[]) {
-        return this.updateIndexQueue.add({ type: 'update-variants-by-id', ctx: ctx.serialize(), ids });
+        return this.updateIndexQueue.add({ type: 'update-variants-by-id', ctx: ctx.serialize(), ids }, { ctx });
     }
 
     updateAsset(ctx: RequestContext, asset: Asset) {
-        return this.updateIndexQueue.add({ type: 'update-asset', ctx: ctx.serialize(), asset: asset as any });
+        return this.updateIndexQueue.add({ type: 'update-asset', ctx: ctx.serialize(), asset: asset as any }, { ctx });
     }
 
     deleteAsset(ctx: RequestContext, asset: Asset) {
-        return this.updateIndexQueue.add({ type: 'delete-asset', ctx: ctx.serialize(), asset: asset as any });
+        return this.updateIndexQueue.add({ type: 'delete-asset', ctx: ctx.serialize(), asset: asset as any }, { ctx });
     }
 
     assignProductToChannel(ctx: RequestContext, productId: ID, channelId: ID) {
@@ -114,7 +116,8 @@ export class SearchIndexService implements OnApplicationBootstrap {
             ctx: ctx.serialize(),
             productId,
             channelId,
-        });
+        },
+        {   ctx   });
     }
 
     removeProductFromChannel(ctx: RequestContext, productId: ID, channelId: ID) {
@@ -123,7 +126,8 @@ export class SearchIndexService implements OnApplicationBootstrap {
             ctx: ctx.serialize(),
             productId,
             channelId,
-        });
+        },
+        {   ctx   });
     }
 
     assignVariantToChannel(ctx: RequestContext, productVariantId: ID, channelId: ID) {
@@ -132,7 +136,8 @@ export class SearchIndexService implements OnApplicationBootstrap {
             ctx: ctx.serialize(),
             productVariantId,
             channelId,
-        });
+        },
+        {   ctx   });
     }
 
     removeVariantFromChannel(ctx: RequestContext, productVariantId: ID, channelId: ID) {
@@ -141,7 +146,8 @@ export class SearchIndexService implements OnApplicationBootstrap {
             ctx: ctx.serialize(),
             productVariantId,
             channelId,
-        });
+        },
+        {   ctx   });
     }
 
     private jobWithProgress(

+ 10 - 5
packages/core/src/service/services/collection.service.ts

@@ -104,7 +104,8 @@ export class CollectionService implements OnModuleInit {
                 await this.applyFiltersQueue.add({
                     ctx: event.ctx.serialize(),
                     collectionIds: collections.map(c => c.id),
-                });
+                },
+                {   ctx: event.ctx   });
             });
 
         this.applyFiltersQueue = await this.jobQueueService.createQueue({
@@ -471,7 +472,8 @@ export class CollectionService implements OnModuleInit {
         await this.applyFiltersQueue.add({
             ctx: ctx.serialize(),
             collectionIds: [collection.id],
-        });
+        },
+        {   ctx   });
         await this.eventBus.publish(new CollectionEvent(ctx, collectionWithRelations, 'created', input));
         return assertFound(this.findOne(ctx, collection.id));
     }
@@ -497,7 +499,8 @@ export class CollectionService implements OnModuleInit {
                 ctx: ctx.serialize(),
                 collectionIds: [collection.id],
                 applyToChangedVariantsOnly: false,
-            });
+            },
+            {   ctx   });
         } else {
             const affectedVariantIds = await this.getCollectionProductVariantIds(collection);
             await this.eventBus.publish(new CollectionModificationEvent(ctx, collection, affectedVariantIds));
@@ -571,7 +574,8 @@ export class CollectionService implements OnModuleInit {
         await this.applyFiltersQueue.add({
             ctx: ctx.serialize(),
             collectionIds: [target.id],
-        });
+        },
+        {   ctx   });
         return assertFound(this.findOne(ctx, input.collectionId));
     }
 
@@ -829,7 +833,8 @@ export class CollectionService implements OnModuleInit {
         await this.applyFiltersQueue.add({
             ctx: ctx.serialize(),
             collectionIds: collectionsToAssign.map(collection => collection.id),
-        });
+        },
+        {   ctx   });
 
         return this.connection
             .findByIdsInChannel(

+ 18 - 12
packages/elasticsearch-plugin/src/indexing/elasticsearch-index.service.ts

@@ -67,7 +67,7 @@ export class ElasticsearchIndexService implements OnApplicationBootstrap {
     }
 
     reindex(ctx: RequestContext) {
-        return this.updateIndexQueue.add({ type: 'reindex', ctx: ctx.serialize() });
+        return this.updateIndexQueue.add({ type: 'reindex', ctx: ctx.serialize() }, { ctx });
     }
 
     updateProduct(ctx: RequestContext, product: Product) {
@@ -75,12 +75,13 @@ export class ElasticsearchIndexService implements OnApplicationBootstrap {
             type: 'update-product',
             ctx: ctx.serialize(),
             productId: product.id,
-        });
+        },
+        {   ctx   });
     }
 
     updateVariants(ctx: RequestContext, variants: ProductVariant[]) {
         const variantIds = variants.map(v => v.id);
-        return this.updateIndexQueue.add({ type: 'update-variants', ctx: ctx.serialize(), variantIds });
+        return this.updateIndexQueue.add({ type: 'update-variants', ctx: ctx.serialize(), variantIds }, { ctx });
     }
 
     deleteProduct(ctx: RequestContext, product: Product) {
@@ -88,12 +89,13 @@ export class ElasticsearchIndexService implements OnApplicationBootstrap {
             type: 'delete-product',
             ctx: ctx.serialize(),
             productId: product.id,
-        });
+        },
+        {   ctx   });
     }
 
     deleteVariant(ctx: RequestContext, variants: ProductVariant[]) {
         const variantIds = variants.map(v => v.id);
-        return this.updateIndexQueue.add({ type: 'delete-variant', ctx: ctx.serialize(), variantIds });
+        return this.updateIndexQueue.add({ type: 'delete-variant', ctx: ctx.serialize(), variantIds }, { ctx });
     }
 
     assignProductToChannel(ctx: RequestContext, product: Product, channelId: ID) {
@@ -102,7 +104,8 @@ export class ElasticsearchIndexService implements OnApplicationBootstrap {
             ctx: ctx.serialize(),
             productId: product.id,
             channelId,
-        });
+        },
+        {   ctx   });
     }
 
     removeProductFromChannel(ctx: RequestContext, product: Product, channelId: ID) {
@@ -111,7 +114,8 @@ export class ElasticsearchIndexService implements OnApplicationBootstrap {
             ctx: ctx.serialize(),
             productId: product.id,
             channelId,
-        });
+        },
+        {   ctx   });
     }
 
     assignVariantToChannel(ctx: RequestContext, productVariantId: ID, channelId: ID) {
@@ -120,7 +124,8 @@ export class ElasticsearchIndexService implements OnApplicationBootstrap {
             ctx: ctx.serialize(),
             productVariantId,
             channelId,
-        });
+        },
+        {   ctx   });
     }
 
     removeVariantFromChannel(ctx: RequestContext, productVariantId: ID, channelId: ID) {
@@ -129,19 +134,20 @@ export class ElasticsearchIndexService implements OnApplicationBootstrap {
             ctx: ctx.serialize(),
             productVariantId,
             channelId,
-        });
+        },
+        {   ctx   });
     }
 
     updateVariantsById(ctx: RequestContext, ids: ID[]) {
-        return this.updateIndexQueue.add({ type: 'update-variants-by-id', ctx: ctx.serialize(), ids });
+        return this.updateIndexQueue.add({ type: 'update-variants-by-id', ctx: ctx.serialize(), ids }, { ctx });
     }
 
     updateAsset(ctx: RequestContext, asset: Asset) {
-        return this.updateIndexQueue.add({ type: 'update-asset', ctx: ctx.serialize(), asset: asset as any });
+        return this.updateIndexQueue.add({ type: 'update-asset', ctx: ctx.serialize(), asset: asset as any }, { ctx });
     }
 
     deleteAsset(ctx: RequestContext, asset: Asset) {
-        return this.updateIndexQueue.add({ type: 'delete-asset', ctx: ctx.serialize(), asset: asset as any });
+        return this.updateIndexQueue.add({ type: 'delete-asset', ctx: ctx.serialize(), asset: asset as any }, { ctx });
     }
 
     private jobWithProgress(