Просмотр исходного кода

refactor(job-queue-plugin): Make queue prefix optional in custom lua script

David Höck 1 год назад
Родитель
Сommit
4f6cbfd24a

+ 5 - 11
packages/job-queue-plugin/src/bullmq/bullmq-job-queue-strategy.ts

@@ -252,8 +252,6 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
         let items: Bull.Job[] = [];
         let totalItems = 0;
 
-        // TODO: pagination with separated queues is not possible, because there is no getter for jobs of all queues
-
         try {
             const [total, jobIds] = await this.callCustomScript(getJobsByType, [
                 skip,
@@ -264,7 +262,7 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
             items = (
                 await Promise.all(
                     jobIds.map(id => {
-                        return BullJob.fromId(this.queue, id);
+                        return this.findOneBullJob(id);
                     }),
                 )
             ).filter(notNullOrUndefined);
@@ -525,20 +523,16 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
     }
 
     private callCustomScript<T, Args extends any[]>(
-        queueName: string,
         scriptDef: CustomScriptDefinition<T, Args>,
         args: Args,
+        queueName?: string,
     ): Promise<T> {
-        const bullQueueName = this.getBullQueueName(queueName);
-        const bullQueue = this.queues.get(bullQueueName);
-
-        if (!bullQueue) {
-            throw new InternalServerError(`Queue ${bullQueueName} not found. Could not call custom script`);
-        }
+        const bullQueueName = queueName ? this.getBullQueueName(queueName) : undefined;
+        const bullQueue = bullQueueName ? this.queues.get(bullQueueName) : undefined;
 
         return new Promise<T>((resolve, reject) => {
             (this.redisConnection as any)[scriptDef.name](
-                `bull:${bullQueue.name}:`,
+                bullQueue ? `bull:${bullQueue.name}:` : undefined,
                 ...args,
                 (err: any, result: any) => {
                     if (err) {

+ 3 - 5
packages/job-queue-plugin/src/bullmq/scripts/get-jobs-by-type.ts

@@ -4,14 +4,15 @@ import { CustomScriptDefinition } from '../types';
 const script = `--[[
   Get job ids per provided states and filter by name
     Input:
-      KEYS[1]    'prefix'
+      KEYS[1]    'prefix' (optional)
       ARGV[1]    start
       ARGV[2]    end
       ARGV[3]    filterName
       ARGV[4...] types
 ]]
 local rcall = redis.call
-local prefix = KEYS[1]
+ -- Use prefix if provided, otherwise an empty string
+local prefix = KEYS[1] ~= '' and KEYS[1] or ''
 local rangeStart = tonumber(ARGV[1])
 local rangeEnd = tonumber(ARGV[2])
 local filterName = ARGV[3]
@@ -28,7 +29,6 @@ local typesInUnion = {}
 -- regular lists
 local listsToInclude = {}
 
-
 -- Iterate through ARGV starting from the first element (ARGV[1]) up to the end
 for i = 4, #ARGV do
     local setKey = prefix .. ARGV[i]
@@ -65,7 +65,6 @@ end
 
 local originalResults = rcall("ZREVRANGE", tempSortedSetUnionKey, 0, -1)
 
-
 if #listsToInclude > 0 then
     for _, listKey in ipairs(listsToInclude) do
         local list = rcall("LRANGE", listKey, 0, -1)
@@ -75,7 +74,6 @@ if #listsToInclude > 0 then
     end
 end
 
-
 -- Define a custom comparison function for sorting in descending order
 local function compareDescending(a, b)
     return tonumber(a) > tonumber(b)