Просмотр исходного кода

fix(core): Use RequestContext where available in all DB operations (#1639)

* fix(core): Use RequestContext if possible anywhere
* fix(core): Expose refresh arguments for self-refreshing cache. Pass ctx to channel and zone cache refreshing mechanism.

This fix handles a situation where the server acquires too many new DB connections, causing it to reach the limit of the connection pool and hang.
Alexander Shitikov 3 лет назад
Родитель
Сommit
a683ef5429
30 измененных файлов с 172 добавлено и 123 удалено
  1. 2 2
      packages/core/src/api/resolvers/admin/channel.resolver.ts
  2. 2 2
      packages/core/src/common/self-refreshing-cache.spec.ts
  3. 7 2
      packages/core/src/common/self-refreshing-cache.ts
  4. 1 1
      packages/core/src/config/promotion/actions/facet-values-percentage-discount-action.ts
  5. 1 1
      packages/core/src/config/promotion/conditions/has-facet-values-condition.ts
  6. 3 2
      packages/core/src/config/promotion/utils/facet-value-checker.ts
  7. 5 5
      packages/core/src/data-import/providers/importer/fast-importer.service.ts
  8. 1 0
      packages/core/src/data-import/providers/importer/importer.ts
  9. 1 1
      packages/core/src/data-import/providers/populator/populator.ts
  10. 5 2
      packages/core/src/plugin/default-job-queue-plugin/sql-job-buffer-storage-strategy.ts
  11. 28 24
      packages/core/src/plugin/default-search-plugin/indexer/indexer.controller.ts
  12. 5 5
      packages/core/src/plugin/default-search-plugin/search-strategy/mysql-search-strategy.ts
  13. 4 4
      packages/core/src/plugin/default-search-plugin/search-strategy/postgres-search-strategy.ts
  14. 4 4
      packages/core/src/plugin/default-search-plugin/search-strategy/sqlite-search-strategy.ts
  15. 1 1
      packages/core/src/service/helpers/external-authentication/external-authentication.service.ts
  16. 1 1
      packages/core/src/service/helpers/list-query-builder/list-query-builder.ts
  17. 1 1
      packages/core/src/service/initializer.service.ts
  18. 8 7
      packages/core/src/service/services/administrator.service.ts
  19. 19 13
      packages/core/src/service/services/channel.service.ts
  20. 8 4
      packages/core/src/service/services/collection.service.ts
  21. 1 1
      packages/core/src/service/services/customer.service.ts
  22. 12 4
      packages/core/src/service/services/facet-value.service.ts
  23. 22 10
      packages/core/src/service/services/facet.service.ts
  24. 3 3
      packages/core/src/service/services/global-settings.service.ts
  25. 2 2
      packages/core/src/service/services/product-variant.service.ts
  26. 2 2
      packages/core/src/service/services/promotion.service.ts
  27. 13 9
      packages/core/src/service/services/role.service.ts
  28. 7 7
      packages/core/src/service/services/session.service.ts
  29. 2 2
      packages/core/src/service/services/shipping-method.service.ts
  30. 1 1
      packages/core/src/service/services/user.service.ts

+ 2 - 2
packages/core/src/api/resolvers/admin/channel.resolver.ts

@@ -52,8 +52,8 @@ export class ChannelResolver {
         if (isGraphQlErrorResult(result)) {
             return result;
         }
-        const superAdminRole = await this.roleService.getSuperAdminRole();
-        const customerRole = await this.roleService.getCustomerRole();
+        const superAdminRole = await this.roleService.getSuperAdminRole(ctx);
+        const customerRole = await this.roleService.getCustomerRole(ctx);
         await this.roleService.assignRoleToChannel(ctx, superAdminRole.id, result.id);
         await this.roleService.assignRoleToChannel(ctx, customerRole.id, result.id);
         return result;

+ 2 - 2
packages/core/src/common/self-refreshing-cache.spec.ts

@@ -36,8 +36,8 @@ describe('SelfRefreshingCache', () => {
 
     it('automatically refresh after ttl expires', async () => {
         currentTime = 1001;
-        const result = await testCache.value();
-        expect(result).toBe(7);
+        const result = await testCache.value('custom');
+        expect(result).toBe(6);
         expect(fetchFn.mock.calls.length).toBe(2);
     });
 

+ 7 - 2
packages/core/src/common/self-refreshing-cache.ts

@@ -13,6 +13,7 @@ export interface SelfRefreshingCache<V, RefreshArgs extends any[] = []> {
      * the fresh value will be returned.
      */
     value(): Promise<V>;
+    value(...refreshArgs: RefreshArgs | [undefined]): Promise<V>;
 
     /**
      * @description
@@ -40,6 +41,9 @@ export interface SelfRefreshingCacheConfig<V, RefreshArgs extends any[]> {
     ttl: number;
     refresh: {
         fn: (...args: RefreshArgs) => Promise<V>;
+        /**
+         * Default arguments, passed to refresh function
+         */
         defaultArgs: RefreshArgs;
     };
     /**
@@ -61,10 +65,11 @@ export interface SelfRefreshingCacheConfig<V, RefreshArgs extends any[]> {
  */
 export async function createSelfRefreshingCache<V, RefreshArgs extends any[]>(
     config: SelfRefreshingCacheConfig<V, RefreshArgs>,
+    refreshArgs?: RefreshArgs
 ): Promise<SelfRefreshingCache<V, RefreshArgs>> {
     const { ttl, name, refresh, getTimeFn } = config;
     const getTimeNow = getTimeFn ?? (() => new Date().getTime());
-    const initialValue = await refresh.fn(...refresh.defaultArgs);
+    const initialValue = await refresh.fn(...(refreshArgs ?? refresh.defaultArgs));
     let value = initialValue;
     let expires = getTimeNow() + ttl;
     const memoCache = new Map<string, { expires: number; value: any }>();
@@ -114,7 +119,7 @@ export async function createSelfRefreshingCache<V, RefreshArgs extends any[]>(
         return result;
     };
     return {
-        value: getValue,
+        value: (...args) => getValue(!args.length || args.length === 1 && args[0] === undefined ? undefined : args as RefreshArgs),
         refresh: (...args) => refreshValue(true, args),
         memoize,
     };

+ 1 - 1
packages/core/src/config/promotion/actions/facet-values-percentage-discount-action.ts

@@ -26,7 +26,7 @@ export const discountOnItemWithFacets = new PromotionItemAction({
         facetValueChecker = new FacetValueChecker(injector.get(TransactionalConnection));
     },
     async execute(ctx, orderItem, orderLine, args) {
-        if (await facetValueChecker.hasFacetValues(orderLine, args.facets)) {
+        if (await facetValueChecker.hasFacetValues(orderLine, args.facets, ctx)) {
             const unitPrice = ctx.channel.pricesIncludeTax ? orderLine.unitPriceWithTax : orderLine.unitPrice;
             return -unitPrice * (args.discount / 100);
         }

+ 1 - 1
packages/core/src/config/promotion/conditions/has-facet-values-condition.ts

@@ -22,7 +22,7 @@ export const hasFacetValues = new PromotionCondition({
     async check(ctx, order, args) {
         let matches = 0;
         for (const line of order.lines) {
-            if (await facetValueChecker.hasFacetValues(line, args.facets)) {
+            if (await facetValueChecker.hasFacetValues(line, args.facets, ctx)) {
                 matches += line.quantity;
             }
         }

+ 3 - 2
packages/core/src/config/promotion/utils/facet-value-checker.ts

@@ -1,5 +1,6 @@
 import { ID } from '@vendure/common/lib/shared-types';
 import { unique } from '@vendure/common/lib/unique';
+import { RequestContext } from '../../../api';
 
 import { TtlCache } from '../../../common/ttl-cache';
 import { idsAreEqual } from '../../../common/utils';
@@ -55,11 +56,11 @@ export class FacetValueChecker {
      * `true` if the associated {@link ProductVariant} & {@link Product} together
      * have *all* the specified {@link FacetValue}s.
      */
-    async hasFacetValues(orderLine: OrderLine, facetValueIds: ID[]): Promise<boolean> {
+    async hasFacetValues(orderLine: OrderLine, facetValueIds: ID[], ctx?: RequestContext): Promise<boolean> {
         let variant = this.variantCache.get(orderLine.productVariant.id);
         if (!variant) {
             variant = await this.connection
-                .getRepository(ProductVariant)
+                .getRepository(ctx, ProductVariant)
                 .findOne(orderLine.productVariant.id, {
                     relations: ['product', 'product.facetValues', 'facetValues'],
                 });

+ 5 - 5
packages/core/src/data-import/providers/importer/fast-importer.service.ts

@@ -66,7 +66,7 @@ export class FastImporterService {
                   channelOrToken: channel,
               })
             : RequestContext.empty();
-        this.defaultChannel = await this.channelService.getDefaultChannel();
+        this.defaultChannel = await this.channelService.getDefaultChannel(this.importCtx);
     }
 
     async createProduct(input: CreateProductInput): Promise<ID> {
@@ -95,7 +95,7 @@ export class FastImporterService {
                         position: i,
                     }),
             );
-            await this.connection.getRepository(ProductAsset).save(productAssets, { reload: false });
+            await this.connection.getRepository(this.importCtx, ProductAsset).save(productAssets, { reload: false });
         }
         return product.id;
     }
@@ -126,7 +126,7 @@ export class FastImporterService {
     async addOptionGroupToProduct(productId: ID, optionGroupId: ID) {
         this.ensureInitialized();
         await this.connection
-            .getRepository(Product)
+            .getRepository(this.importCtx, Product)
             .createQueryBuilder()
             .relation('optionGroups')
             .of(productId)
@@ -177,7 +177,7 @@ export class FastImporterService {
                         position: i,
                     }),
             );
-            await this.connection.getRepository(ProductVariantAsset).save(variantAssets, { reload: false });
+            await this.connection.getRepository(this.importCtx, ProductVariantAsset).save(variantAssets, { reload: false });
         }
         if (input.stockOnHand != null && input.stockOnHand !== 0) {
             await this.stockMovementService.adjustProductVariantStock(
@@ -194,7 +194,7 @@ export class FastImporterService {
                 channelId,
             });
             variantPrice.variant = createdVariant;
-            await this.connection.getRepository(ProductVariantPrice).save(variantPrice, { reload: false });
+            await this.connection.getRepository(this.importCtx, ProductVariantPrice).save(variantPrice, { reload: false });
         }
 
         return createdVariant.id;

+ 1 - 0
packages/core/src/data-import/providers/importer/importer.ts

@@ -314,6 +314,7 @@ export class Importer {
                 facetEntity = cachedFacet;
             } else {
                 const existing = await this.facetService.findByCode(
+                    ctx,
                     normalizeString(facetName, '-'),
                     languageCode,
                 );

+ 1 - 1
packages/core/src/data-import/providers/populator/populator.ts

@@ -110,7 +110,7 @@ export class Populator {
     async populateCollections(data: InitialData, channel?: Channel) {
         const ctx = await this.createRequestContext(data, channel);
 
-        const allFacetValues = await this.facetValueService.findAll(ctx.languageCode);
+        const allFacetValues = await this.facetValueService.findAll(ctx, ctx.languageCode);
         const collectionMap = new Map<string, Collection>();
         for (const collectionDef of data.collections) {
             const parent = collectionDef.parentName && collectionMap.get(collectionDef.parentName);

+ 5 - 2
packages/core/src/plugin/default-job-queue-plugin/sql-job-buffer-storage-strategy.ts

@@ -18,7 +18,7 @@ export class SqlJobBufferStorageStrategy implements JobBufferStorageStrategy {
     }
 
     async add(bufferId: string, job: Job): Promise<Job> {
-        await this.connection.getRepository(JobRecordBuffer).save(
+        await this.connection.rawConnection.getRepository(JobRecordBuffer).save(
             new JobRecordBuffer({
                 bufferId,
                 job: this.toJobConfig(job),
@@ -30,6 +30,7 @@ export class SqlJobBufferStorageStrategy implements JobBufferStorageStrategy {
 
     async bufferSize(bufferIds?: string[]): Promise<{ [bufferId: string]: number }> {
         const qb = await this.connection
+            .rawConnection
             .getRepository(JobRecordBuffer)
             .createQueryBuilder('record')
             .select(`COUNT(*)`, 'count')
@@ -49,7 +50,9 @@ export class SqlJobBufferStorageStrategy implements JobBufferStorageStrategy {
     }
 
     async flush(bufferIds?: string[]): Promise<{ [bufferId: string]: Job[] }> {
-        const selectQb = this.connection.getRepository(JobRecordBuffer).createQueryBuilder('record');
+        const selectQb = this.connection.rawConnection
+            .getRepository(JobRecordBuffer)
+            .createQueryBuilder('record');
         if (bufferIds?.length) {
             selectQb.where(`record.bufferId IN (:...bufferIds)`, { bufferIds });
         }

+ 28 - 24
packages/core/src/plugin/default-search-plugin/indexer/indexer.controller.ts

@@ -65,13 +65,13 @@ export class IndexerController {
         const ctx = MutableRequestContext.deserialize(rawContext);
         return asyncObservable(async observer => {
             const timeStart = Date.now();
-            const qb = this.getSearchIndexQueryBuilder(ctx.channelId);
+            const qb = this.getSearchIndexQueryBuilder(ctx, ctx.channelId);
             const count = await qb.getCount();
             Logger.verbose(`Reindexing ${count} variants for channel ${ctx.channel.code}`, workerLoggerCtx);
             const batches = Math.ceil(count / BATCH_SIZE);
 
             await this.connection
-                .getRepository(SearchIndexItem)
+                .getRepository(ctx, SearchIndexItem)
                 .delete({ languageCode: ctx.languageCode, channelId: ctx.channelId });
             Logger.verbose('Deleted existing index items', workerLoggerCtx);
 
@@ -116,7 +116,7 @@ export class IndexerController {
                     const end = begin + BATCH_SIZE;
                     Logger.verbose(`Updating ids from index ${begin} to ${end}`);
                     const batchIds = ids.slice(begin, end);
-                    const batch = await this.connection.getRepository(ProductVariant).findByIds(batchIds, {
+                    const batch = await this.connection.getRepository(ctx, ProductVariant).findByIds(batchIds, {
                         relations: variantRelations,
                         where: { deletedAt: null },
                     });
@@ -154,7 +154,7 @@ export class IndexerController {
 
     async deleteVariant(data: UpdateVariantMessageData): Promise<boolean> {
         const ctx = MutableRequestContext.deserialize(data.ctx);
-        const variants = await this.connection.getRepository(ProductVariant).findByIds(data.variantIds);
+        const variants = await this.connection.getRepository(ctx, ProductVariant).findByIds(data.variantIds);
         if (variants.length) {
             const languageVariants = unique([
                 ...variants
@@ -162,6 +162,7 @@ export class IndexerController {
                     .map(t => t.languageCode),
             ]);
             await this.removeSearchIndexItems(
+                ctx,
                 ctx.channelId,
                 variants.map(v => v.id),
                 languageVariants,
@@ -187,14 +188,15 @@ export class IndexerController {
 
     async removeVariantFromChannel(data: VariantChannelMessageData): Promise<boolean> {
         const ctx = MutableRequestContext.deserialize(data.ctx);
-        const variant = await this.connection.getRepository(ProductVariant).findOne(data.productVariantId);
+        const variant = await this.connection.getRepository(ctx, ProductVariant).findOne(data.productVariantId);
         const languageVariants = variant?.translations.map(t => t.languageCode) ?? [];
-        await this.removeSearchIndexItems(data.channelId, [data.productVariantId], languageVariants);
+        await this.removeSearchIndexItems(ctx, data.channelId, [data.productVariantId], languageVariants);
         return true;
     }
 
     async updateAsset(data: UpdateAssetMessageData): Promise<boolean> {
         const id = data.asset.id;
+        const ctx = MutableRequestContext.deserialize(data.ctx);
 
         function getFocalPoint(point?: { x: number; y: number }) {
             return point && point.x && point.y ? point : null;
@@ -202,21 +204,23 @@ export class IndexerController {
 
         const focalPoint = getFocalPoint(data.asset.focalPoint);
         await this.connection
-            .getRepository(SearchIndexItem)
+            .getRepository(ctx, SearchIndexItem)
             .update({ productAssetId: id }, { productPreviewFocalPoint: focalPoint });
         await this.connection
-            .getRepository(SearchIndexItem)
+            .getRepository(ctx, SearchIndexItem)
             .update({ productVariantAssetId: id }, { productVariantPreviewFocalPoint: focalPoint });
         return true;
     }
 
     async deleteAsset(data: UpdateAssetMessageData): Promise<boolean> {
         const id = data.asset.id;
+        const ctx = MutableRequestContext.deserialize(data.ctx);
+
         await this.connection
-            .getRepository(SearchIndexItem)
+            .getRepository(ctx, SearchIndexItem)
             .update({ productAssetId: id }, { productAssetId: null });
         await this.connection
-            .getRepository(SearchIndexItem)
+            .getRepository(ctx, SearchIndexItem)
             .update({ productVariantAssetId: id }, { productVariantAssetId: null });
         return true;
     }
@@ -226,11 +230,11 @@ export class IndexerController {
         productId: ID,
         channelId: ID,
     ): Promise<boolean> {
-        const product = await this.connection.getRepository(Product).findOne(productId, {
+        const product = await this.connection.getRepository(ctx, Product).findOne(productId, {
             relations: ['variants'],
         });
         if (product) {
-            const updatedVariants = await this.connection.getRepository(ProductVariant).findByIds(
+            const updatedVariants = await this.connection.getRepository(ctx, ProductVariant).findByIds(
                 product.variants.map(v => v.id),
                 {
                     relations: variantRelations,
@@ -260,7 +264,7 @@ export class IndexerController {
         variantIds: ID[],
         channelId: ID,
     ): Promise<boolean> {
-        const variants = await this.connection.getRepository(ProductVariant).findByIds(variantIds, {
+        const variants = await this.connection.getRepository(ctx, ProductVariant).findByIds(variantIds, {
             relations: variantRelations,
             where: { deletedAt: null },
         });
@@ -276,7 +280,7 @@ export class IndexerController {
         productId: ID,
         channelId: ID,
     ): Promise<boolean> {
-        const product = await this.connection.getRepository(Product).findOne(productId, {
+        const product = await this.connection.getRepository(ctx, Product).findOne(productId, {
             relations: ['variants'],
         });
         if (product) {
@@ -289,14 +293,14 @@ export class IndexerController {
 
             const removedVariantIds = product.variants.map(v => v.id);
             if (removedVariantIds.length) {
-                await this.removeSearchIndexItems(channelId, removedVariantIds, languageVariants);
+                await this.removeSearchIndexItems(ctx, channelId, removedVariantIds, languageVariants);
             }
         }
         return true;
     }
 
-    private getSearchIndexQueryBuilder(channelId: ID) {
-        const qb = this.connection.getRepository(ProductVariant).createQueryBuilder('variants');
+    private getSearchIndexQueryBuilder(ctx: RequestContext, channelId: ID) {
+        const qb = this.connection.getRepository(ctx, ProductVariant).createQueryBuilder('variants');
         FindOptionsUtils.applyFindManyOptionsOrConditionsToQueryBuilder(qb, {
             relations: variantRelations,
         });
@@ -316,7 +320,7 @@ export class IndexerController {
     private async saveVariants(ctx: MutableRequestContext, variants: ProductVariant[]) {
         const items: SearchIndexItem[] = [];
 
-        await this.removeSyntheticVariants(variants);
+        await this.removeSyntheticVariants(ctx, variants);
         const productMap = new Map<ID, Product>();
 
         for (const variant of variants) {
@@ -403,7 +407,7 @@ export class IndexerController {
         }
 
         await this.queue.push(() =>
-            this.connection.getRepository(SearchIndexItem).save(items, { chunk: 2500 }),
+            this.connection.getRepository(ctx, SearchIndexItem).save(items, { chunk: 2500 }),
         );
     }
 
@@ -438,17 +442,17 @@ export class IndexerController {
             collectionIds: [],
             collectionSlugs: [],
         });
-        await this.queue.push(() => this.connection.getRepository(SearchIndexItem).save(item));
+        await this.queue.push(() => this.connection.getRepository(ctx, SearchIndexItem).save(item));
     }
 
     /**
      * Removes any synthetic variants for the given product
      */
-    private async removeSyntheticVariants(variants: ProductVariant[]) {
+    private async removeSyntheticVariants(ctx: RequestContext, variants: ProductVariant[]) {
         const prodIds = unique(variants.map(v => v.productId));
         for (const productId of prodIds) {
             await this.queue.push(() =>
-                this.connection.getRepository(SearchIndexItem).delete({
+                this.connection.getRepository(ctx, SearchIndexItem).delete({
                     productId,
                     sku: '',
                     price: 0,
@@ -483,7 +487,7 @@ export class IndexerController {
     /**
      * Remove items from the search index
      */
-    private async removeSearchIndexItems(channelId: ID, variantIds: ID[], languageCodes: LanguageCode[]) {
+    private async removeSearchIndexItems(ctx: RequestContext, channelId: ID, variantIds: ID[], languageCodes: LanguageCode[]) {
         const keys: Array<Partial<SearchIndexItem>> = [];
         for (const productVariantId of variantIds) {
             for (const languageCode of languageCodes) {
@@ -494,7 +498,7 @@ export class IndexerController {
                 });
             }
         }
-        await this.queue.push(() => this.connection.getRepository(SearchIndexItem).delete(keys as any));
+        await this.queue.push(() => this.connection.getRepository(ctx, SearchIndexItem).delete(keys as any));
     }
 
     /**

+ 5 - 5
packages/core/src/plugin/default-search-plugin/search-strategy/mysql-search-strategy.ts

@@ -38,7 +38,7 @@ export class MysqlSearchStrategy implements SearchStrategy {
         enabledOnly: boolean,
     ): Promise<Map<ID, number>> {
         const facetValuesQb = this.connection
-            .getRepository(SearchIndexItem)
+            .getRepository(ctx, SearchIndexItem)
             .createQueryBuilder('si')
             .select(['MIN(productId)', 'MIN(productVariantId)'])
             .addSelect('GROUP_CONCAT(facetValueIds)', 'facetValues');
@@ -60,7 +60,7 @@ export class MysqlSearchStrategy implements SearchStrategy {
         enabledOnly: boolean,
     ): Promise<Map<ID, number>> {
         const collectionsQb = this.connection
-            .getRepository(SearchIndexItem)
+            .getRepository(ctx, SearchIndexItem)
             .createQueryBuilder('si')
             .select(['MIN(productId)', 'MIN(productVariantId)'])
             .addSelect('GROUP_CONCAT(collectionIds)', 'collections');
@@ -85,7 +85,7 @@ export class MysqlSearchStrategy implements SearchStrategy {
         const skip = input.skip || 0;
         const sort = input.sort;
         const qb = this.connection
-            .getRepository(SearchIndexItem)
+            .getRepository(ctx, SearchIndexItem)
             .createQueryBuilder('si')
             .select(this.createMysqlSelect(!!input.groupByProduct));
         if (input.groupByProduct) {
@@ -122,7 +122,7 @@ export class MysqlSearchStrategy implements SearchStrategy {
         const innerQb = this.applyTermAndFilters(
             ctx,
             this.connection
-                .getRepository(SearchIndexItem)
+                .getRepository(ctx, SearchIndexItem)
                 .createQueryBuilder('si')
                 .select(this.createMysqlSelect(!!input.groupByProduct)),
             input,
@@ -149,7 +149,7 @@ export class MysqlSearchStrategy implements SearchStrategy {
 
         if (term && term.length > this.minTermLength) {
             const termScoreQuery = this.connection
-                .getRepository(SearchIndexItem)
+                .getRepository(ctx, SearchIndexItem)
                 .createQueryBuilder('si_inner')
                 .select('si_inner.productId', 'inner_productId')
                 .addSelect('si_inner.productVariantId', 'inner_productVariantId')

+ 4 - 4
packages/core/src/plugin/default-search-plugin/search-strategy/postgres-search-strategy.ts

@@ -38,7 +38,7 @@ export class PostgresSearchStrategy implements SearchStrategy {
         enabledOnly: boolean,
     ): Promise<Map<ID, number>> {
         const facetValuesQb = this.connection
-            .getRepository(SearchIndexItem)
+            .getRepository(ctx, SearchIndexItem)
             .createQueryBuilder('si')
             .select(['"si"."productId"', 'MAX("si"."productVariantId")'])
             .addSelect(`string_agg("si"."facetValueIds",',')`, 'facetValues');
@@ -60,7 +60,7 @@ export class PostgresSearchStrategy implements SearchStrategy {
         enabledOnly: boolean,
     ): Promise<Map<ID, number>> {
         const collectionsQb = this.connection
-            .getRepository(SearchIndexItem)
+            .getRepository(ctx, SearchIndexItem)
             .createQueryBuilder('si')
             .select(['"si"."productId"', 'MAX("si"."productVariantId")'])
             .addSelect(`string_agg("si"."collectionIds",',')`, 'collections');
@@ -85,7 +85,7 @@ export class PostgresSearchStrategy implements SearchStrategy {
         const skip = input.skip || 0;
         const sort = input.sort;
         const qb = this.connection
-            .getRepository(SearchIndexItem)
+            .getRepository(ctx, SearchIndexItem)
             .createQueryBuilder('si')
             .select(this.createPostgresSelect(!!input.groupByProduct));
         if (input.groupByProduct) {
@@ -125,7 +125,7 @@ export class PostgresSearchStrategy implements SearchStrategy {
         const innerQb = this.applyTermAndFilters(
             ctx,
             this.connection
-                .getRepository(SearchIndexItem)
+                .getRepository(ctx, SearchIndexItem)
                 .createQueryBuilder('si')
                 .select(this.createPostgresSelect(!!input.groupByProduct)),
             input,

+ 4 - 4
packages/core/src/plugin/default-search-plugin/search-strategy/sqlite-search-strategy.ts

@@ -38,7 +38,7 @@ export class SqliteSearchStrategy implements SearchStrategy {
         enabledOnly: boolean,
     ): Promise<Map<ID, number>> {
         const facetValuesQb = this.connection
-            .getRepository(SearchIndexItem)
+            .getRepository(ctx, SearchIndexItem)
             .createQueryBuilder('si')
             .select(['productId', 'productVariantId'])
             .addSelect('GROUP_CONCAT(si.facetValueIds)', 'facetValues');
@@ -60,7 +60,7 @@ export class SqliteSearchStrategy implements SearchStrategy {
         enabledOnly: boolean,
     ): Promise<Map<ID, number>> {
         const collectionsQb = this.connection
-            .getRepository(SearchIndexItem)
+            .getRepository(ctx, SearchIndexItem)
             .createQueryBuilder('si')
             .select(['productId', 'productVariantId'])
             .addSelect('GROUP_CONCAT(si.collectionIds)', 'collections');
@@ -84,7 +84,7 @@ export class SqliteSearchStrategy implements SearchStrategy {
         const take = input.take || 25;
         const skip = input.skip || 0;
         const sort = input.sort;
-        const qb = this.connection.getRepository(SearchIndexItem).createQueryBuilder('si');
+        const qb = this.connection.getRepository(ctx, SearchIndexItem).createQueryBuilder('si');
         if (input.groupByProduct) {
             qb.addSelect('MIN(price)', 'minPrice').addSelect('MAX(price)', 'maxPrice');
             qb.addSelect('MIN(priceWithTax)', 'minPriceWithTax').addSelect(
@@ -120,7 +120,7 @@ export class SqliteSearchStrategy implements SearchStrategy {
     async getTotalCount(ctx: RequestContext, input: SearchInput, enabledOnly: boolean): Promise<number> {
         const innerQb = this.applyTermAndFilters(
             ctx,
-            this.connection.getRepository(SearchIndexItem).createQueryBuilder('si'),
+            this.connection.getRepository(ctx, SearchIndexItem).createQueryBuilder('si'),
             input,
         );
 

+ 1 - 1
packages/core/src/service/helpers/external-authentication/external-authentication.service.ts

@@ -98,7 +98,7 @@ export class ExternalAuthenticationService {
         if (existingUser) {
             user = existingUser;
         } else {
-            const customerRole = await this.roleService.getCustomerRole();
+            const customerRole = await this.roleService.getCustomerRole(ctx);
             user = new User({
                 identifier: config.emailAddress,
                 roles: [customerRole],

+ 1 - 1
packages/core/src/service/helpers/list-query-builder/list-query-builder.ts

@@ -195,7 +195,7 @@ export class ListQueryBuilder implements OnApplicationBootstrap {
 
         const repo = extendedOptions.ctx
             ? this.connection.getRepository(extendedOptions.ctx, entity)
-            : this.connection.getRepository(entity);
+            : this.connection.rawConnection.getRepository(entity);
 
         const qb = repo.createQueryBuilder(extendedOptions.entityAlias || entity.name.toLowerCase());
         const minimumRequiredRelations = this.getMinimumRequiredRelations(repo, options, extendedOptions);

+ 1 - 1
packages/core/src/service/initializer.service.ts

@@ -57,7 +57,7 @@ export class InitializerService {
         const delayMs = 100;
         for (let attempt = 0; attempt < retries; attempt++) {
             try {
-                const result = await this.connection.getRepository(Administrator).find();
+                const result = await this.connection.rawConnection.getRepository(Administrator).find();
                 return;
             } catch (e) {
                 if (attempt < retries - 1) {

+ 8 - 7
packages/core/src/service/services/administrator.service.ts

@@ -158,7 +158,7 @@ export class AdministratorService {
         if (input.roleIds) {
             const isSoleSuperAdmin = await this.isSoleSuperadmin(ctx, input.id);
             if (isSoleSuperAdmin) {
-                const superAdminRole = await this.roleService.getSuperAdminRole();
+                const superAdminRole = await this.roleService.getSuperAdminRole(ctx);
                 if (!input.roleIds.find(id => idsAreEqual(id, superAdminRole.id))) {
                     throw new InternalServerError('error.superadmin-must-have-superadmin-role');
                 }
@@ -234,7 +234,7 @@ export class AdministratorService {
      * with SuperAdmin permissions.
      */
     private async isSoleSuperadmin(ctx: RequestContext, id: ID) {
-        const superAdminRole = await this.roleService.getSuperAdminRole();
+        const superAdminRole = await this.roleService.getSuperAdminRole(ctx);
         const allAdmins = await this.connection.getRepository(ctx, Administrator).find({
             relations: ['user', 'user.roles'],
         });
@@ -258,7 +258,7 @@ export class AdministratorService {
     private async ensureSuperAdminExists() {
         const { superadminCredentials } = this.configService.authOptions;
 
-        const superAdminUser = await this.connection.getRepository(User).findOne({
+        const superAdminUser = await this.connection.rawConnection.getRepository(User).findOne({
             where: {
                 identifier: superadminCredentials.identifier,
             },
@@ -274,7 +274,7 @@ export class AdministratorService {
                 roleIds: [superAdminRole.id],
             });
         } else {
-            const superAdministrator = await this.connection.getRepository(Administrator).findOne({
+            const superAdministrator = await this.connection.rawConnection.getRepository(Administrator).findOne({
                 where: {
                     user: superAdminUser,
                 },
@@ -286,18 +286,19 @@ export class AdministratorService {
                     lastName: 'Admin',
                 });
                 const createdAdministrator = await this.connection
+                    .rawConnection
                     .getRepository(Administrator)
                     .save(administrator);
                 createdAdministrator.user = superAdminUser;
-                await this.connection.getRepository(Administrator).save(createdAdministrator);
+                await this.connection.rawConnection.getRepository(Administrator).save(createdAdministrator);
             } else if (superAdministrator.deletedAt != null) {
                 superAdministrator.deletedAt = null;
-                await this.connection.getRepository(Administrator).save(superAdministrator);
+                await this.connection.rawConnection.getRepository(Administrator).save(superAdministrator);
             }
 
             if (superAdminUser.deletedAt != null) {
                 superAdminUser.deletedAt = null;
-                await this.connection.getRepository(User).save(superAdminUser);
+                await this.connection.rawConnection.getRepository(User).save(superAdminUser);
             }
         }
     }

+ 19 - 13
packages/core/src/service/services/channel.service.ts

@@ -85,7 +85,7 @@ export class ChannelService {
         entity: T,
         ctx: RequestContext,
     ): Promise<T> {
-        const defaultChannel = await this.getDefaultChannel();
+        const defaultChannel = await this.getDefaultChannel(ctx);
         const channelIds = unique([ctx.channelId, defaultChannel.id]);
         entity.channels = channelIds.map(id => ({ id })) as any;
         this.eventBus.publish(new ChangeChannelEvent(ctx, entity, [ctx.channelId], 'assigned'));
@@ -137,21 +137,27 @@ export class ChannelService {
         this.eventBus.publish(new ChangeChannelEvent(ctx, entity, channelIds, 'removed', entityType));
         return entity;
     }
-
     /**
      * @description
      * Given a channel token, returns the corresponding Channel if it exists, else will throw
      * a {@link ChannelNotFoundError}.
      */
-    async getChannelFromToken(token: string): Promise<Channel> {
-        const allChannels = await this.allChannels.value();
-        if (allChannels.length === 1 || token === '') {
+    async getChannelFromToken(token: string): Promise<Channel>;
+    async getChannelFromToken(ctx: RequestContext, token: string): Promise<Channel>;
+    async getChannelFromToken(ctxOrToken: RequestContext | string, token?: string): Promise<Channel> {
+        const [ctx, channelToken] = ctxOrToken instanceof RequestContext 
+            ? [ctxOrToken, token!]
+            : [undefined, ctxOrToken]
+
+        const allChannels = await this.allChannels.value(ctx);
+
+        if (allChannels.length === 1 || channelToken === '') {
             // there is only the default channel, so return it
-            return this.getDefaultChannel();
+            return this.getDefaultChannel(ctx);
         }
-        const channel = allChannels.find(c => c.token === token);
+        const channel = allChannels.find(c => c.token === channelToken);
         if (!channel) {
-            throw new ChannelNotFoundError(token);
+            throw new ChannelNotFoundError(channelToken);
         }
         return channel;
     }
@@ -160,8 +166,8 @@ export class ChannelService {
      * @description
      * Returns the default Channel.
      */
-    async getDefaultChannel(): Promise<Channel> {
-        const allChannels = await this.allChannels.value();
+    async getDefaultChannel(ctx?: RequestContext): Promise<Channel> {
+        const allChannels = await this.allChannels.value(ctx);
         const defaultChannel = allChannels.find(channel => channel.code === DEFAULT_CHANNEL_CODE);
 
         if (!defaultChannel) {
@@ -289,7 +295,7 @@ export class ChannelService {
      */
     private async ensureDefaultChannelExists() {
         const { defaultChannelToken } = this.configService;
-        const defaultChannel = await this.connection.getRepository(Channel).findOne({
+        const defaultChannel = await this.connection.rawConnection.getRepository(Channel).findOne({
             where: {
                 code: DEFAULT_CHANNEL_CODE,
             },
@@ -303,10 +309,10 @@ export class ChannelService {
                 currencyCode: CurrencyCode.USD,
                 token: defaultChannelToken,
             });
-            await this.connection.getRepository(Channel).save(newDefaultChannel, { reload: false });
+            await this.connection.rawConnection.getRepository(Channel).save(newDefaultChannel, { reload: false });
         } else if (defaultChannelToken && defaultChannel.token !== defaultChannelToken) {
             defaultChannel.token = defaultChannelToken;
-            await this.connection.getRepository(Channel).save(defaultChannel, { reload: false });
+            await this.connection.rawConnection.getRepository(Channel).save(defaultChannel, { reload: false });
         }
     }
 

+ 8 - 4
packages/core/src/service/services/collection.service.ts

@@ -90,6 +90,7 @@ export class CollectionService implements OnModuleInit {
             .pipe(debounceTime(50))
             .subscribe(async event => {
                 const collections = await this.connection
+                    .rawConnection
                     .getRepository(Collection)
                     .createQueryBuilder('collection')
                     .select('collection.id', 'id')
@@ -352,7 +353,7 @@ export class CollectionService implements OnModuleInit {
         const ancestors = await getParent(collectionId);
 
         return this.connection
-            .getRepository(Collection)
+            .getRepository(ctx, Collection)
             .findByIds(ancestors.map(c => c.id))
             .then(categories => {
                 const resultCategories: Array<Collection | Translated<Collection>> = [];
@@ -562,6 +563,7 @@ export class CollectionService implements OnModuleInit {
         const postIds = collection.productVariants.map(v => v.id);
         try {
             await this.connection
+                .rawConnection
                 .getRepository(Collection)
                 // Only update the exact changed properties, to avoid VERY hard-to-debug
                 // non-deterministic race conditions e.g. when the "position" is changed
@@ -592,7 +594,9 @@ export class CollectionService implements OnModuleInit {
             return [];
         }
         const { collectionFilters } = this.configService.catalogOptions;
-        let qb = this.connection.getRepository(ProductVariant).createQueryBuilder('productVariant');
+        let qb = this.connection.rawConnection
+            .getRepository(ProductVariant)
+            .createQueryBuilder('productVariant');
 
         for (const filterType of collectionFilters) {
             const filtersOfType = filters.filter(f => f.code === filterType.code);
@@ -680,7 +684,7 @@ export class CollectionService implements OnModuleInit {
         // We purposefully do not use the ctx in saving the new root Collection
         // so that even if the outer transaction fails, the root collection will still
         // get persisted.
-        const rootTranslation = await this.connection.getRepository(CollectionTranslation).save(
+        const rootTranslation = await this.connection.rawConnection.getRepository(CollectionTranslation).save(
             new CollectionTranslation({
                 languageCode: this.configService.defaultLanguageCode,
                 name: ROOT_COLLECTION_NAME,
@@ -689,7 +693,7 @@ export class CollectionService implements OnModuleInit {
             }),
         );
 
-        const newRoot = await this.connection.getRepository(Collection).save(
+        const newRoot = await this.connection.rawConnection.getRepository(Collection).save(
             new Collection({
                 isRoot: true,
                 position: 0,

+ 1 - 1
packages/core/src/service/services/customer.service.ts

@@ -235,7 +235,7 @@ export class CustomerService {
             // Customer already exists, bring to this Channel
             const updatedCustomer = patchEntity(existingCustomer, input);
             updatedCustomer.channels.push(ctx.channel);
-            return this.connection.getRepository(Customer).save(updatedCustomer);
+            return this.connection.getRepository(ctx, Customer).save(updatedCustomer);
         } else if (existingCustomer || existingUser) {
             // Not sure when this situation would occur
             return new EmailAddressConflictAdminError();

+ 12 - 4
packages/core/src/service/services/facet-value.service.ts

@@ -43,13 +43,21 @@ export class FacetValueService {
         private eventBus: EventBus,
     ) {}
 
-    findAll(lang: LanguageCode): Promise<Array<Translated<FacetValue>>> {
-        return this.connection
-            .getRepository(FacetValue)
+    /**
+     * @deprecated Use {@link FacetValueService.findAll findAll(ctx, lang)} instead
+     */
+    findAll(lang: LanguageCode): Promise<Array<Translated<FacetValue>>>;
+    findAll(ctx: RequestContext, lang: LanguageCode): Promise<Array<Translated<FacetValue>>>;
+    findAll(ctxOrLang: RequestContext | LanguageCode, lang?: LanguageCode): Promise<Array<Translated<FacetValue>>> {
+        const [repository, languageCode] = ctxOrLang instanceof RequestContext 
+            ? [this.connection.getRepository(ctxOrLang, FacetValue), lang!]
+            : [this.connection.rawConnection.getRepository(FacetValue), ctxOrLang];
+
+        return repository
             .find({
                 relations: ['facet'],
             })
-            .then(facetValues => facetValues.map(facetValue => translateDeep(facetValue, lang, ['facet'])));
+            .then(facetValues => facetValues.map(facetValue => translateDeep(facetValue, languageCode, ['facet'])));
     }
 
     findOne(ctx: RequestContext, id: ID): Promise<Translated<FacetValue> | undefined> {

+ 22 - 10
packages/core/src/service/services/facet.service.ts

@@ -81,17 +81,29 @@ export class FacetService {
             .then(facet => facet && translateDeep(facet, ctx.languageCode, ['values', ['values', 'facet']]));
     }
 
-    findByCode(facetCode: string, lang: LanguageCode): Promise<Translated<Facet> | undefined> {
+    /**
+     * @deprecated Use {@link FacetService.findByCode findByCode(ctx, facetCode, lang)} instead
+     */
+    findByCode(facetCode: string, lang: LanguageCode): Promise<Translated<Facet> | undefined>;
+    findByCode(ctx: RequestContext, facetCode: string, lang: LanguageCode): Promise<Translated<Facet> | undefined>;
+    findByCode(
+        ctxOrFacetCode: RequestContext | string, 
+        facetCodeOrLang: string | LanguageCode, 
+        lang?: LanguageCode
+    ): Promise<Translated<Facet> | undefined> {
         const relations = ['values', 'values.facet'];
-        return this.connection
-            .getRepository(Facet)
-            .findOne({
-                where: {
-                    code: facetCode,
-                },
-                relations,
-            })
-            .then(facet => facet && translateDeep(facet, lang, ['values', ['values', 'facet']]));
+        const [repository, facetCode, languageCode] = ctxOrFacetCode instanceof RequestContext 
+            ? [this.connection.getRepository(ctxOrFacetCode, Facet), facetCodeOrLang, lang!]
+            : [this.connection.rawConnection.getRepository(Facet), ctxOrFacetCode, facetCodeOrLang as LanguageCode];
+
+
+        return repository.findOne({
+            where: {
+                code: facetCode,
+            },
+            relations,
+        })
+        .then(facet => facet && translateDeep(facet, languageCode, ['values', ['values', 'facet']]));
     }
 
     /**

+ 3 - 3
packages/core/src/service/services/global-settings.service.ts

@@ -32,20 +32,20 @@ export class GlobalSettingsService {
      */
     async initGlobalSettings() {
         try {
-            const result = await this.connection.getRepository(GlobalSettings).find();
+            const result = await this.connection.rawConnection.getRepository(GlobalSettings).find();
             if (result.length === 0) {
                 throw new Error('No global settings');
             }
             if (1 < result.length) {
                 // Strange edge case, see https://github.com/vendure-ecommerce/vendure/issues/987
                 const toDelete = result.slice(1);
-                await this.connection.getRepository(GlobalSettings).remove(toDelete);
+                await this.connection.rawConnection.getRepository(GlobalSettings).remove(toDelete);
             }
         } catch (err) {
             const settings = new GlobalSettings({
                 availableLanguages: [this.configService.defaultLanguageCode],
             });
-            await this.connection.getRepository(GlobalSettings).save(settings, { reload: false });
+            await this.connection.rawConnection.getRepository(GlobalSettings).save(settings, { reload: false });
         }
     }
 

+ 2 - 2
packages/core/src/service/services/product-variant.service.ts

@@ -424,7 +424,7 @@ export class ProductVariantService {
             );
         }
 
-        const defaultChannelId = (await this.channelService.getDefaultChannel()).id;
+        const defaultChannelId = (await this.channelService.getDefaultChannel(ctx)).id;
         await this.createOrUpdateProductVariantPrice(ctx, createdVariant.id, input.price, ctx.channelId);
         if (!idsAreEqual(ctx.channelId, defaultChannelId)) {
             // When creating a ProductVariant _not_ in the default Channel, we still need to
@@ -689,7 +689,7 @@ export class ProductVariantService {
         if (!hasPermission) {
             throw new ForbiddenError();
         }
-        const defaultChannel = await this.channelService.getDefaultChannel();
+        const defaultChannel = await this.channelService.getDefaultChannel(ctx);
         if (idsAreEqual(input.channelId, defaultChannel.id)) {
             throw new UserInputError('error.products-cannot-be-removed-from-default-channel');
         }

+ 2 - 2
packages/core/src/service/services/promotion.service.ts

@@ -177,7 +177,7 @@ export class PromotionService {
         ctx: RequestContext,
         input: AssignPromotionsToChannelInput,
     ): Promise<Promotion[]> {
-        const defaultChannel = await this.channelService.getDefaultChannel();
+        const defaultChannel = await this.channelService.getDefaultChannel(ctx);
         if (!idsAreEqual(ctx.channelId, defaultChannel.id)) {
             throw new IllegalOperationError(`promotion-channels-can-only-be-changed-from-default-channel`);
         }
@@ -195,7 +195,7 @@ export class PromotionService {
     }
 
     async removePromotionsFromChannel(ctx: RequestContext, input: RemovePromotionsFromChannelInput) {
-        const defaultChannel = await this.channelService.getDefaultChannel();
+        const defaultChannel = await this.channelService.getDefaultChannel(ctx);
         if (!idsAreEqual(ctx.channelId, defaultChannel.id)) {
             throw new IllegalOperationError(`promotion-channels-can-only-be-changed-from-default-channel`);
         }

+ 13 - 9
packages/core/src/service/services/role.service.ts

@@ -89,8 +89,8 @@ export class RoleService {
      * @description
      * Returns the special SuperAdmin Role, which always exists in Vendure.
      */
-    getSuperAdminRole(): Promise<Role> {
-        return this.getRoleByCode(SUPER_ADMIN_ROLE_CODE).then(role => {
+    getSuperAdminRole(ctx?: RequestContext): Promise<Role> {
+        return this.getRoleByCode(ctx, SUPER_ADMIN_ROLE_CODE).then(role => {
             if (!role) {
                 throw new InternalServerError(`error.super-admin-role-not-found`);
             }
@@ -102,8 +102,8 @@ export class RoleService {
      * @description
      * Returns the special Customer Role, which always exists in Vendure.
      */
-    getCustomerRole(): Promise<Role> {
-        return this.getRoleByCode(CUSTOMER_ROLE_CODE).then(role => {
+    getCustomerRole(ctx?: RequestContext): Promise<Role> {
+        return this.getRoleByCode(ctx, CUSTOMER_ROLE_CODE).then(role => {
             if (!role) {
                 throw new InternalServerError(`error.customer-role-not-found`);
             }
@@ -228,8 +228,12 @@ export class RoleService {
         }
     }
 
-    private getRoleByCode(code: string): Promise<Role | undefined> {
-        return this.connection.getRepository(Role).findOne({
+    private getRoleByCode(ctx: RequestContext | undefined, code: string) {
+        const repository = ctx 
+            ? this.connection.getRepository(ctx, Role) 
+            : this.connection.rawConnection.getRepository(Role);
+
+        return repository.findOne({
             where: { code },
         });
     }
@@ -242,7 +246,7 @@ export class RoleService {
         try {
             const superAdminRole = await this.getSuperAdminRole();
             superAdminRole.permissions = assignablePermissions;
-            await this.connection.getRepository(Role).save(superAdminRole, { reload: false });
+            await this.connection.rawConnection.getRepository(Role).save(superAdminRole, { reload: false });
         } catch (err) {
             const defaultChannel = await this.channelService.getDefaultChannel();
             await this.createRoleForChannels(
@@ -284,13 +288,13 @@ export class RoleService {
      * permissions are removed from those Roles.
      */
     private async ensureRolesHaveValidPermissions() {
-        const roles = await this.connection.getRepository(Role).find();
+        const roles = await this.connection.rawConnection.getRepository(Role).find();
         const assignablePermissions = this.getAllAssignablePermissions();
         for (const role of roles) {
             const invalidPermissions = role.permissions.filter(p => !assignablePermissions.includes(p));
             if (invalidPermissions.length) {
                 role.permissions = role.permissions.filter(p => assignablePermissions.includes(p));
-                await this.connection.getRepository(Role).save(role);
+                await this.connection.rawConnection.getRepository(Role).save(role);
             }
         }
     }

+ 7 - 7
packages/core/src/service/services/session.service.ts

@@ -113,7 +113,7 @@ export class SessionService implements EntitySubscriberInterface {
             invalidated: false,
         });
         // save the new session
-        const newSession = await this.connection.getRepository(AnonymousSession).save(session);
+        const newSession = await this.connection.rawConnection.getRepository(AnonymousSession).save(session);
         const serializedSession = this.serializeSession(newSession);
         await this.sessionCacheStrategy.set(serializedSession);
         return serializedSession;
@@ -172,7 +172,7 @@ export class SessionService implements EntitySubscriberInterface {
      * Looks for a valid session with the given token and returns one if found.
      */
     private async findSessionByToken(token: string): Promise<Session | undefined> {
-        const session = await this.connection
+        const session = await this.connection.rawConnection
             .getRepository(Session)
             .createQueryBuilder('session')
             .leftJoinAndSelect('session.user', 'user')
@@ -198,7 +198,7 @@ export class SessionService implements EntitySubscriberInterface {
         order: Order,
     ): Promise<CachedSession> {
         const session = await this.connection
-            .getRepository(Session)
+            .getRepository(ctx, Session)
             .findOne(serializedSession.id, { relations: ['user', 'user.roles', 'user.roles.channels'] });
         if (session) {
             session.activeOrder = order;
@@ -217,7 +217,7 @@ export class SessionService implements EntitySubscriberInterface {
     async unsetActiveOrder(ctx: RequestContext, serializedSession: CachedSession): Promise<CachedSession> {
         if (serializedSession.activeOrderId) {
             const session = await this.connection
-                .getRepository(Session)
+                .getRepository(ctx, Session)
                 .findOne(serializedSession.id, { relations: ['user', 'user.roles', 'user.roles.channels'] });
             if (session) {
                 session.activeOrder = null;
@@ -235,12 +235,12 @@ export class SessionService implements EntitySubscriberInterface {
      * Sets the `activeChannel` on the given cached session object and updates the cache.
      */
     async setActiveChannel(serializedSession: CachedSession, channel: Channel): Promise<CachedSession> {
-        const session = await this.connection
+        const session = await this.connection.rawConnection
             .getRepository(Session)
             .findOne(serializedSession.id, { relations: ['user', 'user.roles', 'user.roles.channels'] });
         if (session) {
             session.activeChannel = channel;
-            await this.connection.getRepository(Session).save(session, { reload: false });
+            await this.connection.rawConnection.getRepository(Session).save(session, { reload: false });
             const updatedSerializedSession = this.serializeSession(session);
             await this.sessionCacheStrategy.set(updatedSerializedSession);
             return updatedSerializedSession;
@@ -285,7 +285,7 @@ export class SessionService implements EntitySubscriberInterface {
         if (session.expires.getTime() - now < this.sessionDurationInMs / 2) {
             const newExpiryDate = this.getExpiryDate(this.sessionDurationInMs);
             session.expires = newExpiryDate;
-            await this.connection
+            await this.connection.rawConnection
                 .getRepository(Session)
                 .update({ id: session.id }, { expires: newExpiryDate });
         }

+ 2 - 2
packages/core/src/service/services/shipping-method.service.ts

@@ -212,7 +212,7 @@ export class ShippingMethodService {
      * Ensures that all ShippingMethods have a valid fulfillmentHandlerCode
      */
     private async verifyShippingMethods() {
-        const activeShippingMethods = await this.connection.getRepository(ShippingMethod).find({
+        const activeShippingMethods = await this.connection.rawConnection.getRepository(ShippingMethod).find({
             where: { deletedAt: null },
         });
         for (const method of activeShippingMethods) {
@@ -220,7 +220,7 @@ export class ShippingMethodService {
             const verifiedHandlerCode = this.ensureValidFulfillmentHandlerCode(method.code, handlerCode);
             if (handlerCode !== verifiedHandlerCode) {
                 method.fulfillmentHandlerCode = verifiedHandlerCode;
-                await this.connection.getRepository(ShippingMethod).save(method);
+                await this.connection.rawConnection.getRepository(ShippingMethod).save(method);
             }
         }
     }

+ 1 - 1
packages/core/src/service/services/user.service.ts

@@ -69,7 +69,7 @@ export class UserService {
     ): Promise<User | PasswordValidationError> {
         const user = new User();
         user.identifier = identifier;
-        const customerRole = await this.roleService.getCustomerRole();
+        const customerRole = await this.roleService.getCustomerRole(ctx);
         user.roles = [customerRole];
         const addNativeAuthResult = await this.addNativeAuthenticationMethod(ctx, user, identifier, password);
         if (isGraphQlErrorResult(addNativeAuthResult)) {