// sql-job-queue-strategy.ts (5.0 KB)
  1. import { ModuleRef } from '@nestjs/core';
  2. import { getConnectionToken } from '@nestjs/typeorm';
  3. import { JobListOptions, JobState } from '@vendure/common/lib/generated-types';
  4. import { ID, PaginatedList } from '@vendure/common/lib/shared-types';
  5. import { Brackets, Connection, FindConditions, In, LessThan, Not } from 'typeorm';
  6. import { JobQueueStrategy } from '../../config/job-queue/job-queue-strategy';
  7. import { Job } from '../../job-queue/job';
  8. import { ProcessContext } from '../../process-context/process-context';
  9. import { ListQueryBuilder } from '../../service/helpers/list-query-builder/list-query-builder';
  10. import { JobRecord } from './job-record.entity';
  11. export class SqlJobQueueStrategy implements JobQueueStrategy {
  12. private connection: Connection | undefined;
  13. private listQueryBuilder: ListQueryBuilder;
  14. init(moduleRef: ModuleRef) {
  15. const processContext = moduleRef.get(ProcessContext, { strict: false });
  16. if (processContext.isServer) {
  17. this.connection = moduleRef.get(getConnectionToken() as any, { strict: false });
  18. this.listQueryBuilder = moduleRef.get(ListQueryBuilder, { strict: false });
  19. }
  20. }
  21. async add(job: Job): Promise<Job> {
  22. if (!this.connectionAvailable(this.connection)) {
  23. return job;
  24. }
  25. const newRecord = this.toRecord(job);
  26. const record = await this.connection.getRepository(JobRecord).save(newRecord);
  27. return this.fromRecord(record);
  28. }
  29. async next(queueName: string): Promise<Job | undefined> {
  30. if (!this.connectionAvailable(this.connection)) {
  31. return;
  32. }
  33. const record = await this.connection
  34. .getRepository(JobRecord)
  35. .createQueryBuilder('record')
  36. .where('record.queueName = :queueName', { queueName })
  37. .andWhere(
  38. new Brackets(qb => {
  39. qb.where('record.state = :pending', {
  40. pending: JobState.PENDING,
  41. }).orWhere('record.state = :retrying', { retrying: JobState.RETRYING });
  42. }),
  43. )
  44. .orderBy('record.createdAt', 'ASC')
  45. .getOne();
  46. if (record) {
  47. const job = this.fromRecord(record);
  48. job.start();
  49. return job;
  50. }
  51. }
  52. async update(job: Job<any>): Promise<void> {
  53. if (!this.connectionAvailable(this.connection)) {
  54. return;
  55. }
  56. await this.connection.getRepository(JobRecord).save(this.toRecord(job));
  57. }
  58. async findMany(options?: JobListOptions): Promise<PaginatedList<Job>> {
  59. if (!this.connectionAvailable(this.connection)) {
  60. return {
  61. items: [],
  62. totalItems: 0,
  63. };
  64. }
  65. return this.listQueryBuilder
  66. .build(JobRecord, options)
  67. .getManyAndCount()
  68. .then(([items, totalItems]) => ({
  69. items: items.map(this.fromRecord),
  70. totalItems,
  71. }));
  72. }
  73. async findOne(id: ID): Promise<Job | undefined> {
  74. if (!this.connectionAvailable(this.connection)) {
  75. return;
  76. }
  77. const record = await this.connection.getRepository(JobRecord).findOne(id);
  78. if (record) {
  79. return this.fromRecord(record);
  80. }
  81. }
  82. async findManyById(ids: ID[]): Promise<Job[]> {
  83. if (!this.connectionAvailable(this.connection)) {
  84. return [];
  85. }
  86. return this.connection
  87. .getRepository(JobRecord)
  88. .findByIds(ids)
  89. .then(records => records.map(this.fromRecord));
  90. }
  91. async removeSettledJobs(queueNames: string[] = [], olderThan?: Date) {
  92. if (!this.connectionAvailable(this.connection)) {
  93. return 0;
  94. }
  95. const findOptions: FindConditions<JobRecord> = {
  96. ...(0 < queueNames.length ? { queueName: In(queueNames) } : {}),
  97. isSettled: true,
  98. settledAt: LessThan(olderThan || new Date()),
  99. };
  100. const toDelete = await this.connection.getRepository(JobRecord).find({ where: findOptions });
  101. const deleteCount = await this.connection.getRepository(JobRecord).count({ where: findOptions });
  102. await this.connection.getRepository(JobRecord).delete(findOptions);
  103. return deleteCount;
  104. }
  105. private connectionAvailable(connection: Connection | undefined): connection is Connection {
  106. return !!this.connection && this.connection.isConnected;
  107. }
  108. private toRecord(job: Job<any>): JobRecord {
  109. return new JobRecord({
  110. id: job.id,
  111. queueName: job.queueName,
  112. data: job.data,
  113. state: job.state,
  114. progress: job.progress,
  115. result: job.result,
  116. error: job.error,
  117. startedAt: job.startedAt,
  118. settledAt: job.settledAt,
  119. isSettled: job.isSettled,
  120. retries: job.retries,
  121. attempts: job.attempts,
  122. });
  123. }
  124. private fromRecord(jobRecord: JobRecord): Job<any> {
  125. return new Job<any>(jobRecord);
  126. }
  127. }