| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182 |
- import { Injector } from '../../common/injector';
- import { TransactionalConnection } from '../../connection/transactional-connection';
- import { Job } from '../../job-queue/job';
- import { JobBufferStorageStrategy } from '../../job-queue/job-buffer/job-buffer-storage-strategy';
- import { JobConfig } from '../../job-queue/types';
- import { JobRecordBuffer } from './job-record-buffer.entity';
- /**
- * @description
- * This stores the buffered jobs in the database.
- */
- export class SqlJobBufferStorageStrategy implements JobBufferStorageStrategy {
- private connection: TransactionalConnection;
- init(injector: Injector) {
- this.connection = injector.get(TransactionalConnection);
- }
- async add(bufferId: string, job: Job): Promise<Job> {
- await this.connection.rawConnection.getRepository(JobRecordBuffer).save(
- new JobRecordBuffer({
- bufferId,
- job: this.toJobConfig(job),
- }),
- );
- return job;
- }
- async bufferSize(bufferIds?: string[]): Promise<{ [bufferId: string]: number }> {
- const qb = await this.connection
- .rawConnection
- .getRepository(JobRecordBuffer)
- .createQueryBuilder('record')
- .select(`COUNT(*)`, 'count')
- .addSelect(`record.bufferId`, 'bufferId');
- if (bufferIds?.length) {
- qb.andWhere(`record.bufferId IN (:...bufferIds)`, { bufferIds });
- }
- const rows = await qb.groupBy('record.bufferId').getRawMany();
- const result: { [bufferId: string]: number } = {};
- for (const row of rows) {
- if (bufferIds) result[row.bufferId] = +row.count;
- }
- return result;
- }
- async flush(bufferIds?: string[]): Promise<{ [bufferId: string]: Job[] }> {
- const selectQb = this.connection.rawConnection
- .getRepository(JobRecordBuffer)
- .createQueryBuilder('record');
- if (bufferIds?.length) {
- selectQb.where(`record.bufferId IN (:...bufferIds)`, { bufferIds });
- }
- const rows = await selectQb.getMany();
- const result: { [bufferId: string]: Job[] } = {};
- for (const row of rows) {
- if (!result[row.bufferId]) {
- result[row.bufferId] = [];
- }
- result[row.bufferId].push(new Job(row.job));
- }
- const deleteQb = this.connection.rawConnection.createQueryBuilder().delete().from(JobRecordBuffer);
- if (bufferIds?.length) {
- deleteQb.where(`bufferId IN (:...bufferIds)`, { bufferIds });
- }
- await deleteQb.execute();
- return result;
- }
- private toJobConfig(job: Job<any>): JobConfig<any> {
- return {
- ...job,
- data: job.data,
- id: job.id ?? undefined,
- };
- }
- }
|