sql-job-buffer-storage-strategy.ts 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. import { Injector } from '../../common/injector';
  2. import { TransactionalConnection } from '../../connection/transactional-connection';
  3. import { Job } from '../../job-queue/job';
  4. import { JobBufferStorageStrategy } from '../../job-queue/job-buffer/job-buffer-storage-strategy';
  5. import { JobConfig } from '../../job-queue/types';
  6. import { JobRecordBuffer } from './job-record-buffer.entity';
  7. /**
  8. * @description
  9. * This stores the buffered jobs in the database.
  10. */
  11. export class SqlJobBufferStorageStrategy implements JobBufferStorageStrategy {
  12. private connection: TransactionalConnection;
  13. init(injector: Injector) {
  14. this.connection = injector.get(TransactionalConnection);
  15. }
  16. async add(bufferId: string, job: Job): Promise<Job> {
  17. await this.connection.rawConnection.getRepository(JobRecordBuffer).save(
  18. new JobRecordBuffer({
  19. bufferId,
  20. job: this.toJobConfig(job),
  21. }),
  22. );
  23. return job;
  24. }
  25. async bufferSize(bufferIds?: string[]): Promise<{ [bufferId: string]: number }> {
  26. const qb = await this.connection
  27. .rawConnection
  28. .getRepository(JobRecordBuffer)
  29. .createQueryBuilder('record')
  30. .select(`COUNT(*)`, 'count')
  31. .addSelect(`record.bufferId`, 'bufferId');
  32. if (bufferIds?.length) {
  33. qb.andWhere(`record.bufferId IN (:...bufferIds)`, { bufferIds });
  34. }
  35. const rows = await qb.groupBy('record.bufferId').getRawMany();
  36. const result: { [bufferId: string]: number } = {};
  37. for (const row of rows) {
  38. if (bufferIds) result[row.bufferId] = +row.count;
  39. }
  40. return result;
  41. }
  42. async flush(bufferIds?: string[]): Promise<{ [bufferId: string]: Job[] }> {
  43. const selectQb = this.connection.rawConnection
  44. .getRepository(JobRecordBuffer)
  45. .createQueryBuilder('record');
  46. if (bufferIds?.length) {
  47. selectQb.where(`record.bufferId IN (:...bufferIds)`, { bufferIds });
  48. }
  49. const rows = await selectQb.getMany();
  50. const result: { [bufferId: string]: Job[] } = {};
  51. for (const row of rows) {
  52. if (!result[row.bufferId]) {
  53. result[row.bufferId] = [];
  54. }
  55. result[row.bufferId].push(new Job(row.job));
  56. }
  57. const deleteQb = this.connection.rawConnection.createQueryBuilder().delete().from(JobRecordBuffer);
  58. if (bufferIds?.length) {
  59. deleteQb.where(`bufferId IN (:...bufferIds)`, { bufferIds });
  60. }
  61. await deleteQb.execute();
  62. return result;
  63. }
  64. private toJobConfig(job: Job<any>): JobConfig<any> {
  65. return {
  66. ...job,
  67. data: job.data,
  68. id: job.id ?? undefined,
  69. };
  70. }
  71. }