Browse Source

fix: Correct some buffer storage logic

Michael Bromley 4 years ago
parent
commit
74e87b9cd8

+ 9 - 5
packages/core/src/plugin/default-job-queue-plugin/sql-job-buffer-storage-strategy.ts

@@ -29,17 +29,21 @@ export class SqlJobBufferStorageStrategy implements JobBufferStorageStrategy {
     }
 
     async bufferSize(bufferIds?: string[]): Promise<{ [bufferId: string]: number }> {
-        const rows = await this.connection
+        const qb = await this.connection
             .getRepository(JobRecordBuffer)
             .createQueryBuilder('record')
             .select(`COUNT(*)`, 'count')
-            .addSelect(`record.bufferId`, 'bufferId')
-            .groupBy('record.bufferId')
-            .getRawMany();
+            .addSelect(`record.bufferId`, 'bufferId');
+
+        if (bufferIds?.length) {
+            qb.andWhere(`record.bufferId IN (:...bufferIds)`, { bufferIds });
+        }
+
+        const rows = await qb.groupBy('record.bufferId').getRawMany();
 
         const result: { [bufferId: string]: number } = {};
         for (const row of rows) {
-            result[row.bufferId] = +row.count;
+            if (bufferIds) result[row.bufferId] = +row.count;
         }
         return result;
     }

+ 16 - 0
packages/job-queue-plugin/src/bullmq/redis-job-buffer-storage-strategy.ts

@@ -26,6 +26,7 @@ export class RedisJobBufferStorageStrategy implements JobBufferStorageStrategy {
     }
 
     async bufferSize(bufferIds?: string[]): Promise<{ [bufferId: string]: number }> {
+        const ids = bufferIds?.length ? bufferIds : await this.getAllBufferIds();
         const result: { [bufferId: string]: number } = {};
         for (const id of bufferIds || []) {
             const key = this.keyName(id);
@@ -36,6 +37,7 @@ export class RedisJobBufferStorageStrategy implements JobBufferStorageStrategy {
     }
 
     async flush(bufferIds?: string[]): Promise<{ [bufferId: string]: Job[] }> {
+        const ids = bufferIds?.length ? bufferIds : await this.getAllBufferIds();
         const result: { [bufferId: string]: Job[] } = {};
         for (const id of bufferIds || []) {
             const key = this.keyName(id);
@@ -68,4 +70,18 @@ export class RedisJobBufferStorageStrategy implements JobBufferStorageStrategy {
             throw e;
         }
     }
+
+    private async getAllBufferIds(): Promise<string[]> {
+        const stream = this.redis.scanStream({
+            match: `${BUFFER_LIST_PREFIX}:*`,
+        });
+        const keys = await new Promise<string[]>((resolve, reject) => {
+            const allKeys: string[] = [];
+            stream.on('data', _keys => allKeys.push(..._keys));
+            stream.on('end', () => resolve(allKeys));
+            stream.on('error', err => reject(err));
+        });
+
+        return keys.map(key => key.replace(`${BUFFER_LIST_PREFIX}:`, ''));
+    }
 }