Browse Source

feat(job-queue-plugin): Implement Redis-based job buffering

Michael Bromley 4 years ago
parent
commit
c7b91c34b5

+ 2 - 0
packages/job-queue-plugin/src/bullmq/plugin.ts

@@ -3,6 +3,7 @@ import { HealthCheckRegistryService, PluginCommonModule, VendurePlugin } from '@
 import { BullMQJobQueueStrategy } from './bullmq-job-queue-strategy';
 import { BullMQJobQueueStrategy } from './bullmq-job-queue-strategy';
 import { BULLMQ_PLUGIN_OPTIONS } from './constants';
 import { BULLMQ_PLUGIN_OPTIONS } from './constants';
 import { RedisHealthIndicator } from './redis-health-indicator';
 import { RedisHealthIndicator } from './redis-health-indicator';
+import { RedisJobBufferStorageStrategy } from './redis-job-buffer-storage-strategy';
 import { BullMQPluginOptions } from './types';
 import { BullMQPluginOptions } from './types';
 
 
 /**
 /**
@@ -103,6 +104,7 @@ import { BullMQPluginOptions } from './types';
     imports: [PluginCommonModule],
     imports: [PluginCommonModule],
     configuration: config => {
     configuration: config => {
         config.jobQueueOptions.jobQueueStrategy = new BullMQJobQueueStrategy();
         config.jobQueueOptions.jobQueueStrategy = new BullMQJobQueueStrategy();
+        config.jobQueueOptions.jobBufferStorageStrategy = new RedisJobBufferStorageStrategy();
         return config;
         return config;
     },
     },
     providers: [
     providers: [

+ 45 - 6
packages/job-queue-plugin/src/bullmq/redis-job-buffer-storage-strategy.ts

@@ -1,9 +1,11 @@
-import { Injector, Job, JobBufferStorageStrategy } from '@vendure/core';
+import { Injector, Job, JobBufferStorageStrategy, JobConfig, Logger } from '@vendure/core';
 import Redis, { Cluster, RedisOptions } from 'ioredis';
 import Redis, { Cluster, RedisOptions } from 'ioredis';
 
 
-import { BULLMQ_PLUGIN_OPTIONS } from './constants';
+import { BULLMQ_PLUGIN_OPTIONS, loggerCtx } from './constants';
 import { BullMQPluginOptions } from './types';
 import { BullMQPluginOptions } from './types';
 
 
+const BUFFER_LIST_PREFIX = 'vendure-job-buffer';
+
 export class RedisJobBufferStorageStrategy implements JobBufferStorageStrategy {
 export class RedisJobBufferStorageStrategy implements JobBufferStorageStrategy {
     private redis: Redis.Redis | Redis.Cluster;
     private redis: Redis.Redis | Redis.Cluster;
 
 
@@ -18,15 +20,52 @@ export class RedisJobBufferStorageStrategy implements JobBufferStorageStrategy {
         }
         }
     }
     }
 
 
-    async add(processorId: string, job: Job<any>): Promise<Job<any>> {
+    async add(bufferId: string, job: Job<any>): Promise<Job<any>> {
+        const result = await this.redis.lpush(this.keyName(bufferId), this.toJobConfigString(job));
         return job;
         return job;
     }
     }
 
 
     async bufferSize(bufferIds?: string[]): Promise<{ [bufferId: string]: number }> {
     async bufferSize(bufferIds?: string[]): Promise<{ [bufferId: string]: number }> {
-        throw new Error('Method not implemented.');
+        const result: { [bufferId: string]: number } = {};
+        for (const id of bufferIds || []) {
+            const key = this.keyName(id);
+            const count = await this.redis.llen(key);
+            result[id] = count;
+        }
+        return result;
+    }
+
+    async flush(bufferIds?: string[]): Promise<{ [bufferId: string]: Job[] }> {
+        const result: { [bufferId: string]: Job[] } = {};
+        for (const id of bufferIds || []) {
+            const key = this.keyName(id);
+            const items = await this.redis.lrange(key, 0, -1);
+            await this.redis.del(key);
+            result[id] = items.map(item => this.toJob(item));
+        }
+        return result;
     }
     }
 
 
-    async flush(bufferIds?: string[]): Promise<{ [bufferId: string]: Array<Job<any>> }> {
-        throw new Error('Method not implemented.');
+    private keyName(bufferId: string) {
+        return `${BUFFER_LIST_PREFIX}:${bufferId}`;
+    }
+
+    private toJobConfigString(job: Job<any>): string {
+        const jobConfig: JobConfig<any> = {
+            ...job,
+            data: job.data,
+            id: job.id ?? undefined,
+        };
+        return JSON.stringify(jobConfig);
+    }
+
+    private toJob(jobConfigString: string): Job {
+        try {
+            const jobConfig: JobConfig<any> = JSON.parse(jobConfigString);
+            return new Job(jobConfig);
+        } catch (e) {
+            Logger.error(`Could not parse buffered job:\n${e.message}`, loggerCtx, e.stack);
+            throw e;
+        }
     }
     }
 }
 }