|
|
@@ -3,14 +3,17 @@ import { Cluster, Redis, RedisOptions } from 'ioredis';
|
|
|
|
|
|
import { BULLMQ_PLUGIN_OPTIONS, loggerCtx } from './constants';
|
|
|
import { BullMQPluginOptions } from './types';
|
|
|
+import { getPrefix } from './utils';
|
|
|
|
|
|
const BUFFER_LIST_PREFIX = 'vendure-job-buffer';
|
|
|
|
|
|
export class RedisJobBufferStorageStrategy implements JobBufferStorageStrategy {
|
|
|
private redis: Redis | Cluster;
|
|
|
+ private prefix: string;
|
|
|
|
|
|
init(injector: Injector) {
|
|
|
const options = injector.get<BullMQPluginOptions>(BULLMQ_PLUGIN_OPTIONS);
|
|
|
+ this.prefix = `${getPrefix(options)}:`;
|
|
|
if (options.connection instanceof Redis) {
|
|
|
this.redis = options.connection;
|
|
|
} else if (options.connection instanceof Cluster) {
|
|
|
@@ -49,7 +52,7 @@ export class RedisJobBufferStorageStrategy implements JobBufferStorageStrategy {
|
|
|
}
|
|
|
|
|
|
private keyName(bufferId: string) {
|
|
|
- return `${BUFFER_LIST_PREFIX}:${bufferId}`;
|
|
|
+ return `${this.prefix}${BUFFER_LIST_PREFIX}:${bufferId}`;
|
|
|
}
|
|
|
|
|
|
private toJobConfigString(job: Job<any>): string {
|
|
|
@@ -81,14 +84,11 @@ export class RedisJobBufferStorageStrategy implements JobBufferStorageStrategy {
|
|
|
}
|
|
|
|
|
|
private async getAllBufferIds(): Promise<string[]> {
|
|
|
+ const keyPrefix = `${this.prefix}${BUFFER_LIST_PREFIX}:`;
|
|
|
const stream =
|
|
|
this.redis instanceof Redis
|
|
|
- ? this.redis.scanStream({
|
|
|
- match: `${BUFFER_LIST_PREFIX}:*`,
|
|
|
- })
|
|
|
- : this.redis.nodes()[0].scanStream({
|
|
|
- match: `${BUFFER_LIST_PREFIX}:*`,
|
|
|
- });
|
|
|
+ ? this.redis.scanStream({ match: `${keyPrefix}*` })
|
|
|
+ : this.redis.nodes()[0].scanStream({ match: `${keyPrefix}*` });
|
|
|
const keys = await new Promise<string[]>((resolve, reject) => {
|
|
|
const allKeys: string[] = [];
|
|
|
stream.on('data', _keys => allKeys.push(..._keys));
|
|
|
@@ -96,6 +96,6 @@ export class RedisJobBufferStorageStrategy implements JobBufferStorageStrategy {
|
|
|
stream.on('error', err => reject(err));
|
|
|
});
|
|
|
|
|
|
- return keys.map(key => key.replace(`${BUFFER_LIST_PREFIX}:`, ''));
|
|
|
+ return keys.map(key => key.replace(keyPrefix, ''));
|
|
|
}
|
|
|
}
|