Browse Source

feat(core): Tracing for job-queue

David Höck 9 months ago
parent
commit
7a27428e39

+ 18 - 2
packages/core/src/job-queue/job-queue.service.ts

@@ -1,5 +1,6 @@
 import { Injectable, OnModuleDestroy } from '@nestjs/common';
 import { JobQueue as GraphQlJobQueue } from '@vendure/common/lib/generated-types';
+import { Span, TraceService } from 'nestjs-otel';
 
 import { ConfigService, JobQueueStrategy, Logger } from '../config';
 
@@ -54,7 +55,11 @@ export class JobQueueService implements OnModuleDestroy {
         return this.configService.jobQueueOptions.jobQueueStrategy;
     }
 
-    constructor(private configService: ConfigService, private jobBufferService: JobBufferService) {}
+    constructor(
+        private configService: ConfigService,
+        private jobBufferService: JobBufferService,
+        private traceService: TraceService,
+    ) {}
 
     /** @internal */
     onModuleDestroy() {
@@ -66,6 +71,7 @@ export class JobQueueService implements OnModuleDestroy {
      * @description
      * Configures and creates a new {@link JobQueue} instance.
      */
+    @Span('vendure.job-queue.create-queue')
     async createQueue<Data extends JobData<Data>>(
         options: CreateQueueOptions<Data>,
     ): Promise<JobQueue<Data>> {
@@ -74,20 +80,30 @@ export class JobQueueService implements OnModuleDestroy {
         }
         const wrappedProcessFn = this.createWrappedProcessFn(options.process);
         options = { ...options, process: wrappedProcessFn };
-        const queue = new JobQueue(options, this.jobQueueStrategy, this.jobBufferService);
+
+        const span = this.traceService.getSpan();
+        span?.setAttribute('job-queue.name', options.name);
+
+        const queue = new JobQueue(options, this.jobQueueStrategy, this.jobBufferService, this.traceService);
         if (this.hasStarted && this.shouldStartQueue(queue.name)) {
             await queue.start();
         }
         this.queues.push(queue);
+
+        span?.end();
+
         return queue;
     }
 
+    @Span('vendure.job-queue.start')
     async start(): Promise<void> {
         this.hasStarted = true;
         for (const queue of this.queues) {
             if (!queue.started && this.shouldStartQueue(queue.name)) {
                 Logger.info(`Starting queue: ${queue.name}`, loggerCtx);
                 await queue.start();
+                const span = this.traceService.getSpan();
+                span?.setAttribute('job-queue.name', queue.name);
             }
         }
     }

+ 12 - 0
packages/core/src/job-queue/job-queue.ts

@@ -1,4 +1,5 @@
 import { JobState } from '@vendure/common/lib/generated-types';
+import { Span, TraceService } from 'nestjs-otel';
 import { Subject, Subscription } from 'rxjs';
 import { throttleTime } from 'rxjs/operators';
 
@@ -37,6 +38,7 @@ export class JobQueue<Data extends JobData<Data> = object> {
         private options: CreateQueueOptions<Data>,
         private jobQueueStrategy: JobQueueStrategy,
         private jobBufferService: JobBufferService,
+        private traceService: TraceService,
     ) {}
 
     /** @internal */
@@ -90,16 +92,24 @@ export class JobQueue<Data extends JobData<Data> = object> {
      *   .catch(err => err.message);
      * ```
      */
+    @Span('vendure.job-queue.add')
     async add(data: Data, options?: JobOptions<Data>): Promise<SubscribableJob<Data>> {
         const job = new Job<any>({
             data,
             queueName: this.options.name,
             retries: options?.retries ?? 0,
         });
+        const span = this.traceService.getSpan();
+        span?.setAttribute('job.data', JSON.stringify(data));
+        span?.setAttribute('job.retries', options?.retries ?? 0);
+        span?.setAttribute('job.queueName', this.options.name);
+        span?.setAttribute('job.id', job.id ?? 'unknown');
 
         const isBuffered = await this.jobBufferService.add(job);
         if (!isBuffered) {
             const addedJob = await this.jobQueueStrategy.add(job, options);
+            span?.setAttribute('job.buffered', false);
+            span?.end();
             return new SubscribableJob(addedJob, this.jobQueueStrategy);
         } else {
             const bufferedJob = new Job({
@@ -107,6 +117,8 @@ export class JobQueue<Data extends JobData<Data> = object> {
                 data: job.data,
                 id: 'buffered',
             });
+            span?.setAttribute('job.buffered', true);
+            span?.end();
             return new SubscribableJob(bufferedJob, this.jobQueueStrategy);
         }
     }