|
|
@@ -11,7 +11,9 @@ import { ID, PaginatedList } from '@vendure/common/lib/shared-types';
|
|
|
import { notNullOrUndefined } from '@vendure/common/lib/shared-utils';
|
|
|
|
|
|
import { Injector } from '../common';
|
|
|
-import { InspectableJobQueueStrategy } from '../config';
|
|
|
+import { InspectableJobQueueStrategy } from '../config/job-queue/inspectable-job-queue-strategy';
|
|
|
+import { Logger } from '../config/logger/vendure-logger';
|
|
|
+import { ProcessContext } from '../process-context/process-context';
|
|
|
|
|
|
import { Job } from './job';
|
|
|
import { PollingJobQueueStrategy } from './polling-job-queue-strategy';
|
|
|
@@ -21,19 +23,33 @@ import { JobData } from './types';
|
|
|
* @description
|
|
|
* An in-memory {@link JobQueueStrategy}. This is the default strategy if not using a dedicated
|
|
|
* JobQueue plugin (e.g. {@link DefaultJobQueuePlugin}). Not recommended for production, since
|
|
|
- * the queue will be cleared when the server stops.
|
|
|
+ * the queue will be cleared when the server stops, and can only be used when the JobQueueService is
|
|
|
+ * started from the main server process:
|
|
|
+ *
|
|
|
+ * @example
|
|
|
+ * ```TypeScript
|
|
|
+ * bootstrap(config)
|
|
|
+ * .then(app => app.get(JobQueueService).start());
|
|
|
+ * ```
|
|
|
+ *
|
|
|
+ * Attempting to use this strategy when running the worker in a separate process (using `bootstrapWorker()`)
|
|
|
+ * will result in an error on startup.
|
|
|
+ *
|
|
|
* Completed jobs will be evicted from the store every 2 hours to prevent a memory leak.
|
|
|
*
|
|
|
* @docsCategory JobQueue
|
|
|
*/
|
|
|
export class InMemoryJobQueueStrategy extends PollingJobQueueStrategy implements InspectableJobQueueStrategy {
|
|
|
protected jobs = new Map<ID, Job>();
|
|
|
- protected unsettledJobs: { [queueName: string]: Job[] } = {};
|
|
|
+ protected unsettledJobs: { [queueName: string]: Array<{ job: Job; updatedAt: Date }> } = {};
|
|
|
private timer: any;
|
|
|
private evictJobsAfterMs = 1000 * 60 * 60 * 2; // 2 hours
|
|
|
+ private processContext: ProcessContext;
|
|
|
+ private processContextChecked = false;
|
|
|
|
|
|
init(injector: Injector) {
|
|
|
super.init(injector);
|
|
|
+ this.processContext = injector.get(ProcessContext);
|
|
|
this.timer = setTimeout(this.evictSettledJobs, this.evictJobsAfterMs);
|
|
|
}
|
|
|
|
|
|
@@ -53,7 +69,7 @@ export class InMemoryJobQueueStrategy extends PollingJobQueueStrategy implements
|
|
|
if (!this.unsettledJobs[job.queueName]) {
|
|
|
this.unsettledJobs[job.queueName] = [];
|
|
|
}
|
|
|
- this.unsettledJobs[job.queueName].push(job);
|
|
|
+ this.unsettledJobs[job.queueName].push({ job, updatedAt: new Date() });
|
|
|
return job;
|
|
|
}
|
|
|
|
|
|
@@ -85,16 +101,25 @@ export class InMemoryJobQueueStrategy extends PollingJobQueueStrategy implements
|
|
|
}
|
|
|
|
|
|
async next(queueName: string): Promise<Job | undefined> {
|
|
|
+ this.checkProcessContext();
|
|
|
const next = this.unsettledJobs[queueName]?.shift();
|
|
|
if (next) {
|
|
|
- next.start();
|
|
|
- return next;
|
|
|
+ if (next.job.state === JobState.RETRYING && typeof this.backOffStrategy === 'function') {
|
|
|
+ const msSinceLastFailure = Date.now() - +next.updatedAt;
|
|
|
+ const backOffDelayMs = this.backOffStrategy(queueName, next.job.attempts, next.job);
|
|
|
+ if (msSinceLastFailure < backOffDelayMs) {
|
|
|
+ this.unsettledJobs[queueName]?.push(next);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ next.job.start();
|
|
|
+ return next.job;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
async update(job: Job): Promise<void> {
|
|
|
if (job.state === JobState.RETRYING || job.state === JobState.PENDING) {
|
|
|
- this.unsettledJobs[job.queueName].unshift(job);
|
|
|
+ this.unsettledJobs[job.queueName].unshift({ job, updatedAt: new Date() });
|
|
|
}
|
|
|
// tslint:disable-next-line:no-non-null-assertion
|
|
|
this.jobs.set(job.id!, job);
|
|
|
@@ -195,4 +220,16 @@ export class InMemoryJobQueueStrategy extends PollingJobQueueStrategy implements
|
|
|
this.removeSettledJobs([], new Date(olderThanMs));
|
|
|
this.timer = setTimeout(this.evictSettledJobs, this.evictJobsAfterMs);
|
|
|
};
|
|
|
+
|
|
|
+ private checkProcessContext() {
|
|
|
+ if (!this.processContextChecked) {
|
|
|
+ if (this.processContext.isWorker) {
|
|
|
+ Logger.error(
|
|
|
+ `The InMemoryJobQueueStrategy will not work when running job queues outside the main server process!`,
|
|
|
+ );
|
|
|
+ process.kill(process.pid, 'SIGINT');
|
|
|
+ }
|
|
|
+ this.processContextChecked = true;
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|