job-buffer.service.ts 3.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. import { Injectable } from '@nestjs/common';
  2. import { InternalServerError } from '../../common/error/errors';
  3. import { ConfigService } from '../../config/config.service';
  4. import { Logger } from '../../config/logger/vendure-logger';
  5. import { Job } from '../job';
  6. import { JobBuffer } from './job-buffer';
  7. import { JobBufferStorageStrategy } from './job-buffer-storage-strategy';
  /**
   * @description
   * Used to manage {@link JobBuffer}s. Primarily intended to be used internally by the {@link JobQueueService}, which
   * exposes its public-facing functionality.
   */
  @Injectable()
  export class JobBufferService {
      /** The buffers currently registered with this service. */
      private readonly buffers = new Set<JobBuffer>();
      /** Storage backend taken from `jobQueueOptions.jobBufferStorageStrategy` at construction time. */
      private readonly storageStrategy: JobBufferStorageStrategy;

      constructor(private configService: ConfigService) {
          this.storageStrategy = configService.jobQueueOptions.jobBufferStorageStrategy;
      }

      /**
       * Registers a buffer with the service.
       *
       * @throws InternalServerError if a buffer with the same `id` is already registered.
       */
      addBuffer(buffer: JobBuffer<any>) {
          // Walk the Set directly rather than copying it into an array first.
          for (const existing of this.buffers) {
              if (existing.id === buffer.id) {
                  throw new InternalServerError(
                      `There is already a JobBufferProcessor with the id "${buffer.id}". Ids must be unique`,
                  );
              }
          }
          this.buffers.add(buffer);
      }

      /**
       * Unregisters the given buffer instance. Nothing is flushed or removed from storage here.
       */
      removeBuffer(buffer: JobBuffer<any>) {
          this.buffers.delete(buffer);
      }

      /**
       * Offers the job to every registered buffer, one after another. For each buffer whose
       * `collect()` result is truthy, the job is stored under that buffer's id.
       *
       * @returns `true` if at least one buffer collected the job, otherwise `false`.
       */
      async add(job: Job): Promise<boolean> {
          let collected = false;
          for (const buffer of this.buffers) {
              const shouldCollect = await buffer.collect(job);
              if (shouldCollect) {
                  collected = true;
                  await this.storageStrategy.add(buffer.id, job);
              }
          }
          return collected;
      }

      /**
       * Asks the storage strategy for the size of the given buffers (instances or ids).
       * When `forBuffers` is omitted, all registered buffers are used.
       */
      bufferSize(forBuffers?: Array<JobBuffer | string>): Promise<{ [bufferId: string]: number }> {
          const buffers = forBuffers ?? Array.from(this.buffers);
          return this.storageStrategy.bufferSize(buffers.map(p => (typeof p === 'string' ? p : p.id)));
      }

      /**
       * Flushes the given buffers (instances or ids; all registered buffers when omitted) from
       * storage and adds the resulting jobs to the configured `jobQueueStrategy`.
       *
       * Jobs belonging to a registered buffer are passed through that buffer's `reduce()` first.
       * Jobs returned by the storage strategy for an id which has no registered buffer are added
       * to the queue as-is, so that they are not lost once they have been removed from storage.
       *
       * @returns the jobs as returned by `jobQueueStrategy.add()`, registered buffers first
       * (in registration order), followed by jobs of unregistered buffer ids.
       */
      async flush(forBuffers?: Array<JobBuffer | string>): Promise<Job[]> {
          const { jobQueueStrategy } = this.configService.jobQueueOptions;
          const buffers = forBuffers ?? Array.from(this.buffers);
          const flushResult = await this.storageStrategy.flush(
              buffers.map(p => (typeof p === 'string' ? p : p.id)),
          );
          const result: Job[] = [];
          const handledIds = new Set<string>();
          for (const buffer of this.buffers) {
              handledIds.add(buffer.id);
              const jobsForBuffer = flushResult[buffer.id];
              if (jobsForBuffer?.length) {
                  const jobsToAdd = await this.reduceJobs(buffer, jobsForBuffer);
                  for (const job of jobsToAdd) {
                      result.push(await jobQueueStrategy.add(job));
                  }
              }
          }
          // Anything left over was flushed from storage for an id that is not (or no longer)
          // registered. There is no reducer to apply, so forward the jobs unchanged.
          for (const [bufferId, orphanedJobs] of Object.entries(flushResult)) {
              if (handledIds.has(bufferId) || !orphanedJobs?.length) {
                  continue;
              }
              for (const job of orphanedJobs) {
                  result.push(await jobQueueStrategy.add(job));
              }
          }
          return result;
      }

      /**
       * Runs the buffer's `reduce()` over the flushed jobs. If it throws, the error is logged and
       * the unreduced jobs are returned so that they still reach the queue.
       */
      private async reduceJobs(buffer: JobBuffer, jobs: Job[]): Promise<Job[]> {
          try {
              return await buffer.reduce(jobs);
          } catch (e: unknown) {
              // Anything can be thrown, so only read `message` / `stack` off object values.
              const details = (typeof e === 'object' && e !== null ? e : {}) as {
                  message?: unknown;
                  stack?: unknown;
              };
              Logger.error(
                  `Error encountered processing jobs in JobBuffer "${buffer.id}":\n${JSON.stringify(
                      details.message ?? String(e),
                  )}`,
                  undefined,
                  typeof details.stack === 'string' ? details.stack : undefined,
              );
              return jobs;
          }
      }
  }