Przeglądaj źródła

fix(job-queue-plugin): Correctly list running jobs

Michael Bromley 6 miesięcy temu
rodzic
commit
391e31488a

+ 45 - 0
packages/job-queue-plugin/src/bullmq/scripts/get-jobs-by-type.ts

@@ -27,6 +27,7 @@ local totalResults = 0
 -- Create a temporary key for merging results
 local tempKey = prefix .. 'temp:merge:' .. math.random(1000000)
 local sourceKeys = {}
+local listKeys = {}
 
 -- Function to count jobs in a sorted set
 local function countJobsInSortedSet(key)
@@ -54,6 +55,7 @@ if filterName ~= "" then
             table.insert(sourceKeys, indexedKey)
         elseif keyType == 'list' then
             totalResults = totalResults + countJobsInList(indexedKey)
+            table.insert(listKeys, indexedKey)
         end
     end
 else
@@ -71,6 +73,8 @@ else
             table.insert(sourceKeys, key)
         elseif keyType == 'list' then
             totalResults = totalResults + countJobsInList(key)
+            table.insert(listKeys, key)
+            -- redis.log(redis.LOG_NOTICE, 'total jobs in list: ' .. totalResults)
         end
     end
 end
@@ -131,6 +135,47 @@ if #sourceKeys > 0 then
     end
 end
 
+-- Handle list keys separately
+if #listKeys > 0 then
+    -- Create a temporary list to merge all list keys
+    local tempListKey = tempKey .. ':list'
+
+    -- Merge all list keys into the temporary list
+    for _, listKey in ipairs(listKeys) do
+        local listElements = rcall('LRANGE', listKey, 0, -1)
+        if #listElements > 0 then
+            rcall('RPUSH', tempListKey, unpack(listElements))
+        end
+    end
+
+    -- Get the total number of elements in the merged list
+    local totalListElements = rcall('LLEN', tempListKey)
+
+    if totalListElements > 0 then
+        -- If we already have results from sorted sets, we need to merge them
+        if #results > 0 then
+            -- Convert the merged sorted set results to a temporary list
+            local tempSortedListKey = tempKey .. ':sorted'
+            for _, jobId in ipairs(results) do
+                rcall('RPUSH', tempSortedListKey, jobId)
+            end
+
+            -- Merge the sorted results with list results
+            local sortedElements = rcall('LRANGE', tempSortedListKey, 0, -1)
+            rcall('RPUSH', tempListKey, unpack(sortedElements))
+
+            -- Clean up temporary sorted list
+            rcall('DEL', tempSortedListKey)
+        end
+
+        -- Get the final paginated results from the merged list
+        results = rcall('LRANGE', tempListKey, skip, skip + take - 1)
+
+        -- Clean up temporary list
+        rcall('DEL', tempListKey)
+    end
+end
+
 -- Clean up temporary key
 rcall('DEL', tempKey)