|
|
@@ -24,6 +24,28 @@ export type JobUpdate<T extends JobData<T>> = Pick<
|
|
|
'id' | 'state' | 'progress' | 'result' | 'error' | 'data'
|
|
|
>;
|
|
|
|
|
|
+/**
|
|
|
+ * @description
|
|
|
+ * Job update options, that you can specify by calling {@link SubscribableJob.updates updates()} method.
|
|
|
+ *
|
|
|
+ * @docsCategory JobQueue
|
|
|
+ * @docsPage types
|
|
|
+ */
|
|
|
+export type JobUpdateOptions = {
|
|
|
+ /**
|
|
|
+ * Polling interval. Defaults to 200ms
|
|
|
+ */
|
|
|
+ pollInterval?: number;
|
|
|
+ /**
|
|
|
+ * Polling timeout in milliseconds. Defaults to 1 hour
|
|
|
+ */
|
|
|
+ timeoutMs?: number;
|
|
|
+ /**
|
|
|
+ * Observable sequence will end with an error if true. Default to false
|
|
|
+ */
|
|
|
+ errorOnFail?: boolean;
|
|
|
+};
|
|
|
+
|
|
|
/**
|
|
|
* @description
|
|
|
* This is a type of Job object that allows you to subscribe to updates to the Job. It is returned
|
|
|
@@ -52,10 +74,9 @@ export class SubscribableJob<T extends JobData<T> = any> extends Job<T> {
|
|
|
* Returns an Observable stream of updates to the Job. Works by polling the current JobQueueStrategy's `findOne()` method
|
|
|
* to obtain updates. If this updates are not subscribed to, then no polling occurs.
|
|
|
*
|
|
|
- * The polling interval defaults to 200ms, but can be configured by passing in an options argument. Polling will also timeout
|
|
|
- * after 1 hour, but this timeout can also be configured by passing the `timeoutMs` option.
|
|
|
+ * Polling interval, timeout and other options may be configured with an options arguments {@link JobUpdateOptions}.
|
|
|
*/
|
|
|
- updates(options?: { pollInterval?: number; timeoutMs?: number }): Observable<JobUpdate<T>> {
|
|
|
+ updates(options?: JobUpdateOptions): Observable<JobUpdate<T>> {
|
|
|
const pollInterval = Math.max(50, options?.pollInterval ?? 200);
|
|
|
const timeoutMs = Math.max(pollInterval, options?.timeoutMs ?? ms('1h'));
|
|
|
const strategy = this.jobQueueStrategy;
|
|
|
@@ -90,7 +111,7 @@ export class SubscribableJob<T extends JobData<T> = any> extends Job<T> {
|
|
|
true,
|
|
|
),
|
|
|
tap(job => {
|
|
|
- if (job.state === JobState.FAILED) {
|
|
|
+ if (job.state === JobState.FAILED && (options?.errorOnFail ?? true)) {
|
|
|
throw new Error(job.error);
|
|
|
}
|
|
|
}),
|