
perf(job-queue-plugin): Optimize list query for BullMQJobQueuePlugin (#3590)

Michael Bromley 7 months ago
parent
commit
208b87adf7

+ 75 - 0
packages/dev-server/test-plugins/job-queue-stress-test/job-queue-stress-test.plugin.ts

@@ -0,0 +1,75 @@
+import { Injectable, OnModuleInit } from '@nestjs/common';
+import { Args, Mutation, Resolver } from '@nestjs/graphql';
+import { JobQueue, JobQueueService, Logger, PluginCommonModule, VendurePlugin } from '@vendure/core';
+import gql from 'graphql-tag';
+
+@Injectable()
+class TestQueueService implements OnModuleInit {
+    private jobQueue: JobQueue<{ message: string }>;
+
+    constructor(private jobQueueService: JobQueueService) {}
+
+    async onModuleInit() {
+        this.jobQueue = await this.jobQueueService.createQueue({
+            name: 'test-queue',
+            process: async job => {
+                // Simulate processing the job, failing randomly ~20% of the time
+                Logger.info(`Processing job with message: ${job.data.message}`, 'TestQueueService');
+                if (Math.random() < 0.2) {
+                    throw new Error('Random failure occurred while processing job');
+                }
+                return { processed: true, message: job.data.message };
+            },
+        });
+    }
+
+    addJob(message: string) {
+        return this.jobQueue.add({ message });
+    }
+
+    async generateJobs(count: number) {
+        const jobs = [];
+        for (let i = 0; i < count; i++) {
+            jobs.push(this.addJob(`Test job ${i + 1}`));
+            // await new Promise(resolve => setTimeout(resolve, 100));
+        }
+        return Promise.all(jobs);
+    }
+}
+
+@Resolver()
+class TestQueueResolver {
+    constructor(private testQueueService: TestQueueService) {}
+
+    @Mutation()
+    async generateTestJobs(@Args() args: { jobCount: number }) {
+        const jobs = await this.testQueueService.generateJobs(args.jobCount);
+        return {
+            success: true,
+            jobCount: jobs.length,
+        };
+    }
+}
+
+/**
+ * This plugin can create a large number of jobs in the job queue, which we
+ * can then use to stress test the job queue's ability to return lists of jobs.
+ */
+@VendurePlugin({
+    imports: [PluginCommonModule],
+    providers: [TestQueueService],
+    adminApiExtensions: {
+        schema: gql`
+            extend type Mutation {
+                generateTestJobs(jobCount: Int!): GenerateTestJobsResult!
+            }
+
+            type GenerateTestJobsResult {
+                success: Boolean!
+                jobCount: Int!
+            }
+        `,
+        resolvers: [TestQueueResolver],
+    },
+})
+export class JobQueueStressTestPlugin {}
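
With the plugin loaded, the mutation it defines can be exercised from the Admin API to flood the queue; for example (the job count is arbitrary):

    mutation {
        generateTestJobs(jobCount: 10000) {
            success
            jobCount
        }
    }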

+ 14 - 6
packages/job-queue-plugin/src/bullmq/bullmq-job-queue-strategy.ts

@@ -11,26 +11,31 @@ import {
     PaginatedList,
 } from '@vendure/core';
 import Bull, {
+    Job as BullJob,
     ConnectionOptions,
     JobType,
     Processor,
     Queue,
     Worker,
     WorkerOptions,
-    Job as BullJob,
 } from 'bullmq';
 import { EventEmitter } from 'events';
 import { Cluster, Redis, RedisOptions } from 'ioredis';
 import { Subject } from 'rxjs';
 import { filter, takeUntil } from 'rxjs/operators';
 
-import { ALL_JOB_TYPES, BULLMQ_PLUGIN_OPTIONS, loggerCtx } from './constants';
+import {
+    ALL_JOB_TYPES,
+    BULLMQ_PLUGIN_OPTIONS,
+    DEFAULT_CONCURRENCY,
+    loggerCtx,
+    QUEUE_NAME,
+} from './constants';
+import { JobListIndexService } from './job-list-index.service';
 import { RedisHealthIndicator } from './redis-health-indicator';
 import { getJobsByType } from './scripts/get-jobs-by-type';
 import { BullMQPluginOptions, CustomScriptDefinition } from './types';
-
-const QUEUE_NAME = 'vendure-job-queue';
-const DEFAULT_CONCURRENCY = 3;
+import { getPrefix } from './utils';
 
 /**
  * @description
@@ -46,6 +51,7 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
     private worker: Worker;
     private workerProcessor: Processor;
     private options: BullMQPluginOptions;
+    private jobListIndexService: JobListIndexService;
     private readonly queueNameProcessFnMap = new Map<string, (job: Job) => Promise<any>>();
     private cancellationSub: Redis;
     private readonly cancelRunningJob$ = new Subject<string>();
@@ -54,6 +60,7 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
 
     async init(injector: Injector): Promise<void> {
         const options = injector.get<BullMQPluginOptions>(BULLMQ_PLUGIN_OPTIONS);
+        this.jobListIndexService = injector.get(JobListIndexService);
         this.options = {
             ...options,
             workerOptions: {
@@ -138,6 +145,7 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
         };
         // Subscription-mode Redis connection for the cancellation messages
         this.cancellationSub = new Redis(this.connectionOptions as RedisOptions);
+        this.jobListIndexService.register(this.redisConnection, this.queue);
     }
 
     async destroy() {
@@ -404,7 +412,7 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
         args: Args,
     ): Promise<T> {
         return new Promise<T>((resolve, reject) => {
-            const prefix = this.options.workerOptions?.prefix ?? 'bull';
+            const prefix = getPrefix(this.options);
             (this.redisConnection as any)[scriptDef.name](
                 `${prefix}:${this.queue.name}:`,
                 ...args,
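
Taken together, the additions above wire the index service into the strategy lifecycle: the service is resolved in init(), then handed the live connection and Queue once both exist. A condensed sketch (all names taken from the diff above):

    // Condensed from BullMQJobQueueStrategy.init() in the diff above:
    this.jobListIndexService = injector.get(JobListIndexService);
    // ...later, once this.redisConnection and this.queue have been created:
    this.jobListIndexService.register(this.redisConnection, this.queue);
    // register() attaches QueueEvents listeners and kicks off the one-time
    // migration of existing jobs into the indexed sorted sets.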

+ 12 - 0
packages/job-queue-plugin/src/bullmq/clean-indexed-sets-task.ts

@@ -0,0 +1,12 @@
+import { ScheduledTask } from '@vendure/core';
+
+import { JobListIndexService } from './job-list-index.service';
+
+export const cleanIndexedSetsTask = new ScheduledTask({
+    id: 'clean-job-queue-index',
+    description: 'Cleans up the index used to speed up job queue listing operations',
+    schedule: cron => cron.everyMinute(),
+    async execute({ injector }) {
+        return injector.get(JobListIndexService).cleanupIndexedSets();
+    },
+});
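
The task delegates to JobListIndexService.cleanupIndexedSets(), which (per the service below) resolves to an array of { queueName, jobsRemoved } entries. A sketch of consuming that result directly, assuming you already hold an Injector:

    // Illustrative manual run; in practice the scheduler triggers this every minute.
    const results = await injector.get(JobListIndexService).cleanupIndexedSets();
    for (const { queueName, jobsRemoved } of results) {
        Logger.verbose(`${queueName}: removed ${jobsRemoved} stale index entries`);
    }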

+ 2 - 0
packages/job-queue-plugin/src/bullmq/constants.ts

@@ -2,6 +2,8 @@ import { JobType } from 'bullmq';
 
 export const loggerCtx = 'BullMQJobQueuePlugin';
 export const BULLMQ_PLUGIN_OPTIONS = Symbol('BULLMQ_PLUGIN_OPTIONS');
+export const QUEUE_NAME = 'vendure-job-queue';
+export const DEFAULT_CONCURRENCY = 3;
 
 export const ALL_JOB_TYPES: JobType[] = [
     'completed',

+ 314 - 0
packages/job-queue-plugin/src/bullmq/job-list-index.service.ts

@@ -0,0 +1,314 @@
+import { Inject, Injectable } from '@nestjs/common';
+import { Logger, ProcessContext } from '@vendure/core';
+import { Job, JobType, Queue, QueueEvents } from 'bullmq';
+import Redis, { Cluster } from 'ioredis';
+
+import { BULLMQ_PLUGIN_OPTIONS, loggerCtx } from './constants';
+import { BullMQPluginOptions } from './types';
+import { getPrefix } from './utils';
+
+/**
+ * @description
+ * In order to efficiently query jobs in the job queue, we use a "sorted set" in Redis to track jobs
+ * added to each queue. This allows us to quickly fetch a list of jobs in a given queue without needing
+ * to iterate over all jobs in the queue and read the job data.
+ *
+ * By using this approach we achieve an improvement of several orders of magnitude in performance
+ * over the former approach of iterating over all jobs via the custom Lua script.
+ *
+ * This also means that we need to periodically clean up the sorted sets to remove jobs that have
+ * been removed from the queue (via the automatic removal features of BullMQ). Why do we need to
+ * do this scheduled cleanup? Because currently BullMQ does not provide an event for when a job
+ * is automatically removed from the queue, so we cannot listen for that event and remove the index entry. The
+ * "removed" event is only emitted when a job is removed manually via the `remove()` method.
+ * See https://github.com/taskforcesh/bullmq/issues/3209#issuecomment-2795102551
+ */
+@Injectable()
+export class JobListIndexService {
+    private readonly BATCH_SIZE = 100;
+    private redis: Redis | Cluster;
+    private queue: Queue | undefined;
+    private queueEvents: QueueEvents | undefined;
+    private allStates: JobType[] = [
+        'wait',
+        'active',
+        'completed',
+        'failed',
+        'delayed',
+        'waiting-children',
+        'prioritized',
+    ];
+
+    constructor(
+        @Inject(BULLMQ_PLUGIN_OPTIONS) private readonly options: BullMQPluginOptions,
+        private readonly processContext: ProcessContext,
+    ) {}
+
+    /**
+     * @description
+     * Should be called by the BullMQJobQueueStrategy as soon as the Redis connection and Queue
+     * object are available in the init() function.
+     */
+    register(redisConnection: Redis | Cluster, queue: Queue) {
+        this.redis = redisConnection;
+        this.queue = queue;
+        this.queueEvents = new QueueEvents(queue.name, { connection: redisConnection });
+        this.setupEventListeners();
+        void this.migrateExistingJobs();
+    }
+
+    private setupEventListeners() {
+        // The index is maintained by the worker process only, never the API server
+        if (this.processContext.isServer) return;
+        if (!this.queueEvents || !this.queue) return;
+
+        // When a job is added to the queue
+        this.queueEvents.on('waiting', ({ jobId }) => {
+            void this.updateJobIndex(jobId, 'wait');
+        });
+
+        this.queueEvents.on('waiting-children', ({ jobId }) => {
+            void this.updateJobIndex(jobId, 'waiting-children');
+        });
+
+        // When a job starts processing
+        this.queueEvents.on('active', ({ jobId }) => {
+            void this.updateJobIndex(jobId, 'active');
+        });
+
+        // When a job completes successfully
+        this.queueEvents.on('completed', ({ jobId }) => {
+            void this.updateJobIndex(jobId, 'completed');
+        });
+
+        // When a job fails
+        this.queueEvents.on('failed', ({ jobId }) => {
+            void this.updateJobIndex(jobId, 'failed');
+        });
+
+        // When a job is delayed
+        this.queueEvents.on('delayed', ({ jobId }) => {
+            void this.updateJobIndex(jobId, 'delayed');
+        });
+
+        // When a job is removed
+        this.queueEvents.on('removed', ({ jobId }) => {
+            void this.removeJobFromAllIndices(jobId);
+        });
+    }
+
+    /**
+     * When a job's state changes, we need to update the indexed set
+     * to reflect the new state of the job.
+     */
+    private async updateJobIndex(jobId: string, state: JobType) {
+        if (!this.redis || !this.queue) return;
+
+        try {
+            const job: Job | undefined = await this.queue.getJob(jobId);
+            if (!job) return;
+            const timestamp = job.timestamp;
+            const targetKey = this.createSortedSetKey(job.name, state);
+
+            // Remove from all state indices first
+            await this.removeJobFromAllIndices(jobId);
+
+            // Add to the specific state index
+            const result = await this.redis.zadd(targetKey, timestamp, jobId);
+            if (result === 1) {
+                Logger.debug(`Added job ${jobId} to indexed key: ${targetKey}`, loggerCtx);
+            }
+        } catch (err: unknown) {
+            const error = err as Error;
+            Logger.error(`Failed to update job index: ${error.message}`, loggerCtx);
+        }
+    }
+
+    private async removeJobFromAllIndices(jobId: string) {
+        if (!this.redis || !this.queue) return;
+
+        try {
+            const job: Job | undefined = await this.queue.getJob(jobId);
+            if (!job) return;
+            const pipeline = this.redis.pipeline();
+
+            for (const state of this.allStates) {
+                const indexedKey = this.createSortedSetKey(job.name, state);
+                pipeline.zrem(indexedKey, jobId);
+            }
+
+            await pipeline.exec();
+        } catch (err: unknown) {
+            const error = err as Error;
+            Logger.error(`Failed to remove job from indices: ${error.message}`, loggerCtx);
+        }
+    }
+
+    /**
+     * @description
+     * This method is used to migrate existing jobs to use the indexed set method of tracking jobs.
+     * When the app bootstraps, we check to see if the existing jobs in the queue have a corresponding
+     * indexed set. If not, we create the indexed set and add the jobs to it.
+     */
+    async migrateExistingJobs(): Promise<void> {
+        if (this.processContext.isServer) {
+            // We only want to perform this work on the worker.
+            return;
+        }
+        if (!this.redis || !this.queue) {
+            throw new Error('Redis and Queue must be registered before migrating jobs');
+        }
+        Logger.debug('Starting migration of existing jobs to indexed sets...', loggerCtx);
+        // Get counts of jobs in each state
+        const counts = await this.queue.getJobCounts();
+        Logger.debug(`Found job counts: ${JSON.stringify(counts)}`, loggerCtx);
+
+        let totalMigrated = 0;
+
+        // Get all jobs from each state
+        for (const state of this.allStates) {
+            if (counts[state] > 0) {
+                Logger.debug(`Processing ${counts[state]} jobs in ${state} state`, loggerCtx);
+                if (!this.queue) {
+                    Logger.error('Queue is not initialized', loggerCtx);
+                    continue;
+                }
+                try {
+                    const jobs = await this.queue.getJobs([state], 0, counts[state]);
+                    if (!jobs) {
+                        Logger.error(`getJobs returned undefined for state ${state}`, loggerCtx);
+                        continue;
+                    }
+                    Logger.debug(`Retrieved ${jobs.length} jobs for state ${state}`, loggerCtx);
+
+                    // Group jobs by queue name
+                    const jobsByQueue = new Map<string, Job[]>();
+                    for (const job of jobs) {
+                        if (!job) {
+                            Logger.error('Null job found in results', loggerCtx);
+                            continue;
+                        }
+                        if (!jobsByQueue.has(job.name)) {
+                            jobsByQueue.set(job.name, []);
+                        }
+                        jobsByQueue.get(job.name)?.push(job);
+                    }
+
+                    // Create sorted sets for each queue in this state
+                    for (const [queueName, queueJobs] of jobsByQueue) {
+                        const indexedKey = this.createSortedSetKey(queueName, state);
+                        const exists = await this.redis.exists(indexedKey);
+                        if (exists === 0) {
+                            Logger.info(
+                                `Creating indexed set for queue: ${queueName} in state: ${state}`,
+                                loggerCtx,
+                            );
+                            const pipeline = this.redis.pipeline();
+                            // Add jobs in batches
+                            for (let i = 0; i < queueJobs.length; i += this.BATCH_SIZE) {
+                                const batch = queueJobs.slice(i, i + this.BATCH_SIZE);
+                                const args = batch
+                                    .flatMap(job => [job.timestamp, job.id])
+                                    .filter((id): id is string | number => id != null);
+                                pipeline.zadd(indexedKey, ...args);
+                            }
+                            await pipeline.exec();
+                            totalMigrated += queueJobs.length;
+                        }
+                    }
+                } catch (err: unknown) {
+                    const error = err as Error;
+                    Logger.error(`Failed to migrate jobs: ${error.message}`, loggerCtx);
+                }
+            }
+        }
+
+        if (totalMigrated > 0) {
+            Logger.info(`Successfully migrated ${totalMigrated} jobs to indexed sets`, loggerCtx);
+        }
+    }
+
+    /**
+     * @description
+     * Cleans up the indexed sets by removing entries for jobs that no longer exist in the queue.
+     * Each job id in an indexed set is checked for existence in the queue; any id whose job has
+     * been removed is deleted from the set.
+     */
+    async cleanupIndexedSets() {
+        if (!this.redis || !this.queue) {
+            throw new Error('Redis and Queue must be registered before cleaning up indexed sets');
+        }
+
+        // Get all queue names from our indexed sets
+        const allStateKeys = this.createSortedSetKey('*');
+        const keys = await this.redis.keys(allStateKeys);
+        const result: Array<{ queueName: string; jobsRemoved: number }> = [];
+        const startTime = Date.now();
+        Logger.verbose(`Cleaning up ${keys.length} indexed sets`, loggerCtx);
+
+        for (const key of keys) {
+            let cursor = '0';
+            let jobsRemoved = 0;
+
+            // Use ZSCAN to iterate over the set in batches
+            do {
+                const [nextCursor, elements] = await this.redis.zscan(key, cursor, 'COUNT', this.BATCH_SIZE);
+                cursor = nextCursor;
+
+                if (elements.length > 0) {
+                    // Extract job ids from the elements (ZSCAN returns flat [member, score] pairs)
+                    const jobIds = elements.filter((_, i) => i % 2 === 0);
+
+                    // Check existence of jobs directly in Redis
+                    const pipeline = this.redis.pipeline();
+                    for (const jobId of jobIds) {
+                        pipeline.exists(this.createQueueItemKey(jobId));
+                    }
+                    const existsResults = await pipeline.exec();
+
+                    // Filter out non-existent jobs
+                    const jobsToRemove = jobIds.filter((jobId, i) => {
+                        const exists = existsResults?.[i]?.[1] === 1;
+                        return !exists;
+                    });
+
+                    if (jobsToRemove.length > 0) {
+                        await this.redis.zrem(key, ...jobsToRemove);
+                        jobsRemoved += jobsToRemove.length;
+                    }
+                }
+            } while (cursor !== '0');
+
+            if (jobsRemoved > 0) {
+                Logger.verbose(
+                    `Cleaned up ${jobsRemoved} non-existent jobs from indexed key: ${key}`,
+                    loggerCtx,
+                );
+            }
+            result.push({ queueName: key, jobsRemoved });
+        }
+
+        const endTime = Date.now();
+        Logger.verbose(`Cleaned up ${keys.length} indexed sets in ${endTime - startTime}ms`, loggerCtx);
+        return result;
+    }
+
+    private createSortedSetKey(queueName: string, state?: string): string {
+        const prefix = getPrefix(this.options);
+        if (!this.queue) {
+            throw new Error('Queue is not initialized');
+        }
+        let key = `${prefix}:${this.queue.name}:queue:${queueName}`;
+        if (state) {
+            key += `:${state}`;
+        }
+        return key;
+    }
+
+    private createQueueItemKey(jobId: string): string {
+        const prefix = getPrefix(this.options);
+        if (!this.queue) {
+            throw new Error('Queue is not initialized');
+        }
+        return `${prefix}:${this.queue.name}:${jobId}`;
+    }
+}
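
To make the key scheme concrete: createSortedSetKey() produces keys of the form `${prefix}:${bullQueueName}:queue:${vendureQueueName}:${state}`, scored by job timestamp. A sketch of the equivalent raw Redis operations (the prefix, queue names, and job values here are illustrative):

    import Redis from 'ioredis';

    async function demo() {
        const redis = new Redis();
        // e.g. the default 'bull' prefix plus the shared 'vendure-job-queue'
        // BullMQ queue from constants.ts:
        const key = 'bull:vendure-job-queue:queue:send-email:completed'; // illustrative

        // updateJobIndex() boils down to: score = job.timestamp, member = job id
        await redis.zadd(key, 1718000000000, '42');

        // Listing the newest 25 job ids is then a single ZREVRANGE,
        // with no per-job read of the job data:
        const page = await redis.zrevrange(key, 0, 24);
        return page;
    }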

+ 4 - 0
packages/job-queue-plugin/src/bullmq/plugin.ts

@@ -1,7 +1,9 @@
 import { PluginCommonModule, VendurePlugin } from '@vendure/core';
 
 import { BullMQJobQueueStrategy } from './bullmq-job-queue-strategy';
+import { cleanIndexedSetsTask } from './clean-indexed-sets-task';
 import { BULLMQ_PLUGIN_OPTIONS } from './constants';
+import { JobListIndexService } from './job-list-index.service';
 import { RedisHealthCheckStrategy } from './redis-health-check-strategy';
 import { RedisHealthIndicator } from './redis-health-indicator';
 import { RedisJobBufferStorageStrategy } from './redis-job-buffer-storage-strategy';
@@ -193,11 +195,13 @@ import { BullMQPluginOptions } from './types';
         config.jobQueueOptions.jobQueueStrategy = new BullMQJobQueueStrategy();
         config.jobQueueOptions.jobBufferStorageStrategy = new RedisJobBufferStorageStrategy();
         config.systemOptions.healthChecks.push(new RedisHealthCheckStrategy());
+        config.schedulerOptions.tasks.push(cleanIndexedSetsTask);
         return config;
     },
     providers: [
         { provide: BULLMQ_PLUGIN_OPTIONS, useFactory: () => BullMQJobQueuePlugin.options },
         RedisHealthIndicator,
+        JobListIndexService,
     ],
     compatibility: '^3.0.0',
 })
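
For context, the plugin (and with it the new scheduled task and index service) is enabled by adding it to the VendureConfig; a minimal sketch, with an illustrative Redis connection:

    import { VendureConfig } from '@vendure/core';
    import { BullMQJobQueuePlugin } from '@vendure/job-queue-plugin/package/bullmq';

    export const config: VendureConfig = {
        // ...
        plugins: [
            BullMQJobQueuePlugin.init({
                connection: { host: 'localhost', port: 6379 }, // illustrative
            }),
        ],
    };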

+ 107 - 77
packages/job-queue-plugin/src/bullmq/scripts/get-jobs-by-type.ts

@@ -1,115 +1,145 @@
-// language=Lua
 import { CustomScriptDefinition } from '../types';
 
+// language=Lua
 const script = `--[[
-  Get job ids per provided states and filter by name
+  Get job ids for the provided states, optionally filtered by name - optimized version using the indexed structure
     Input:
       KEYS[1]    'prefix'
-      ARGV[1]    start
-      ARGV[2]    end
+      ARGV[1]    skip
+      ARGV[2]    take
       ARGV[3]    filterName
       ARGV[4...] types
 ]]
 local rcall = redis.call
 local prefix = KEYS[1]
-local rangeStart = tonumber(ARGV[1])
-local rangeEnd = tonumber(ARGV[2])
+local skip = tonumber(ARGV[1])
+local take = tonumber(ARGV[2])
 local filterName = ARGV[3]
 local results = {}
+local totalResults = 0
 
-local targetSets = {}
-
--- Initialize an empty array to hold the sets to unionize. The "completed" and "failed" lists
--- are sorted sets
-local setsToUnionize = {}
-local typesInUnion = {}
-
--- Initialize an empty array to hold lists to include. The "active" and "wait" lists are
--- regular lists
-local listsToInclude = {}
-
-
--- Iterate through ARGV starting from the first element (ARGV[1]) up to the end
-for i = 4, #ARGV do
-    local setKey = prefix .. ARGV[i]
-
-    -- Check if the setKey is valid (e.g., it exists and is a sorted set)
-    local targetExists = redis.call('EXISTS', setKey)
-    local listType = redis.call('TYPE', setKey).ok
-
-    if targetExists == 1 and listType == 'zset' then
-        -- Add the valid set to the array
-        table.insert(setsToUnionize, setKey)
-        table.insert(typesInUnion, ARGV[i])
-    end
-    if targetExists == 1 and listType == 'list' then
-        -- Add the valid set to the array
-        table.insert(listsToInclude, setKey)
-        table.insert(typesInUnion, ARGV[i])
-    end
-end
+-- redis.log(redis.LOG_NOTICE, 'Filter name: "' .. filterName .. '"')
+-- redis.log(redis.LOG_NOTICE, 'Filter name length: ' .. tostring(#filterName))
+-- redis.log(redis.LOG_NOTICE, 'Number of ARGV: ' .. tostring(#ARGV))
+-- redis.log(redis.LOG_NOTICE, 'skip: "' .. tostring(skip) .. '"')
+-- redis.log(redis.LOG_NOTICE, 'take: "' .. tostring(take) .. '"')
 
--- Define the destination key for the concatenated sorted set
-local tempSortedSetUnionKey = prefix .. 'union:' .. table.concat(typesInUnion, ':');
+-- Create a temporary key for merging results
+local tempKey = prefix .. 'temp:merge:' .. math.random(1000000)
+local sourceKeys = {}
 
-if #listsToInclude  == 0 and #setsToUnionize == 0 then
-    return {0, {}}
+-- Function to count jobs in a sorted set
+local function countJobsInSortedSet(key)
+    return rcall('ZCARD', key) or 0
 end
 
--- Check if there are valid sets to unionize
-if #setsToUnionize > 0 then
-    -- Use ZUNIONSTORE to concatenate the valid sorted sets into the destination key
-    local numSets = #setsToUnionize
-    redis.call('ZUNIONSTORE', tempSortedSetUnionKey, numSets, unpack(setsToUnionize))
+-- Function to count jobs in a list
+local function countJobsInList(key)
+    return rcall('LLEN', key) or 0
 end
 
-local originalResults = rcall("ZREVRANGE", tempSortedSetUnionKey, 0, -1)
-
-
-if #listsToInclude > 0 then
-    for _, listKey in ipairs(listsToInclude) do
-        local list = rcall("LRANGE", listKey, 0, -1)
-        for _, jobId in ipairs(list) do
-            table.insert(originalResults, jobId)
+-- First count total jobs and collect source keys
+if filterName ~= "" then
+    -- When filtering by name, we need to check each state
+    for i = 4, #ARGV do
+        local state = ARGV[i]
+        -- redis.log(redis.LOG_NOTICE, 'Processing state: "' .. state .. '"')
+        local indexedKey = prefix .. 'queue:' .. filterName .. ':' .. state
+        -- redis.log(redis.LOG_NOTICE, 'Looking for key: ' .. indexedKey)
+        local keyType = rcall('TYPE', indexedKey).ok
+        -- redis.log(redis.LOG_NOTICE, 'Key type: ' .. keyType)
+
+        if keyType == 'zset' then
+            totalResults = totalResults + countJobsInSortedSet(indexedKey)
+            table.insert(sourceKeys, indexedKey)
+        elseif keyType == 'list' then
+            totalResults = totalResults + countJobsInList(indexedKey)
+        end
+    end
+else
+    -- No filter, count all types
+    for i = 4, #ARGV do
+        local state = ARGV[i]
+        -- redis.log(redis.LOG_NOTICE, 'Processing state: "' .. state .. '"')
+        local key = prefix .. state
+        -- redis.log(redis.LOG_NOTICE, 'Looking for key: ' .. key)
+        local keyType = rcall('TYPE', key).ok
+        -- redis.log(redis.LOG_NOTICE, 'Key type: ' .. keyType)
+
+        if keyType == 'zset' then
+            totalResults = totalResults + countJobsInSortedSet(key)
+            table.insert(sourceKeys, key)
+        elseif keyType == 'list' then
+            totalResults = totalResults + countJobsInList(key)
         end
     end
 end
 
+-- If we have any sorted sets to merge, do it
+if #sourceKeys > 0 then
+    -- Calculate how many elements we need to merge
+    local neededElements = skip + take
+    -- redis.log(redis.LOG_NOTICE, 'Number of source keys: ' .. tostring(#sourceKeys))
+    -- redis.log(redis.LOG_NOTICE, 'Needed elements: ' .. tostring(neededElements))
+
+    -- Create temporary keys for each source set with limited elements
+    local limitedKeys = {}
+    for i, sourceKey in ipairs(sourceKeys) do
+        local limitedKey = tempKey .. ':limited:' .. i
+        -- redis.log(redis.LOG_NOTICE, 'Processing source key: ' .. sourceKey)
+        -- Get only the elements we need from each source set
+        local elements = rcall('ZREVRANGE', sourceKey, 0, neededElements - 1, 'WITHSCORES')
+        -- redis.log(redis.LOG_NOTICE, 'Found ' .. tostring(#elements) .. ' elements in ' .. sourceKey)
+        if #elements > 0 then
+            -- Process elements in pairs (member, score)
+            local chunkSize = 1000 -- Process in chunks of 1000 elements
+            for j = 1, #elements, chunkSize * 2 do
+                local chunkEnd = math.min(j + chunkSize * 2 - 1, #elements)
+                local chunkArgs = {}
+                for k = j, chunkEnd, 2 do
+                    local member = elements[k]
+                    local score = elements[k + 1]
+                    table.insert(chunkArgs, score)
+                    table.insert(chunkArgs, member)
+                end
+                if #chunkArgs > 0 then
+                    rcall('ZADD', limitedKey, unpack(chunkArgs))
+                end
+            end
+            table.insert(limitedKeys, limitedKey)
+            -- redis.log(redis.LOG_NOTICE, 'Added to limited key: ' .. limitedKey)
+        end
+    end
 
--- Define a custom comparison function for sorting in descending order
-local function compareDescending(a, b)
-    return tonumber(a) > tonumber(b)
-end
+    -- redis.log(redis.LOG_NOTICE, 'Number of limited keys: ' .. tostring(#limitedKeys))
 
--- Sort the table in descending order
-table.sort(originalResults, compareDescending)
+    if #limitedKeys > 0 then
+        -- Merge the limited sets
+        rcall('ZUNIONSTORE', tempKey, #limitedKeys, unpack(limitedKeys))
+        -- redis.log(redis.LOG_NOTICE, 'Merged sets into: ' .. tempKey)
 
-local filteredResults = {}
-local totalResults = 0
+        -- Get the paginated results from the merged set
+        results = rcall('ZREVRANGE', tempKey, skip, skip + take - 1)
+        -- redis.log(redis.LOG_NOTICE, 'Got ' .. tostring(#results) .. ' results from merged set')
 
-for _, job in ipairs(originalResults) do
-  local jobName = rcall("HGET", prefix .. job, "name");
-  if filterName ~= "" and jobName == filterName then
-    if rangeStart <= totalResults and #filteredResults < rangeEnd then
-      table.insert(filteredResults, job)
-    end
-    totalResults = totalResults + 1
-  elseif filterName == "" then
-    if rangeStart <= totalResults and #filteredResults < rangeEnd then
-      table.insert(filteredResults, job)
+        -- Clean up temporary limited keys
+        for _, key in ipairs(limitedKeys) do
+            rcall('DEL', key)
+        end
+    else
+        -- redis.log(redis.LOG_NOTICE, 'No elements found in any source sets')
     end
-    totalResults = totalResults + 1
-  end
 end
 
-rcall("DEL", tempSortedSetUnionKey)
+-- Clean up temporary key
+rcall('DEL', tempKey)
 
-return {totalResults, filteredResults}
+return {totalResults, results}
 `;
 
 export const getJobsByType: CustomScriptDefinition<
     [totalItems: number, jobIds: string[]],
-    [rangeStart: number, rangeEnd: number, queueName: string | undefined, ...states: string[]]
+    [skip: number, take: number, queueName: string | undefined, ...states: string[]]
 > = {
     script,
     numberOfKeys: 1,
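
The pagination arguments also changed meaning, from a raw [rangeStart, rangeEnd] window to [skip, take]. A worked example of the tuple the strategy now passes (values illustrative; an empty string for the queue name disables the filter):

    // [skip, take, queueName, ...states]
    // Page 3 at 25 items per page of 'completed' and 'failed' jobs in 'send-email':
    const args: [number, number, string | undefined, ...string[]] = [
        50,           // skip the first two pages
        25,           // take one page
        'send-email', // ARGV[3]: filter by queue name
        'completed',
        'failed',
    ];
    // The script resolves to [totalItems, jobIds], with at most `take`
    // newest-first job ids.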

+ 5 - 0
packages/job-queue-plugin/src/bullmq/utils.ts

@@ -0,0 +1,5 @@
+import { BullMQPluginOptions } from "./types";
+
+export function getPrefix(options: BullMQPluginOptions) {
+    return options.workerOptions?.prefix ?? 'bull';
+}