import { ModuleRef } from '@nestjs/core';
import { getConnectionToken } from '@nestjs/typeorm';
import { JobListOptions, JobState } from '@vendure/common/lib/generated-types';
import { ID, PaginatedList } from '@vendure/common/lib/shared-types';
import { Brackets, Connection, FindConditions, In, LessThan, Not } from 'typeorm';

import { JobQueueStrategy } from '../../config/job-queue/job-queue-strategy';
import { Job } from '../../job-queue/job';
import { ProcessContext } from '../../process-context/process-context';
import { ListQueryBuilder } from '../../service/helpers/list-query-builder/list-query-builder';

import { JobRecord } from './job-record.entity';

/**
 * A JobQueueStrategy which persists jobs as JobRecord rows via a TypeORM connection.
 *
 * The connection is only obtained when `ProcessContext.isServer` is true. Whenever no
 * connected Connection is available, every method short-circuits and returns an
 * "empty" result (see the individual methods) instead of throwing.
 */
export class SqlJobQueueStrategy implements JobQueueStrategy {
    private connection: Connection | undefined;
    private listQueryBuilder: ListQueryBuilder;

    /**
     * Resolves the TypeORM connection and the ListQueryBuilder from the Nest container.
     * Nothing is resolved unless the current process is the server process.
     *
     * @param moduleRef Nest module reference used to look up the providers.
     */
    init(moduleRef: ModuleRef): void {
        const processContext = moduleRef.get(ProcessContext, { strict: false });
        if (processContext.isServer) {
            this.connection = moduleRef.get(getConnectionToken() as any, { strict: false });
            this.listQueryBuilder = moduleRef.get(ListQueryBuilder, { strict: false });
        }
    }

    /**
     * Saves the given job as a new JobRecord.
     *
     * @param job The job to persist.
     * @returns A Job built from the saved record, or the unmodified input job when no
     * connection is available.
     */
    async add(job: Job): Promise<Job> {
        if (!this.connectionAvailable(this.connection)) {
            return job;
        }
        const newRecord = this.toRecord(job);
        const record = await this.connection.getRepository(JobRecord).save(newRecord);
        return this.fromRecord(record);
    }

    /**
     * Fetches the job with the earliest `createdAt` in the named queue whose state is
     * PENDING or RETRYING, and calls `start()` on it before returning it.
     *
     * Note that the started state is not saved by this method.
     *
     * @param queueName Name of the queue to take a job from.
     * @returns The started job, or `undefined` when there is no matching record or no
     * connection is available.
     */
    async next(queueName: string): Promise<Job | undefined> {
        if (!this.connectionAvailable(this.connection)) {
            return;
        }
        const record = await this.connection
            .getRepository(JobRecord)
            .createQueryBuilder('record')
            .where('record.queueName = :queueName', { queueName })
            .andWhere(
                // Bracketed so that the OR does not escape the queueName constraint.
                new Brackets(qb => {
                    qb.where('record.state = :pending', {
                        pending: JobState.PENDING,
                    }).orWhere('record.state = :retrying', { retrying: JobState.RETRYING });
                }),
            )
            .orderBy('record.createdAt', 'ASC')
            .getOne();
        if (record) {
            const job = this.fromRecord(record);
            job.start();
            return job;
        }
    }

    /**
     * Saves the current state of the given job. Does nothing when no connection is
     * available.
     *
     * @param job The job whose state should be written.
     */
    async update(job: Job): Promise<void> {
        if (!this.connectionAvailable(this.connection)) {
            return;
        }
        await this.connection.getRepository(JobRecord).save(this.toRecord(job));
    }

    /**
     * Returns a list of jobs matching the given list options, together with the total
     * count of matching records.
     *
     * @param options Optional list options passed through to the ListQueryBuilder.
     * @returns The matching jobs and total count; an empty list with `totalItems: 0`
     * when no connection is available.
     */
    async findMany(options?: JobListOptions): Promise<PaginatedList<Job>> {
        if (!this.connectionAvailable(this.connection)) {
            return {
                items: [],
                totalItems: 0,
            };
        }
        const [records, totalItems] = await this.listQueryBuilder
            .build(JobRecord, options)
            .getManyAndCount();
        return {
            items: records.map(record => this.fromRecord(record)),
            totalItems,
        };
    }

    /**
     * Looks up a single job by id.
     *
     * @param id The id of the job record.
     * @returns The job, or `undefined` when no record is found or no connection is
     * available.
     */
    async findOne(id: ID): Promise<Job | undefined> {
        if (!this.connectionAvailable(this.connection)) {
            return;
        }
        const record = await this.connection.getRepository(JobRecord).findOne(id);
        if (record) {
            return this.fromRecord(record);
        }
    }

    /**
     * Looks up multiple jobs by id.
     *
     * @param ids The ids of the job records.
     * @returns The jobs for the records that were found; an empty array when no
     * connection is available.
     */
    async findManyById(ids: ID[]): Promise<Job[]> {
        if (!this.connectionAvailable(this.connection)) {
            return [];
        }
        const records = await this.connection.getRepository(JobRecord).findByIds(ids);
        return records.map(record => this.fromRecord(record));
    }

    /**
     * Deletes settled job records whose `settledAt` is earlier than the cut-off.
     *
     * @param queueNames Restricts the deletion to these queues. An empty array (the
     * default) applies no queue restriction.
     * @param olderThan Cut-off date; defaults to the current time.
     * @returns The number of records that matched the criteria (counted immediately
     * before the delete is issued), or `0` when no connection is available.
     */
    async removeSettledJobs(queueNames: string[] = [], olderThan?: Date): Promise<number> {
        if (!this.connectionAvailable(this.connection)) {
            return 0;
        }
        const findOptions: FindConditions<JobRecord> = {
            ...(0 < queueNames.length ? { queueName: In(queueNames) } : {}),
            isSettled: true,
            settledAt: LessThan(olderThan || new Date()),
        };
        const repository = this.connection.getRepository(JobRecord);
        // Only the count is needed for the return value, so the matching rows are
        // never loaded into memory.
        const deleteCount = await repository.count({ where: findOptions });
        await repository.delete(findOptions);
        return deleteCount;
    }

    /**
     * Type guard which narrows the passed connection to a defined Connection that
     * reports itself as connected.
     */
    private connectionAvailable(connection: Connection | undefined): connection is Connection {
        return !!connection && connection.isConnected;
    }

    /**
     * Copies the persisted fields of a Job into a new JobRecord entity.
     */
    private toRecord(job: Job): JobRecord {
        return new JobRecord({
            id: job.id,
            queueName: job.queueName,
            data: job.data,
            state: job.state,
            progress: job.progress,
            result: job.result,
            error: job.error,
            startedAt: job.startedAt,
            settledAt: job.settledAt,
            isSettled: job.isSettled,
            retries: job.retries,
            attempts: job.attempts,
        });
    }

    /**
     * Builds a Job instance from a JobRecord entity.
     */
    private fromRecord(jobRecord: JobRecord): Job {
        return new Job(jobRecord);
    }
}