1
0
Эх сурвалжийг харах

fix(core): Improve fault-tolerance of JobQueue

Michael Bromley 4 жил өмнө
parent
commit
cb5b10017a

+ 7 - 2
packages/core/src/job-queue/job-queue.ts

@@ -91,7 +91,12 @@ export class JobQueue<Data extends JobData<Data> = {}> {
             queueName: this.options.name,
             queueName: this.options.name,
             retries: options?.retries ?? 0,
             retries: options?.retries ?? 0,
         });
         });
-        const addedJob = await this.jobQueueStrategy.add(job);
-        return new SubscribableJob(addedJob, this.jobQueueStrategy);
+        try {
+            const addedJob = await this.jobQueueStrategy.add(job);
+            return new SubscribableJob(addedJob, this.jobQueueStrategy);
+        } catch (err) {
+            Logger.error(`Could not add Job to "${this.name}" queue`, undefined, err.stack);
+            return new SubscribableJob(job, this.jobQueueStrategy);
+        }
     }
     }
 }
 }

+ 36 - 3
packages/core/src/plugin/default-job-queue-plugin/sql-job-queue-strategy.ts

@@ -4,6 +4,7 @@ import { Brackets, Connection, EntityManager, FindConditions, In, LessThan } fro
 
 
 import { Injector } from '../../common/injector';
 import { Injector } from '../../common/injector';
 import { InspectableJobQueueStrategy, JobQueueStrategy } from '../../config';
 import { InspectableJobQueueStrategy, JobQueueStrategy } from '../../config';
+import { Logger } from '../../config/logger/vendure-logger';
 import { Job, JobData } from '../../job-queue';
 import { Job, JobData } from '../../job-queue';
 import { PollingJobQueueStrategy } from '../../job-queue/polling-job-queue-strategy';
 import { PollingJobQueueStrategy } from '../../job-queue/polling-job-queue-strategy';
 import { TransactionalConnection } from '../../service';
 import { TransactionalConnection } from '../../service';
@@ -37,11 +38,43 @@ export class SqlJobQueueStrategy extends PollingJobQueueStrategy implements Insp
         if (!this.connectionAvailable(this.connection)) {
         if (!this.connectionAvailable(this.connection)) {
             throw new Error('Connection not available');
             throw new Error('Connection not available');
         }
         }
-        const newRecord = this.toRecord(job);
+        const constrainedData = this.constrainDataSize(job);
+        const newRecord = this.toRecord(job, constrainedData);
         const record = await this.connection.getRepository(JobRecord).save(newRecord);
         const record = await this.connection.getRepository(JobRecord).save(newRecord);
         return this.fromRecord(record);
         return this.fromRecord(record);
     }
     }
 
 
+    /**
+     * MySQL & MariaDB store job data as a "text" type which has a limit of 64kb. Going over that limit will cause the job to not be stored.
+     * In order to try to prevent that, this method will truncate any strings in the `data` object over 2kb in size.
+     */
+    private constrainDataSize<Data extends JobData<Data> = {}>(job: Job<Data>): Data | undefined {
+        const type = this.connection?.options.type;
+        if (type === 'mysql' || type === 'mariadb') {
+            const stringified = JSON.stringify(job.data);
+            if (64 * 1024 <= stringified.length) {
+                const truncatedKeys: Array<{ key: string; size: number }> = [];
+                const reduced = JSON.parse(stringified, (key, value) => {
+                    if (typeof value === 'string' && 2048 < value.length) {
+                        truncatedKeys.push({ key, size: value.length });
+                        return `[truncated - originally ${value.length} bytes]`;
+                    }
+                    return value;
+                });
+                Logger.warn(
+                    `Job data for "${
+                        job.queueName
+                    }" is too long to store with the ${type} driver (${Math.round(
+                        stringified.length / 1024,
+                    )}kb).\nThe following keys were truncated: ${truncatedKeys
+                        .map(({ key, size }) => `${key} (${size} bytes)`)
+                        .join(', ')}`,
+                );
+                return reduced;
+            }
+        }
+    }
+
     async next(queueName: string): Promise<Job | undefined> {
     async next(queueName: string): Promise<Job | undefined> {
         if (!this.connectionAvailable(this.connection)) {
         if (!this.connectionAvailable(this.connection)) {
             throw new Error('Connection not available');
             throw new Error('Connection not available');
@@ -183,11 +216,11 @@ export class SqlJobQueueStrategy extends PollingJobQueueStrategy implements Insp
         return !!this.connection && this.connection.isConnected;
         return !!this.connection && this.connection.isConnected;
     }
     }
 
 
-    private toRecord(job: Job<any>): JobRecord {
+    private toRecord(job: Job<any>, data?: any): JobRecord {
         return new JobRecord({
         return new JobRecord({
             id: job.id || undefined,
             id: job.id || undefined,
             queueName: job.queueName,
             queueName: job.queueName,
-            data: job.data,
+            data: data ?? job.data,
             state: job.state,
             state: job.state,
             progress: job.progress,
             progress: job.progress,
             result: job.result,
             result: job.result,