|
|
@@ -1,6 +1,6 @@
|
|
|
import { AssetType } from '@vendure/common/lib/generated-types';
|
|
|
import { ID } from '@vendure/common/lib/shared-types';
|
|
|
-import { Observable } from 'rxjs';
|
|
|
+import { Observable, Observer } from 'rxjs';
|
|
|
|
|
|
/**
|
|
|
* Takes a predicate function and returns a negated version.
|
|
|
@@ -75,3 +75,40 @@ export async function awaitPromiseOrObservable<T>(value: T | Promise<T> | Observ
|
|
|
}
|
|
|
return result;
|
|
|
}
|
|
|
+
|
|
|
+/**
|
|
|
+ * @description
|
|
|
+ * Returns an observable which executes the given async work function and completes with
|
|
|
+ * the returned value. This is useful in methods which need to return
|
|
|
+ * an Observable but also want to work with async (Promise-returning) code.
|
|
|
+ *
|
|
|
+ * @example
|
|
|
+ * ```TypeScript
|
|
|
+ * \@Controller()
|
|
|
+ * export class MyWorkerController {
|
|
|
+ *
|
|
|
+ * \@MessagePattern('test')
|
|
|
+ * handleTest() {
|
|
|
+ * return asyncObservable(async observer => {
|
|
|
+ * const value = await this.connection.fetchSomething();
|
|
|
+ * return value;
|
|
|
+ * });
|
|
|
+ * }
|
|
|
+ * }
|
|
|
+ * ```
|
|
|
+ */
|
|
|
+export function asyncObservable<T>(work: (observer: Observer<T>) => Promise<T | void>): Observable<T> {
|
|
|
+ return new Observable<T>(subscriber => {
|
|
|
+ (async () => {
|
|
|
+ try {
|
|
|
+ const result = await work(subscriber);
|
|
|
+ if (result) {
|
|
|
+ subscriber.next(result);
|
|
|
+ }
|
|
|
+ subscriber.complete();
|
|
|
+ } catch (e) {
|
|
|
+ subscriber.error(e);
|
|
|
+ }
|
|
|
+ })();
|
|
|
+ });
|
|
|
+}
|