sql-job-queue-strategy.ts 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. import { JobListOptions, JobState } from '@vendure/common/lib/generated-types';
  2. import { ID, PaginatedList } from '@vendure/common/lib/shared-types';
  3. import { Brackets, Connection, FindConditions, In, LessThan } from 'typeorm';
  4. import { Injector } from '../../common/injector';
  5. import { JobQueueStrategy } from '../../config/job-queue/job-queue-strategy';
  6. import { Job } from '../../job-queue/job';
  7. import { ProcessContext } from '../../process-context/process-context';
  8. import { ListQueryBuilder } from '../../service/helpers/list-query-builder/list-query-builder';
  9. import { JobRecord } from './job-record.entity';
  /**
   * @description
   * A {@link JobQueueStrategy} which uses the configured SQL database to persist jobs in the queue.
   * This strategy is used by the {@link DefaultJobQueuePlugin}.
   *
   * The database connection is only acquired when `init()` runs in a server process. While no
   * open connection is available, every method short-circuits and returns a neutral value
   * (the unchanged job, `undefined`, an empty list or `0`) instead of touching the database.
   *
   * @docsCategory JobQueue
   */
  export class SqlJobQueueStrategy implements JobQueueStrategy {
      private connection: Connection | undefined;
      private listQueryBuilder: ListQueryBuilder;

      /**
       * Acquires the database connection and the ListQueryBuilder from the injector,
       * but only when the current process context reports `isServer`.
       */
      init(injector: Injector) {
          const processContext = injector.get(ProcessContext);
          if (processContext.isServer) {
              this.connection = injector.getConnection();
              this.listQueryBuilder = injector.get(ListQueryBuilder);
          }
      }

      /**
       * Persists the given job as a new JobRecord.
       *
       * @returns a Job built from the saved record, or the given job unchanged when no
       * connection is available.
       */
      async add(job: Job): Promise<Job> {
          if (!this.connectionAvailable(this.connection)) {
              return job;
          }
          const newRecord = this.toRecord(job);
          const record = await this.connection.getRepository(JobRecord).save(newRecord);
          return this.fromRecord(record);
      }

      /**
       * Fetches the oldest (by `createdAt`) record of the given queue which is in the
       * PENDING or RETRYING state and returns it as a started Job.
       *
       * NOTE(review): `job.start()` is only invoked on the in-memory Job; nothing is written
       * back to the database here, so the caller presumably persists the new state via
       * `update()` - confirm against the caller.
       *
       * @returns the started Job, or `undefined` when no connection is available or no
       * matching record exists.
       */
      async next(queueName: string): Promise<Job | undefined> {
          if (!this.connectionAvailable(this.connection)) {
              return;
          }
          const record = await this.connection
              .getRepository(JobRecord)
              .createQueryBuilder('record')
              .where('record.queueName = :queueName', { queueName })
              .andWhere(
                  // Group the two state checks so that the OR cannot escape the queueName filter.
                  new Brackets(qb => {
                      qb.where('record.state = :pending', {
                          pending: JobState.PENDING,
                      }).orWhere('record.state = :retrying', { retrying: JobState.RETRYING });
                  }),
              )
              .orderBy('record.createdAt', 'ASC')
              .getOne();
          if (record) {
              const job = this.fromRecord(record);
              job.start();
              return job;
          }
      }

      /**
       * Saves the current state of the given job to its JobRecord.
       * Does nothing when no connection is available.
       */
      async update(job: Job<any>): Promise<void> {
          if (!this.connectionAvailable(this.connection)) {
              return;
          }
          await this.connection.getRepository(JobRecord).save(this.toRecord(job));
      }

      /**
       * Returns a list of jobs built by the ListQueryBuilder from the given list options,
       * together with the total count reported by the query.
       *
       * @returns an empty list with `totalItems: 0` when no connection is available.
       */
      async findMany(options?: JobListOptions): Promise<PaginatedList<Job>> {
          if (!this.connectionAvailable(this.connection)) {
              return {
                  items: [],
                  totalItems: 0,
              };
          }
          const [records, totalItems] = await this.listQueryBuilder
              .build(JobRecord, options)
              .getManyAndCount();
          return {
              // An explicit arrow keeps `this` bound and drops map()'s extra index/array arguments.
              items: records.map(record => this.fromRecord(record)),
              totalItems,
          };
      }

      /**
       * Looks up a single job by id.
       *
       * @returns the Job, or `undefined` when no connection is available or no record is found.
       */
      async findOne(id: ID): Promise<Job | undefined> {
          if (!this.connectionAvailable(this.connection)) {
              return;
          }
          const record = await this.connection.getRepository(JobRecord).findOne(id);
          if (record) {
              return this.fromRecord(record);
          }
      }

      /**
       * Looks up all jobs with the given ids.
       *
       * @returns the matching Jobs, or an empty array when no connection is available.
       */
      async findManyById(ids: ID[]): Promise<Job[]> {
          if (!this.connectionAvailable(this.connection)) {
              return [];
          }
          const records = await this.connection.getRepository(JobRecord).findByIds(ids);
          return records.map(record => this.fromRecord(record));
      }

      /**
       * Deletes settled jobs whose `settledAt` is earlier than `olderThan`.
       *
       * @param queueNames restricts the deletion to these queues; an empty array (the default)
       * applies no queue restriction.
       * @param olderThan cut-off date; defaults to the current time.
       * @returns the number of records matching the conditions, counted immediately before the
       * delete statement is issued, or `0` when no connection is available.
       */
      async removeSettledJobs(queueNames: string[] = [], olderThan?: Date): Promise<number> {
          if (!this.connectionAvailable(this.connection)) {
              return 0;
          }
          const findOptions: FindConditions<JobRecord> = {
              ...(0 < queueNames.length ? { queueName: In(queueNames) } : {}),
              isSettled: true,
              settledAt: LessThan(olderThan || new Date()),
          };
          const repository = this.connection.getRepository(JobRecord);
          // Only the count is needed for the return value; the matching rows are never loaded.
          const deleteCount = await repository.count({ where: findOptions });
          await repository.delete(findOptions);
          return deleteCount;
      }

      /**
       * Type guard which is true only when the given connection is defined and reports
       * `isConnected`.
       */
      private connectionAvailable(connection: Connection | undefined): connection is Connection {
          return !!connection && connection.isConnected;
      }

      /**
       * Copies the persisted fields of a Job into a new JobRecord entity.
       */
      private toRecord(job: Job<any>): JobRecord {
          return new JobRecord({
              id: job.id,
              queueName: job.queueName,
              data: job.data,
              state: job.state,
              progress: job.progress,
              result: job.result,
              error: job.error,
              startedAt: job.startedAt,
              settledAt: job.settledAt,
              isSettled: job.isSettled,
              retries: job.retries,
              attempts: job.attempts,
          });
      }

      /**
       * Creates a Job from a JobRecord by passing the record to the Job constructor.
       */
      private fromRecord(jobRecord: JobRecord): Job<any> {
          return new Job<any>(jobRecord);
      }
  }