sql-job-queue-strategy.ts 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. import { JobListOptions, JobState } from '@vendure/common/lib/generated-types';
  2. import { ID, PaginatedList } from '@vendure/common/lib/shared-types';
  3. import { Brackets, Connection, EntityManager, FindOptionsWhere, In, LessThan } from 'typeorm';
  4. import { Injector } from '../../common/injector';
  5. import { InspectableJobQueueStrategy, JobQueueStrategy } from '../../config';
  6. import { Logger } from '../../config/logger/vendure-logger';
  7. import { TransactionalConnection } from '../../connection/transactional-connection';
  8. import { Job, JobData, JobQueueStrategyJobOptions } from '../../job-queue';
  9. import { PollingJobQueueStrategy } from '../../job-queue/polling-job-queue-strategy';
  10. import { ListQueryBuilder } from '../../service/helpers/list-query-builder/list-query-builder';
  11. import { JobRecord } from './job-record.entity';
  12. /**
  13. * @description
  14. * A {@link JobQueueStrategy} which uses the configured SQL database to persist jobs in the queue.
  15. * This strategy is used by the {@link DefaultJobQueuePlugin}.
  16. *
  17. * @docsCategory JobQueue
  18. */
  19. export class SqlJobQueueStrategy extends PollingJobQueueStrategy implements InspectableJobQueueStrategy {
  20. private rawConnection: Connection | undefined;
  21. private connection: TransactionalConnection | undefined;
  22. private listQueryBuilder: ListQueryBuilder;
  23. init(injector: Injector) {
  24. this.rawConnection = injector.get(TransactionalConnection).rawConnection;
  25. this.connection = injector.get(TransactionalConnection);
  26. this.listQueryBuilder = injector.get(ListQueryBuilder);
  27. super.init(injector);
  28. }
  29. destroy() {
  30. this.rawConnection = undefined;
  31. super.destroy();
  32. }
  33. async add<Data extends JobData<Data> = object>(job: Job<Data>, jobOptions?: JobQueueStrategyJobOptions<Data>): Promise<Job<Data>> {
  34. if (!this.connectionAvailable(this.rawConnection)) {
  35. throw new Error('Connection not available');
  36. }
  37. const jobRecordRepository = jobOptions?.ctx && this.connection ? this.connection.getRepository(jobOptions.ctx, JobRecord) :
  38. this.rawConnection.getRepository(JobRecord);
  39. const constrainedData = this.constrainDataSize(job);
  40. const newRecord = this.toRecord(job, constrainedData, this.setRetries(job.queueName, job));
  41. const record = await jobRecordRepository.save(newRecord);
  42. return this.fromRecord(record);
  43. }
  44. /**
  45. * MySQL & MariaDB store job data as a "text" type which has a limit of 64kb. Going over that limit will cause the job to not be stored.
  46. * In order to try to prevent that, this method will truncate any strings in the `data` object over 2kb in size.
  47. */
  48. private constrainDataSize<Data extends JobData<Data> = object>(job: Job<Data>): Data | undefined {
  49. const type = this.rawConnection?.options.type;
  50. if (type === 'mysql' || type === 'mariadb') {
  51. const stringified = JSON.stringify(job.data);
  52. if (64 * 1024 <= stringified.length) {
  53. const truncatedKeys: Array<{ key: string; size: number }> = [];
  54. const reduced = JSON.parse(stringified, (key, value) => {
  55. if (typeof value === 'string' && 2048 < value.length) {
  56. truncatedKeys.push({ key, size: value.length });
  57. return `[truncated - originally ${value.length} bytes]`;
  58. }
  59. return value;
  60. });
  61. Logger.warn(
  62. `Job data for "${
  63. job.queueName
  64. }" is too long to store with the ${type} driver (${Math.round(
  65. stringified.length / 1024,
  66. )}kb).\nThe following keys were truncated: ${truncatedKeys
  67. .map(({ key, size }) => `${key} (${size} bytes)`)
  68. .join(', ')}`,
  69. );
  70. return reduced;
  71. }
  72. }
  73. }
  74. async next(queueName: string): Promise<Job | undefined> {
  75. if (!this.connectionAvailable(this.rawConnection)) {
  76. throw new Error('Connection not available');
  77. }
  78. const connection = this.rawConnection;
  79. const connectionType = this.rawConnection.options.type;
  80. const isSQLite =
  81. connectionType === 'sqlite' || connectionType === 'sqljs' || connectionType === 'better-sqlite3';
  82. return new Promise(async (resolve, reject) => {
  83. if (isSQLite) {
  84. try {
  85. // SQLite driver does not support concurrent transactions. See https://github.com/typeorm/typeorm/issues/1884
  86. const result = await this.getNextAndSetAsRunning(connection.manager, queueName, false);
  87. resolve(result);
  88. } catch (e: any) {
  89. reject(e);
  90. }
  91. } else {
  92. // Selecting the next job is wrapped in a transaction so that we can
  93. // set a lock on that row and immediately update the status to "RUNNING".
  94. // This prevents multiple worker processes from taking the same job when
  95. // running concurrent workers.
  96. connection
  97. .transaction(async transactionManager => {
  98. const result = await this.getNextAndSetAsRunning(transactionManager, queueName, true);
  99. resolve(result);
  100. })
  101. .catch(err => reject(err));
  102. }
  103. });
  104. }
  105. private async getNextAndSetAsRunning(
  106. manager: EntityManager,
  107. queueName: string,
  108. setLock: boolean,
  109. waitingJobIds: ID[] = [],
  110. ): Promise<Job | undefined> {
  111. const qb = manager
  112. .getRepository(JobRecord)
  113. .createQueryBuilder('record')
  114. .where('record.queueName = :queueName', { queueName })
  115. .andWhere(
  116. new Brackets(qb1 => {
  117. qb1.where('record.state = :pending', {
  118. pending: JobState.PENDING,
  119. }).orWhere('record.state = :retrying', { retrying: JobState.RETRYING });
  120. }),
  121. )
  122. .orderBy('record.createdAt', 'ASC');
  123. if (waitingJobIds.length) {
  124. qb.andWhere('record.id NOT IN (:...waitingJobIds)', { waitingJobIds });
  125. }
  126. if (setLock) {
  127. qb.setLock('pessimistic_write');
  128. }
  129. const record = await qb.getOne();
  130. if (record) {
  131. const job = this.fromRecord(record);
  132. if (record.state === JobState.RETRYING && typeof this.backOffStrategy === 'function') {
  133. const msSinceLastFailure = Date.now() - +record.updatedAt;
  134. const backOffDelayMs = this.backOffStrategy(queueName, record.attempts, job);
  135. if (msSinceLastFailure < backOffDelayMs) {
  136. return await this.getNextAndSetAsRunning(manager, queueName, setLock, [
  137. ...waitingJobIds,
  138. record.id,
  139. ]);
  140. }
  141. }
  142. job.start();
  143. record.state = JobState.RUNNING;
  144. await manager.getRepository(JobRecord).save(record, { reload: false });
  145. return job;
  146. } else {
  147. return;
  148. }
  149. }
  150. async update(job: Job<any>): Promise<void> {
  151. if (!this.connectionAvailable(this.rawConnection)) {
  152. throw new Error('Connection not available');
  153. }
  154. await this.rawConnection
  155. .getRepository(JobRecord)
  156. .createQueryBuilder('job')
  157. .update()
  158. .set(this.toRecord(job))
  159. .where('id = :id', { id: job.id })
  160. .andWhere('settledAt IS NULL')
  161. .execute();
  162. }
  163. async findMany(options?: JobListOptions): Promise<PaginatedList<Job>> {
  164. if (!this.connectionAvailable(this.rawConnection)) {
  165. throw new Error('Connection not available');
  166. }
  167. return this.listQueryBuilder
  168. .build(JobRecord, options)
  169. .getManyAndCount()
  170. .then(([items, totalItems]) => ({
  171. items: items.map(this.fromRecord),
  172. totalItems,
  173. }));
  174. }
  175. async findOne(id: ID): Promise<Job | undefined> {
  176. if (!this.connectionAvailable(this.rawConnection)) {
  177. throw new Error('Connection not available');
  178. }
  179. const record = await this.rawConnection.getRepository(JobRecord).findOne({ where: { id } });
  180. if (record) {
  181. return this.fromRecord(record);
  182. }
  183. }
  184. async findManyById(ids: ID[]): Promise<Job[]> {
  185. if (!this.connectionAvailable(this.rawConnection)) {
  186. throw new Error('Connection not available');
  187. }
  188. return this.rawConnection
  189. .getRepository(JobRecord)
  190. .find({ where: { id: In(ids) } })
  191. .then(records => records.map(this.fromRecord));
  192. }
  193. async removeSettledJobs(queueNames: string[] = [], olderThan?: Date) {
  194. if (!this.connectionAvailable(this.rawConnection)) {
  195. throw new Error('Connection not available');
  196. }
  197. const findOptions: FindOptionsWhere<JobRecord> = {
  198. ...(0 < queueNames.length ? { queueName: In(queueNames) } : {}),
  199. isSettled: true,
  200. settledAt: LessThan(olderThan || new Date()),
  201. };
  202. const toDelete = await this.rawConnection.getRepository(JobRecord).find({ where: findOptions });
  203. const deleteCount = await this.rawConnection.getRepository(JobRecord).count({ where: findOptions });
  204. await this.rawConnection.getRepository(JobRecord).delete(findOptions);
  205. return deleteCount;
  206. }
  207. private connectionAvailable(connection: Connection | undefined): connection is Connection {
  208. return !!this.rawConnection && this.rawConnection.isConnected;
  209. }
  210. private toRecord(job: Job<any>, data?: any, retries?: number): JobRecord {
  211. return new JobRecord({
  212. id: job.id || undefined,
  213. queueName: job.queueName,
  214. data: data ?? job.data,
  215. state: job.state,
  216. progress: job.progress,
  217. result: job.result,
  218. error: job.error,
  219. startedAt: job.startedAt,
  220. settledAt: job.settledAt,
  221. isSettled: job.isSettled,
  222. retries: retries ?? job.retries,
  223. attempts: job.attempts,
  224. });
  225. }
  226. private fromRecord(this: void, jobRecord: JobRecord): Job<any> {
  227. return new Job<any>(jobRecord);
  228. }
  229. }