|
@@ -1,12 +1,14 @@
|
|
|
import { ApolloQueryResult, NetworkStatus } from '@apollo/client/core';
|
|
import { ApolloQueryResult, NetworkStatus } from '@apollo/client/core';
|
|
|
import { notNullOrUndefined } from '@vendure/common/lib/shared-utils';
|
|
import { notNullOrUndefined } from '@vendure/common/lib/shared-utils';
|
|
|
import { Apollo, QueryRef } from 'apollo-angular';
|
|
import { Apollo, QueryRef } from 'apollo-angular';
|
|
|
-import { merge, Observable, Subject } from 'rxjs';
|
|
|
|
|
-import { distinctUntilChanged, filter, finalize, map, skip, take, takeUntil, tap } from 'rxjs/operators';
|
|
|
|
|
|
|
+import { DocumentNode } from 'graphql/index';
|
|
|
|
|
+import { merge, Observable, Subject, Subscription } from 'rxjs';
|
|
|
|
|
+import { distinctUntilChanged, filter, finalize, map, skip, take, takeUntil } from 'rxjs/operators';
|
|
|
|
|
|
|
|
-import { GetUserStatusQuery } from '../common/generated-types';
|
|
|
|
|
|
|
+import { CustomFieldConfig, GetUserStatusQuery } from '../common/generated-types';
|
|
|
|
|
|
|
|
import { GET_USER_STATUS } from './definitions/client-definitions';
|
|
import { GET_USER_STATUS } from './definitions/client-definitions';
|
|
|
|
|
+import { addCustomFields } from './utils/add-custom-fields';
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* @description
|
|
* @description
|
|
@@ -17,12 +19,42 @@ import { GET_USER_STATUS } from './definitions/client-definitions';
|
|
|
* @docsPage DataService
|
|
* @docsPage DataService
|
|
|
*/
|
|
*/
|
|
|
export class QueryResult<T, V extends Record<string, any> = Record<string, any>> {
|
|
export class QueryResult<T, V extends Record<string, any> = Record<string, any>> {
|
|
|
- constructor(private queryRef: QueryRef<T, V>, private apollo: Apollo) {
|
|
|
|
|
- this.valueChanges = queryRef.valueChanges;
|
|
|
|
|
|
|
+ constructor(
|
|
|
|
|
+ private queryRef: QueryRef<T, V>,
|
|
|
|
|
+ private apollo: Apollo,
|
|
|
|
|
+ private customFieldMap: Map<string, CustomFieldConfig[]>,
|
|
|
|
|
+ ) {
|
|
|
|
|
+ this.lastQuery = queryRef.options.query;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- completed$ = new Subject<void>();
|
|
|
|
|
- private valueChanges: Observable<ApolloQueryResult<T>>;
|
|
|
|
|
|
|
+ /**
|
|
|
|
|
+ * Causes any subscriptions to the QueryRef to complete, via the use
|
|
|
|
|
+ * of the `takeUntil` operator.
|
|
|
|
|
+ */
|
|
|
|
|
+ private completed$ = new Subject<void>();
|
|
|
|
|
+ /**
|
|
|
|
|
+ * The subscription to the current QueryRef.valueChanges Observable.
|
|
|
|
|
+ * This is stored so that it can be unsubscribed from when the QueryRef
|
|
|
|
|
+ * changes.
|
|
|
|
|
+ */
|
|
|
|
|
+ private valueChangesSubscription: Subscription;
|
|
|
|
|
+ /**
|
|
|
|
|
+ * This Subject is used to emit new values from the QueryRef.valueChanges Observable.
|
|
|
|
|
+ * We use this rather than directly subscribing to the QueryRef.valueChanges Observable
|
|
|
|
|
+ * so that we are able to change the QueryRef and re-subscribe when necessary.
|
|
|
|
|
+ */
|
|
|
|
|
+ private valueChangeSubject = new Subject<ApolloQueryResult<T>>();
|
|
|
|
|
+ /**
|
|
|
|
|
+ * We keep track of the QueryRefs which have been subscribed to so that we can avoid
|
|
|
|
|
+ * re-subscribing to the same QueryRef multiple times.
|
|
|
|
|
+ */
|
|
|
|
|
+ private queryRefSubscribed = new WeakMap<QueryRef<T, V>, boolean>();
|
|
|
|
|
+ /**
|
|
|
|
|
+ * We store a reference to the last query so that we can compare it with the next query
|
|
|
|
|
+ * and avoid re-fetching the same query multiple times. This is applicable to the code
|
|
|
|
|
+ * paths that actually change the query, i.e. refetchOnCustomFieldsChange().
|
|
|
|
|
+ */
|
|
|
|
|
+ private lastQuery: DocumentNode;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* @description
|
|
* @description
|
|
@@ -47,17 +79,44 @@ export class QueryResult<T, V extends Record<string, any> = Record<string, any>>
|
|
|
takeUntil(this.completed$),
|
|
takeUntil(this.completed$),
|
|
|
);
|
|
);
|
|
|
|
|
|
|
|
- this.valueChanges = merge(activeChannelId$, this.queryRef.valueChanges).pipe(
|
|
|
|
|
- tap(val => {
|
|
|
|
|
|
|
+ merge(activeChannelId$, this.valueChangeSubject)
|
|
|
|
|
+ .pipe(takeUntil(loggedOut$), takeUntil(this.completed$))
|
|
|
|
|
+ .subscribe(val => {
|
|
|
if (typeof val === 'string') {
|
|
if (typeof val === 'string') {
|
|
|
new Promise(resolve => setTimeout(resolve, 50)).then(() => this.queryRef.refetch());
|
|
new Promise(resolve => setTimeout(resolve, 50)).then(() => this.queryRef.refetch());
|
|
|
}
|
|
}
|
|
|
- }),
|
|
|
|
|
- filter<any>(val => typeof val !== 'string'),
|
|
|
|
|
- takeUntil(loggedOut$),
|
|
|
|
|
- takeUntil(this.completed$),
|
|
|
|
|
- );
|
|
|
|
|
- this.queryRef.valueChanges = this.valueChanges;
|
|
|
|
|
|
|
+ });
|
|
|
|
|
+ return this;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * @description
|
|
|
|
|
+ * Re-fetch this query whenever the custom fields change, updating the query to include the
|
|
|
|
|
+ * specified custom fields.
|
|
|
|
|
+ *
|
|
|
|
|
+ * @since 3.0.4
|
|
|
|
|
+ */
|
|
|
|
|
+ refetchOnCustomFieldsChange(customFieldsToInclude$: Observable<string[]>): QueryResult<T, V> {
|
|
|
|
|
+ customFieldsToInclude$
|
|
|
|
|
+ .pipe(
|
|
|
|
|
+ filter(customFields => {
|
|
|
|
|
+ const newQuery = addCustomFields(this.lastQuery, this.customFieldMap, customFields);
|
|
|
|
|
+ const hasChanged = JSON.stringify(newQuery) !== JSON.stringify(this.lastQuery);
|
|
|
|
|
+ return hasChanged;
|
|
|
|
|
+ }),
|
|
|
|
|
+ takeUntil(this.completed$),
|
|
|
|
|
+ )
|
|
|
|
|
+ .subscribe(customFields => {
|
|
|
|
|
+ const newQuery = addCustomFields(this.lastQuery, this.customFieldMap, customFields);
|
|
|
|
|
+ this.lastQuery = newQuery;
|
|
|
|
|
+ const queryRef = this.apollo.watchQuery<T, V>({
|
|
|
|
|
+ query: newQuery,
|
|
|
|
|
+ variables: this.queryRef.variables,
|
|
|
|
|
+ fetchPolicy: this.queryRef.options.fetchPolicy,
|
|
|
|
|
+ });
|
|
|
|
|
+ this.queryRef = queryRef;
|
|
|
|
|
+ this.subscribeToQueryRef(queryRef);
|
|
|
|
|
+ });
|
|
|
return this;
|
|
return this;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -66,7 +125,7 @@ export class QueryResult<T, V extends Record<string, any> = Record<string, any>>
|
|
|
* Returns an Observable which emits a single result and then completes.
|
|
* Returns an Observable which emits a single result and then completes.
|
|
|
*/
|
|
*/
|
|
|
get single$(): Observable<T> {
|
|
get single$(): Observable<T> {
|
|
|
- return this.valueChanges.pipe(
|
|
|
|
|
|
|
+ return this.currentQueryRefValueChanges.pipe(
|
|
|
filter(result => result.networkStatus === NetworkStatus.ready),
|
|
filter(result => result.networkStatus === NetworkStatus.ready),
|
|
|
take(1),
|
|
take(1),
|
|
|
map(result => result.data),
|
|
map(result => result.data),
|
|
@@ -82,7 +141,7 @@ export class QueryResult<T, V extends Record<string, any> = Record<string, any>>
|
|
|
* Returns an Observable which emits until unsubscribed.
|
|
* Returns an Observable which emits until unsubscribed.
|
|
|
*/
|
|
*/
|
|
|
get stream$(): Observable<T> {
|
|
get stream$(): Observable<T> {
|
|
|
- return this.valueChanges.pipe(
|
|
|
|
|
|
|
+ return this.currentQueryRefValueChanges.pipe(
|
|
|
filter(result => result.networkStatus === NetworkStatus.ready),
|
|
filter(result => result.networkStatus === NetworkStatus.ready),
|
|
|
map(result => result.data),
|
|
map(result => result.data),
|
|
|
finalize(() => {
|
|
finalize(() => {
|
|
@@ -111,4 +170,48 @@ export class QueryResult<T, V extends Record<string, any> = Record<string, any>>
|
|
|
mapStream<R>(mapFn: (item: T) => R): Observable<R> {
|
|
mapStream<R>(mapFn: (item: T) => R): Observable<R> {
|
|
|
return this.stream$.pipe(map(mapFn));
|
|
return this.stream$.pipe(map(mapFn));
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * @description
|
|
|
|
|
+ * Signals to the internal Observable subscriptions that they should complete.
|
|
|
|
|
+ */
|
|
|
|
|
+ destroy() {
|
|
|
|
|
+ this.completed$.next();
|
|
|
|
|
+ this.completed$.complete();
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * @description
|
|
|
|
|
+ * Returns an Observable which emits the current value of the QueryRef.valueChanges Observable.
|
|
|
|
|
+ *
|
|
|
|
|
+ * We wrap the valueChanges Observable in a new Observable so that we can have a lazy
|
|
|
|
|
+ * evaluation of the valueChanges Observable. That is, we only fire the HTTP request when
|
|
|
|
|
+ * the returned Observable is subscribed to.
|
|
|
|
|
+ */
|
|
|
|
|
+ private get currentQueryRefValueChanges(): Observable<ApolloQueryResult<T>> {
|
|
|
|
|
+ return new Observable(subscriber => {
|
|
|
|
|
+ if (!this.queryRefSubscribed.get(this.queryRef)) {
|
|
|
|
|
+ this.subscribeToQueryRef(this.queryRef);
|
|
|
|
|
+ this.queryRefSubscribed.set(this.queryRef, true);
|
|
|
|
|
+ }
|
|
|
|
|
+ this.valueChangeSubject.subscribe(subscriber);
|
|
|
|
|
+ return () => {
|
|
|
|
|
+ this.queryRefSubscribed.delete(this.queryRef);
|
|
|
|
|
+ };
|
|
|
|
|
+ });
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * @description
|
|
|
|
|
+ * Subscribes to the valueChanges Observable of the given QueryRef, and stores the subscription
|
|
|
|
|
+ * so that it can be unsubscribed from when the QueryRef changes.
|
|
|
|
|
+ */
|
|
|
|
|
+ private subscribeToQueryRef(queryRef: QueryRef<T, V>) {
|
|
|
|
|
+ if (this.valueChangesSubscription) {
|
|
|
|
|
+ this.valueChangesSubscription.unsubscribe();
|
|
|
|
|
+ }
|
|
|
|
|
+ this.valueChangesSubscription = queryRef.valueChanges
|
|
|
|
|
+ .pipe(takeUntil(this.completed$))
|
|
|
|
|
+ .subscribe(this.valueChangeSubject);
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|