Просмотр исходного кода

Merge branch 'minor' into major

Michael Bromley 3 лет назад
Родитель
Сommit
d893648347

+ 1 - 1
docs/content/developer-guide/importing-product-data.md

@@ -35,7 +35,7 @@ Here's an explanation of each column:
 * `name`: The name of the product. Rows with an empty "name" are interpreted as variants of the preceeding product row.
 * `slug`: The product's slug. Can be omitted, in which case will be generated from the name.
 * `description`: The product description.
-* `assets`: One or more asset file names separated by the pipe (`|`) character. The files must be located on the local file system, and the path is interpreted as being relative to the [`importAssetsDir`]({{< relref "/docs/typescript-api/import-export/import-export-options" >}}#importassetsdir) as defined in the VendureConfig. The first asset will be set as the featuredAsset.
+* `assets`: One or more asset file names separated by the pipe (`|`) character. The files can be located on the local file system, in which case the path is interpreted as being relative to the [`importAssetsDir`]({{< relref "/docs/typescript-api/import-export/import-export-options" >}}#importassetsdir) as defined in the VendureConfig. Files can also be urls which will be fetched from a remote http/https url. If you need more control over how assets are imported, you can implement a custom [AssetImportStrategy]({{< relref "asset-import-strategy" >}}). The first asset will be set as the featuredAsset.
 * `facets`: One or more facets to apply to the product separated by the pipe (`|`) character. A facet has the format `<facet-name>:<facet-value>`.
 * `optionGroups`: OptionGroups define what variants make up the product. Applies only to products with more than one variant. 
 * `optionValues`: For each optionGroup defined, a corresponding value must be specified for each variant. Applies only to products with more than one variant.

+ 10 - 0
docs/content/developer-guide/logging.md

@@ -0,0 +1,10 @@
+---
+title: "Logging"
+showtoc: true
+---
+
+# Logging
+
+- log levels of default logger
+- To log database queries: set `dbConnectionOptions.logging: true` or eg `['query']` and the set the logLevel to Debug.
+- 

+ 73 - 25
packages/core/e2e/database-transactions.e2e-spec.ts

@@ -11,15 +11,14 @@ import {
     TransactionTestPlugin,
     TRIGGER_ATTEMPTED_READ_EMAIL,
     TRIGGER_ATTEMPTED_UPDATE_EMAIL,
+    TRIGGER_NO_OPERATION,
 } from './fixtures/test-plugins/transaction-test-plugin';
 
 type DBType = 'mysql' | 'postgres' | 'sqlite' | 'sqljs';
 
 const itIfDb = (dbs: DBType[]) => {
-    return dbs.includes(process.env.DB as DBType || 'sqljs')
-        ? it
-        : it.skip
-} 
+    return dbs.includes((process.env.DB as DBType) || 'sqljs') ? it : it.skip;
+};
 
 describe('Transaction infrastructure', () => {
     const { server, adminClient } = createTestEnvironment(
@@ -82,8 +81,8 @@ describe('Transaction infrastructure', () => {
             await adminClient.query(CREATE_N_ADMINS, {
                 emailAddress: 'testN-',
                 failFactor: 0.4,
-                n: 10
-            })
+                n: 10,
+            });
             fail('Should have thrown');
         } catch (e) {
             expect(e.message).toContain('Failed!');
@@ -155,26 +154,30 @@ describe('Transaction infrastructure', () => {
         expect(!!verify.users.find((u: any) => u.identifier === 'test5')).toBe(false);
     });
 
-    itIfDb(['postgres', 'mysql'])('failing mutation inside connection.withTransaction() wrapper with context and promise concurrent execution', async () => {
-        try {
-            await adminClient.query(CREATE_N_ADMINS2, {
-                emailAddress: 'testN-',
-                failFactor: 0.4,
-                n: 10
-            })
-            fail('Should have thrown');
-        } catch (e) {
-            expect(e.message)
-                .toMatch(/^Failed!|Query runner already released. Cannot run queries anymore.$/);
-        }
+    itIfDb(['postgres', 'mysql'])(
+        'failing mutation inside connection.withTransaction() wrapper with context and promise concurrent execution',
+        async () => {
+            try {
+                await adminClient.query(CREATE_N_ADMINS2, {
+                    emailAddress: 'testN-',
+                    failFactor: 0.4,
+                    n: 10,
+                });
+                fail('Should have thrown');
+            } catch (e) {
+                expect(e.message).toMatch(
+                    /^Failed!|Query runner already released. Cannot run queries anymore.$/,
+                );
+            }
 
-        const { verify } = await adminClient.query(VERIFY_TEST);
+            const { verify } = await adminClient.query(VERIFY_TEST);
 
-        expect(verify.admins.length).toBe(2);
-        expect(verify.users.length).toBe(3);
-        expect(!!verify.admins.find((a: any) => a.emailAddress.includes('testN'))).toBe(false);
-        expect(!!verify.users.find((u: any) => u.identifier.includes('testN'))).toBe(false);
-    });
+            expect(verify.admins.length).toBe(2);
+            expect(verify.users.length).toBe(3);
+            expect(!!verify.admins.find((a: any) => a.emailAddress.includes('testN'))).toBe(false);
+            expect(!!verify.users.find((u: any) => u.identifier.includes('testN'))).toBe(false);
+        },
+    );
 
     it('failing mutation inside connection.withTransaction() wrapper without request context', async () => {
         try {
@@ -196,6 +199,45 @@ describe('Transaction infrastructure', () => {
         expect(!!verify.users.find((u: any) => u.identifier === 'test5')).toBe(false);
     });
 
+    it('non-failing mutation inside connection.withTransaction() wrapper with failing nested transactions and request context', async () => {
+        await adminClient.query(CREATE_N_ADMINS3, {
+            emailAddress: 'testNestedTransactionsN-',
+            failFactor: 0.5,
+            n: 2,
+        });
+
+        const { verify } = await adminClient.query(VERIFY_TEST);
+
+        expect(verify.admins.length).toBe(3);
+        expect(verify.users.length).toBe(4);
+        expect(
+            verify.admins.filter((a: any) => a.emailAddress.includes('testNestedTransactionsN')),
+        ).toHaveLength(1);
+        expect(
+            verify.users.filter((u: any) => u.identifier.includes('testNestedTransactionsN')),
+        ).toHaveLength(1);
+    });
+
+    it('event do not publish after transaction rollback', async () => {
+        TransactionTestPlugin.reset();
+        try {
+            await adminClient.query(CREATE_N_ADMINS, {
+                emailAddress: TRIGGER_NO_OPERATION,
+                failFactor: 0.5,
+                n: 2,
+            });
+            fail('Should have thrown');
+        } catch (e) {
+            expect(e.message).toContain('Failed!');
+        }
+
+        // Wait a bit to see an events in handlers
+        await new Promise(resolve => setTimeout(resolve, 100));
+
+        expect(TransactionTestPlugin.callHandler).not.toHaveBeenCalled();
+        expect(TransactionTestPlugin.errorHandler).not.toHaveBeenCalled();
+    });
+
     // Testing https://github.com/vendure-ecommerce/vendure/issues/520
     it('passing transaction via EventBus', async () => {
         TransactionTestPlugin.reset();
@@ -289,6 +331,12 @@ const CREATE_N_ADMINS2 = gql`
     }
 `;
 
+const CREATE_N_ADMINS3 = gql`
+    mutation CreateNTestAdmins3($emailAddress: String!, $failFactor: Float!, $n: Int!) {
+        createNTestAdministrators3(emailAddress: $emailAddress, failFactor: $failFactor, n: $n)
+    }
+`;
+
 const VERIFY_TEST = gql`
     query VerifyTest {
         verify {
@@ -302,4 +350,4 @@ const VERIFY_TEST = gql`
             }
         }
     }
-`;
+`;

BIN
packages/core/e2e/fixtures/assets/guitar.png


+ 2 - 0
packages/core/e2e/fixtures/e2e-product-import-asset-urls.csv

@@ -0,0 +1,2 @@
+name  ,slug  ,description,assets                          ,facets,optionGroups,optionValues,sku    ,price ,taxCategory,stockOnHand,trackInventory,variantAssets,variantFacets,product:pageType,variant:weight,product:owner,product:keywords,product:localName
+Guitar,guitar,A guitar.  ,http://localhost:3456/guitar.png,      ,            ,            ,GUIT001,133.99,standard   ,0          ,false         ,             ,             ,default         ,100           ,             ,music|instrument,

+ 98 - 38
packages/core/e2e/fixtures/test-plugins/transaction-test-plugin.ts

@@ -23,6 +23,7 @@ export class TestEvent extends VendureEvent {
     }
 }
 
+export const TRIGGER_NO_OPERATION = 'trigger-no-operation';
 export const TRIGGER_ATTEMPTED_UPDATE_EMAIL = 'trigger-attempted-update-email';
 export const TRIGGER_ATTEMPTED_READ_EMAIL = 'trigger-attempted-read-email';
 
@@ -48,7 +49,7 @@ class TestUserService {
         );
 
         return this.connection.getRepository(ctx, User).findOne({
-            where: { identifier }
+            where: { identifier },
         });
     }
 }
@@ -143,21 +144,30 @@ class TestResolver {
     async createNTestAdministrators(@Ctx() ctx: RequestContext, @Args() args: any) {
         let error: any;
 
-        const promises: Promise<any>[] = []
+        const promises: Promise<any>[] = [];
         for (let i = 0; i < args.n; i++) {
             promises.push(
-                new Promise(resolve => setTimeout(resolve, i * 10)).then(() =>
-                    this.testAdminService.createAdministrator(ctx, `${args.emailAddress}${i}`, i < args.n * args.failFactor)
-                )
-            )
+                new Promise(resolve => setTimeout(resolve, i * 10))
+                    .then(() =>
+                        this.testAdminService.createAdministrator(
+                            ctx,
+                            `${args.emailAddress}${i}`,
+                            i < args.n * args.failFactor,
+                        ),
+                    )
+                    .then(admin => {
+                        this.eventBus.publish(new TestEvent(ctx, admin));
+                        return admin;
+                    }),
+            );
         }
 
         const result = await Promise.all(promises).catch((e: any) => {
             error = e;
-        })
+        });
+
+        await this.allSettled(promises);
 
-        await this.allSettled(promises)
-    
         if (error) {
             throw error;
         }
@@ -169,23 +179,29 @@ class TestResolver {
     async createNTestAdministrators2(@Ctx() ctx: RequestContext, @Args() args: any) {
         let error: any;
 
-        const promises: Promise<any>[] = []
-        const result = await this.connection.withTransaction(ctx, _ctx => {
-            for (let i = 0; i < args.n; i++) {
-                promises.push(
-                    new Promise(resolve => setTimeout(resolve, i * 10)).then(() =>
-                        this.testAdminService.createAdministrator(_ctx, `${args.emailAddress}${i}`, i < args.n * args.failFactor)
-                    )
-                )
-            }
+        const promises: Promise<any>[] = [];
+        const result = await this.connection
+            .withTransaction(ctx, _ctx => {
+                for (let i = 0; i < args.n; i++) {
+                    promises.push(
+                        new Promise(resolve => setTimeout(resolve, i * 10)).then(() =>
+                            this.testAdminService.createAdministrator(
+                                _ctx,
+                                `${args.emailAddress}${i}`,
+                                i < args.n * args.failFactor,
+                            ),
+                        ),
+                    );
+                }
 
-            return Promise.all(promises);
-        }).catch((e: any) => {
-            error = e;
-        })
+                return Promise.all(promises);
+            })
+            .catch((e: any) => {
+                error = e;
+            });
+
+        await this.allSettled(promises);
 
-        await this.allSettled(promises)
-    
         if (error) {
             throw error;
         }
@@ -193,6 +209,38 @@ class TestResolver {
         return result;
     }
 
+    @Mutation()
+    @Transaction()
+    async createNTestAdministrators3(@Ctx() ctx: RequestContext, @Args() args: any) {
+        const result: any[] = [];
+
+        const admin = await this.testAdminService.createAdministrator(
+            ctx,
+            `${args.emailAddress}${args.n}`,
+            args.failFactor >= 1,
+        );
+
+        result.push(admin);
+
+        if (args.n > 0) {
+            try {
+                const admins = await this.connection.withTransaction(ctx, _ctx =>
+                    this.createNTestAdministrators3(_ctx, {
+                        ...args,
+                        n: args.n - 1,
+                        failFactor: (args.n * args.failFactor) / (args.n - 1),
+                    }),
+                );
+
+                result.push(...admins);
+            } catch (e) {
+                /* */
+            }
+        }
+
+        return result;
+    }
+
     @Query()
     async verify() {
         const admins = await this.connection.getRepository(Administrator).find();
@@ -205,20 +253,22 @@ class TestResolver {
 
     // Promise.allSettled polyfill
     // Same as Promise.all but waits until all promises will be fulfilled or rejected.
-    private allSettled<T>(promises: Promise<T>[]): Promise<({status: 'fulfilled', value: T} | { status: 'rejected', reason: any})[]> {
+    private allSettled<T>(
+        promises: Promise<T>[],
+    ): Promise<({ status: 'fulfilled'; value: T } | { status: 'rejected'; reason: any })[]> {
         return Promise.all(
             promises.map((promise, i) =>
-              promise
-                .then(value => ({
-                  status: "fulfilled" as const,
-                  value,
-                }))
-                .catch(reason => ({
-                  status: "rejected" as const,
-                  reason,
-                }))
-            )
-          );
+                promise
+                    .then(value => ({
+                        status: 'fulfilled' as const,
+                        value,
+                    }))
+                    .catch(reason => ({
+                        status: 'rejected' as const,
+                        reason,
+                    })),
+            ),
+        );
     }
 }
 
@@ -239,6 +289,7 @@ class TestResolver {
                 ): Administrator
                 createNTestAdministrators(emailAddress: String!, failFactor: Float!, n: Int!): JSON
                 createNTestAdministrators2(emailAddress: String!, failFactor: Float!, n: Int!): JSON
+                createNTestAdministrators3(emailAddress: String!, failFactor: Float!, n: Int!): JSON
             }
             type VerifyResult {
                 admins: [Administrator!]!
@@ -253,6 +304,7 @@ class TestResolver {
 })
 export class TransactionTestPlugin implements OnApplicationBootstrap {
     private subscription: Subscription;
+    static callHandler = jest.fn();
     static errorHandler = jest.fn();
     static eventHandlerComplete$ = new ReplaySubject(1);
 
@@ -260,6 +312,7 @@ export class TransactionTestPlugin implements OnApplicationBootstrap {
 
     static reset() {
         this.eventHandlerComplete$ = new ReplaySubject(1);
+        this.callHandler.mockClear();
         this.errorHandler.mockClear();
     }
 
@@ -268,7 +321,13 @@ export class TransactionTestPlugin implements OnApplicationBootstrap {
         // when used in an Event subscription
         this.subscription = this.eventBus.ofType(TestEvent).subscribe(async event => {
             const { ctx, administrator } = event;
-            if (administrator.emailAddress === TRIGGER_ATTEMPTED_UPDATE_EMAIL) {
+
+            if (administrator.emailAddress?.includes(TRIGGER_NO_OPERATION)) {
+                TransactionTestPlugin.callHandler();
+                TransactionTestPlugin.eventHandlerComplete$.complete();
+            }
+            if (administrator.emailAddress?.includes(TRIGGER_ATTEMPTED_UPDATE_EMAIL)) {
+                TransactionTestPlugin.callHandler();
                 const adminRepository = this.connection.getRepository(ctx, Administrator);
                 await new Promise(resolve => setTimeout(resolve, 50));
                 administrator.lastName = 'modified';
@@ -280,7 +339,8 @@ export class TransactionTestPlugin implements OnApplicationBootstrap {
                     TransactionTestPlugin.eventHandlerComplete$.complete();
                 }
             }
-            if (administrator.emailAddress === TRIGGER_ATTEMPTED_READ_EMAIL) {
+            if (administrator.emailAddress?.includes(TRIGGER_ATTEMPTED_READ_EMAIL)) {
+                TransactionTestPlugin.callHandler();
                 // note the ctx is not passed here, so we are not inside the ongoing transaction
                 const adminRepository = this.connection.getRepository(Administrator);
                 try {

+ 95 - 0
packages/core/e2e/import.e2e-spec.ts

@@ -1,7 +1,9 @@
 import { omit } from '@vendure/common/lib/omit';
 import { User } from '@vendure/core';
 import { createTestEnvironment } from '@vendure/testing';
+import * as fs from 'fs';
 import gql from 'graphql-tag';
+import http from 'http';
 import path from 'path';
 
 import { initialData } from '../../../e2e-common/e2e-initial-data';
@@ -407,4 +409,97 @@ describe('Import resolver', () => {
         // Import localeString custom fields
         expect(paperStretcher.customFields.localName).toEqual('纸张拉伸器');
     }, 20000);
+
+    describe('asset urls', () => {
+        let staticServer: http.Server;
+
+        beforeAll(() => {
+            // Set up minimal static file server
+            staticServer = http
+                .createServer((req, res) => {
+                    const filePath = path.join(__dirname, 'fixtures/assets', req?.url ?? '');
+                    fs.readFile(filePath, (err, data) => {
+                        if (err) {
+                            res.writeHead(404);
+                            res.end(JSON.stringify(err));
+                            return;
+                        }
+                        res.writeHead(200);
+                        res.end(data);
+                    });
+                })
+                .listen(3456);
+        });
+
+        afterAll(() => {
+            if (staticServer) {
+                return new Promise<void>((resolve, reject) => {
+                    staticServer.close(err => {
+                        if (err) {
+                            reject(err);
+                        } else {
+                            resolve();
+                        }
+                    });
+                });
+            }
+        });
+
+        it('imports assets with url paths', async () => {
+            const timeout = process.env.CI ? 2000 : 1000;
+            await new Promise(resolve => {
+                setTimeout(resolve, timeout);
+            });
+
+            const csvFile = path.join(__dirname, 'fixtures', 'e2e-product-import-asset-urls.csv');
+            const result = await adminClient.fileUploadMutation({
+                mutation: gql`
+                    mutation ImportProducts($csvFile: Upload!) {
+                        importProducts(csvFile: $csvFile) {
+                            imported
+                            processed
+                            errors
+                        }
+                    }
+                `,
+                filePaths: [csvFile],
+                mapVariables: () => ({ csvFile: null }),
+            });
+
+            expect(result.importProducts.errors).toEqual([]);
+            expect(result.importProducts.imported).toBe(1);
+            expect(result.importProducts.processed).toBe(1);
+
+            const productResult = await adminClient.query(
+                gql`
+                    query GetProducts($options: ProductListOptions) {
+                        products(options: $options) {
+                            totalItems
+                            items {
+                                id
+                                name
+                                featuredAsset {
+                                    id
+                                    name
+                                    preview
+                                }
+                            }
+                        }
+                    }
+                `,
+                {
+                    options: {
+                        filter: {
+                            name: { contains: 'guitar' },
+                        },
+                    },
+                },
+            );
+
+            expect(productResult.products.items.length).toBe(1);
+            expect(productResult.products.items[0].featuredAsset.preview).toBe(
+                'test-url/test-assets/guitar__preview.png',
+            );
+        });
+    });
 });

+ 1 - 1
packages/core/package.json

@@ -77,7 +77,7 @@
     "progress": "^2.0.3",
     "reflect-metadata": "^0.1.13",
     "rxjs": "^7.5.4",
-    "typeorm": "0.2.41"
+    "typeorm": "0.2.45"
   },
   "devDependencies": {
     "@types/bcrypt": "^3.0.0",

+ 25 - 0
packages/core/src/config/asset-import-strategy/asset-import-strategy.ts

@@ -0,0 +1,25 @@
+import { Readable } from 'stream';
+
+import { InjectableStrategy } from '../../common/types/injectable-strategy';
+
+/**
+ * @description
+ * The AssetImportStrategy determines how asset files get imported based on the path given in the
+ * import CSV or via the {@link AssetImporter} `getAssets()` method.
+ *
+ * The {@link DefaultAssetImportStrategy} is able to load files from either the local filesystem
+ * or from a remote URL.
+ *
+ * A custom strategy could be created which could e.g. get the asset file from an S3 bucket.
+ *
+ * @since 1.7.0
+ * @docsCategory import-export
+ */
+export interface AssetImportStrategy extends InjectableStrategy {
+    /**
+     * @description
+     * Given an asset path, this method should return a Stream of file data. This could
+     * e.g. be read from a file system or fetch from a remote location.
+     */
+    getStreamFromPath(assetPath: string): Readable | Promise<Readable>;
+}

+ 106 - 0
packages/core/src/config/asset-import-strategy/default-asset-import-strategy.ts

@@ -0,0 +1,106 @@
+import fs from 'fs-extra';
+import http from 'http';
+import https from 'https';
+import path from 'path';
+import { from } from 'rxjs';
+import { delay, retryWhen, take, tap } from 'rxjs/operators';
+import { Readable } from 'stream';
+import { URL } from 'url';
+
+import { Injector } from '../../common/index';
+import { ConfigService } from '../config.service';
+import { Logger } from '../logger/vendure-logger';
+
+import { AssetImportStrategy } from './asset-import-strategy';
+
+function fetchUrl(urlString: string): Promise<Readable> {
+    return new Promise((resolve, reject) => {
+        const url = new URL(urlString);
+        const get = url.protocol.startsWith('https') ? https.get : http.get;
+        get(
+            url,
+            {
+                timeout: 5000,
+            },
+            res => {
+                const { statusCode } = res;
+                if (statusCode !== 200) {
+                    Logger.error(`Failed to fetch "${urlString.substr(0, 100)}", statusCode: ${statusCode}`);
+                    reject(new Error(`Request failed. Status code: ${statusCode}`));
+                } else {
+                    resolve(res);
+                }
+            },
+        );
+    });
+}
+
+/**
+ * @description
+ * The DefaultAssetImportStrategy is able to import paths from the local filesystem (taking into account the
+ * `importExportOptions.importAssetsDir` setting) as well as remote http/https urls.
+ *
+ * @since 1.7.0
+ * @docsCategory import-export
+ */
+export class DefaultAssetImportStrategy implements AssetImportStrategy {
+    private configService: ConfigService;
+
+    constructor(
+        private options?: {
+            retryDelayMs: number;
+            retryCount: number;
+        },
+    ) {}
+
+    init(injector: Injector) {
+        this.configService = injector.get(ConfigService);
+    }
+
+    getStreamFromPath(assetPath: string) {
+        if (/^https?:\/\//.test(assetPath)) {
+            return this.getStreamFromUrl(assetPath);
+        } else {
+            return this.getStreamFromLocalFile(assetPath);
+        }
+    }
+
+    private getStreamFromUrl(assetUrl: string): Promise<Readable> {
+        const { retryCount, retryDelayMs } = this.options ?? {};
+        return from(fetchUrl(assetUrl))
+            .pipe(
+                retryWhen(errors =>
+                    errors.pipe(
+                        tap(value => {
+                            Logger.verbose(value);
+                            Logger.verbose(`DefaultAssetImportStrategy: retrying fetchUrl for ${assetUrl}`);
+                        }),
+                        delay(retryDelayMs ?? 200),
+                        take(retryCount ?? 3),
+                    ),
+                ),
+            )
+            .toPromise();
+    }
+
+    private getStreamFromLocalFile(assetPath: string): Readable {
+        const { importAssetsDir } = this.configService.importExportOptions;
+        const filename = path.join(importAssetsDir, assetPath);
+
+        if (fs.existsSync(filename)) {
+            const fileStat = fs.statSync(filename);
+            if (fileStat.isFile()) {
+                try {
+                    const stream = fs.createReadStream(filename);
+                    return stream;
+                } catch (err) {
+                    throw err;
+                }
+            } else {
+                throw new Error(`Could not find file "${filename}"`);
+            }
+        } else {
+            throw new Error(`File "${filename}" does not exist`);
+        }
+    }
+}

+ 2 - 0
packages/core/src/config/config.module.ts

@@ -83,6 +83,7 @@ export class ConfigModule implements OnApplicationBootstrap, OnApplicationShutdo
         const { entityIdStrategy: entityIdStrategyDeprecated } = this.configService;
         const { entityIdStrategy } = this.configService.entityOptions;
         const { healthChecks } = this.configService.systemOptions;
+        const { assetImportStrategy } = this.configService.importExportOptions;
         return [
             ...adminAuthenticationStrategy,
             ...shopAuthenticationStrategy,
@@ -109,6 +110,7 @@ export class ConfigModule implements OnApplicationBootstrap, OnApplicationShutdo
             stockAllocationStrategy,
             stockDisplayStrategy,
             ...healthChecks,
+            assetImportStrategy,
         ];
     }
 

+ 2 - 0
packages/core/src/config/default-config.ts

@@ -9,6 +9,7 @@ import { TypeORMHealthCheckStrategy } from '../health-check/typeorm-health-check
 import { InMemoryJobQueueStrategy } from '../job-queue/in-memory-job-queue-strategy';
 import { InMemoryJobBufferStorageStrategy } from '../job-queue/job-buffer/in-memory-job-buffer-storage-strategy';
 
+import { DefaultAssetImportStrategy } from './asset-import-strategy/default-asset-import-strategy';
 import { DefaultAssetNamingStrategy } from './asset-naming-strategy/default-asset-naming-strategy';
 import { NoAssetPreviewStrategy } from './asset-preview-strategy/no-asset-preview-strategy';
 import { NoAssetStorageStrategy } from './asset-storage-strategy/no-asset-storage-strategy';
@@ -148,6 +149,7 @@ export const defaultConfig: RuntimeVendureConfig = {
     },
     importExportOptions: {
         importAssetsDir: __dirname,
+        assetImportStrategy: new DefaultAssetImportStrategy(),
     },
     jobQueueOptions: {
         jobQueueStrategy: new InMemoryJobQueueStrategy(),

+ 1 - 0
packages/core/src/config/index.ts

@@ -1,3 +1,4 @@
+export * from './asset-import-strategy/asset-import-strategy';
 export * from './asset-naming-strategy/asset-naming-strategy';
 export * from './asset-naming-strategy/default-asset-naming-strategy';
 export * from './asset-preview-strategy/asset-preview-strategy';

+ 9 - 0
packages/core/src/config/vendure-config.ts

@@ -9,6 +9,7 @@ import { Middleware } from '../common';
 import { PermissionDefinition } from '../common/permission-definition';
 import { JobBufferStorageStrategy } from '../job-queue/job-buffer/job-buffer-storage-strategy';
 
+import { AssetImportStrategy } from './asset-import-strategy/asset-import-strategy';
 import { AssetNamingStrategy } from './asset-naming-strategy/asset-naming-strategy';
 import { AssetPreviewStrategy } from './asset-preview-strategy/asset-preview-strategy';
 import { AssetStorageStrategy } from './asset-storage-strategy/asset-storage-strategy';
@@ -773,6 +774,14 @@ export interface ImportExportOptions {
      * @default __dirname
      */
     importAssetsDir?: string;
+    /**
+     * @description
+     * This strategy determines how asset files get imported based on the path given in the
+     * import CSV or via the {@link AssetImporter} `getAssets()` method.
+     *
+     * @since 1.7.0
+     */
+    assetImportStrategy?: AssetImportStrategy;
 }
 
 /**

+ 51 - 10
packages/core/src/connection/transaction-subscriber.ts

@@ -1,14 +1,28 @@
 import { Injectable } from '@nestjs/common';
 import { InjectConnection } from '@nestjs/typeorm';
-import { lastValueFrom, merge, Subject } from 'rxjs';
-import { delay, filter, map, take } from 'rxjs/operators';
+import { lastValueFrom, merge, ObservableInput, Subject } from 'rxjs';
+import { delay, filter, map, take, tap } from 'rxjs/operators';
 import { Connection, EntitySubscriberInterface } from 'typeorm';
 import { EntityManager } from 'typeorm/entity-manager/EntityManager';
 import { QueryRunner } from 'typeorm/query-runner/QueryRunner';
 import { TransactionCommitEvent } from 'typeorm/subscriber/event/TransactionCommitEvent';
 import { TransactionRollbackEvent } from 'typeorm/subscriber/event/TransactionRollbackEvent';
 
+/**
+ * This error should be thrown by an event subscription if types do not match
+ *
+ * @internal
+ */
+export class TransactionSubscriberError extends Error {}
+
+export type TransactionSubscriberEventType = 'commit' | 'rollback';
+
 export interface TransactionSubscriberEvent {
+    /**
+     * Event type. Either commit or rollback.
+     */
+    type: TransactionSubscriberEventType;
+
     /**
      * Connection used in the event.
      */
@@ -34,8 +48,7 @@ export interface TransactionSubscriberEvent {
  */
 @Injectable()
 export class TransactionSubscriber implements EntitySubscriberInterface {
-    private commit$ = new Subject<TransactionSubscriberEvent>();
-    private rollback$ = new Subject<TransactionSubscriberEvent>();
+    private subject$ = new Subject<TransactionSubscriberEvent>();
 
     constructor(@InjectConnection() private connection: Connection) {
         if (!connection.subscribers.find(subscriber => subscriber.constructor === TransactionSubscriber)) {
@@ -44,19 +57,47 @@ export class TransactionSubscriber implements EntitySubscriberInterface {
     }
 
     afterTransactionCommit(event: TransactionCommitEvent) {
-        this.commit$.next(event);
+        this.subject$.next({
+            type: 'commit',
+            ...event,
+        });
     }
 
     afterTransactionRollback(event: TransactionRollbackEvent) {
-        this.rollback$.next(event);
+        this.subject$.next({
+            type: 'rollback',
+            ...event,
+        });
+    }
+
+    awaitCommit(queryRunner: QueryRunner): Promise<QueryRunner> {
+        return this.awaitTransactionEvent(queryRunner, 'commit');
+    }
+
+    awaitRollback(queryRunner: QueryRunner): Promise<QueryRunner> {
+        return this.awaitTransactionEvent(queryRunner, 'rollback');
     }
 
     awaitRelease(queryRunner: QueryRunner): Promise<QueryRunner> {
+        return this.awaitTransactionEvent(queryRunner);
+    }
+
+    private awaitTransactionEvent(
+        queryRunner: QueryRunner,
+        type?: TransactionSubscriberEventType,
+    ): Promise<QueryRunner> {
         if (queryRunner.isTransactionActive) {
-            return lastValueFrom(
-                merge(this.commit$, this.rollback$).pipe(
-                    filter(event => event.queryRunner === queryRunner),
+            return lastValueFrom(this.subject$
+                .pipe(
+                    filter(
+                        event => !event.queryRunner.isTransactionActive && event.queryRunner === queryRunner,
+                    ),
                     take(1),
+                    tap(event => {
+                        if (type && event.type !== type) {
+                            throw new TransactionSubscriberError(`Unexpected event type: ${event.type}. Expected ${type}.`);
+                        }
+                    }),
                     map(event => event.queryRunner),
                     // This `delay(0)` call appears to be necessary with the upgrade to TypeORM
                     // v0.2.41, otherwise an active queryRunner can still get picked up in an event
@@ -65,7 +106,7 @@ export class TransactionSubscriber implements EntitySubscriberInterface {
                     // in the database-transactions.e2e-spec.ts suite, and a bunch of errors
                     // in the default-search-plugin.e2e-spec.ts suite when using sqljs.
                     delay(0),
-                ),
+                )
             );
         } else {
             return Promise.resolve(queryRunner);

+ 9 - 10
packages/core/src/connection/transaction-wrapper.ts

@@ -1,6 +1,6 @@
 import { from, lastValueFrom, Observable, of } from 'rxjs';
 import { retryWhen, take, tap } from 'rxjs/operators';
-import { Connection, QueryRunner } from 'typeorm';
+import { Connection, EntityManager, QueryRunner } from 'typeorm';
 import { TransactionAlreadyStartedError } from 'typeorm/error/TransactionAlreadyStartedError';
 
 import { RequestContext } from '../api/common/request-context';
@@ -32,14 +32,9 @@ export class TransactionWrapper {
         // Copy to make sure original context will remain valid after transaction completes
         const ctx = originalCtx.copy();
 
-        const queryRunnerExists = !!(ctx as any)[TRANSACTION_MANAGER_KEY];
-        if (queryRunnerExists) {
-            // If a QueryRunner already exists on the RequestContext, there must be an existing
-            // outer transaction in progress. In that case, we just execute the work function
-            // as usual without needing to further wrap in a transaction.
-            return lastValueFrom(from(work(ctx)));
-        }
-        const queryRunner = connection.createQueryRunner();
+        const entityManager: EntityManager | undefined = (ctx as any)[TRANSACTION_MANAGER_KEY];
+        const queryRunner = entityManager?.queryRunner || connection.createQueryRunner();
+
         if (mode === 'auto') {
             await this.startTransaction(queryRunner);
         }
@@ -71,7 +66,11 @@ export class TransactionWrapper {
             }
             throw error;
         } finally {
-            if (queryRunner?.isReleased === false) {
+            if (!queryRunner.isTransactionActive
+                && queryRunner.isReleased === false) {
+                // There is a check for an active transaction
+                // because this could be a nested transaction (savepoint).
+
                 await queryRunner.release();
             }
         }

+ 13 - 19
packages/core/src/data-import/providers/asset-importer/asset-importer.ts

@@ -1,8 +1,8 @@
 import { Injectable } from '@nestjs/common';
-import fs from 'fs-extra';
 import path from 'path';
 
 import { RequestContext } from '../../../api/index';
+import { isGraphQlErrorResult } from '../../../common/index';
 import { ConfigService } from '../../../config/config.service';
 import { Asset } from '../../../entity/asset/asset.entity';
 import { AssetService } from '../../../service/services/asset.service';
@@ -33,32 +33,26 @@ export class AssetImporter {
     ): Promise<{ assets: Asset[]; errors: string[] }> {
         const assets: Asset[] = [];
         const errors: string[] = [];
-        const { importAssetsDir } = this.configService.importExportOptions;
+        const { assetImportStrategy } = this.configService.importExportOptions;
         const uniqueAssetPaths = new Set(assetPaths);
         for (const assetPath of uniqueAssetPaths.values()) {
             const cachedAsset = this.assetMap.get(assetPath);
             if (cachedAsset) {
                 assets.push(cachedAsset);
             } else {
-                const filename = path.join(importAssetsDir, assetPath);
-
-                if (fs.existsSync(filename)) {
-                    const fileStat = fs.statSync(filename);
-                    if (fileStat.isFile()) {
-                        try {
-                            const stream = fs.createReadStream(filename);
-                            const asset = (await this.assetService.createFromFileStream(
-                                stream,
-                                ctx,
-                            )) as Asset;
-                            this.assetMap.set(assetPath, asset);
-                            assets.push(asset);
-                        } catch (err: any) {
-                            errors.push(err.toString());
+                try {
+                    const stream = await assetImportStrategy.getStreamFromPath(assetPath);
+                    if (stream) {
+                        const asset = await this.assetService.createFromFileStream(stream, assetPath, ctx);
+                        if (isGraphQlErrorResult(asset)) {
+                            errors.push(asset.message);
+                        } else {
+                            this.assetMap.set(assetPath, asset as Asset);
+                            assets.push(asset as Asset);
                         }
                     }
-                } else {
-                    errors.push(`File "${filename}" does not exist`);
+                } catch (e: any) {
+                    errors.push(e.message);
                 }
             }
         }

+ 17 - 4
packages/core/src/event-bus/event-bus.ts

@@ -3,10 +3,11 @@ import { Type } from '@vendure/common/lib/shared-types';
 import { Observable, Subject } from 'rxjs';
 import { filter, mergeMap, takeUntil } from 'rxjs/operators';
 import { EntityManager } from 'typeorm';
+import { notNullOrUndefined } from '../../../common/lib/shared-utils';
 
 import { RequestContext } from '../api/common/request-context';
 import { TRANSACTION_MANAGER_KEY } from '../common/constants';
-import { TransactionSubscriber } from '../connection/transaction-subscriber';
+import { TransactionSubscriber, TransactionSubscriberError } from '../connection/transaction-subscriber';
 
 import { VendureEvent } from './vendure-event';
 
@@ -82,6 +83,7 @@ export class EventBus implements OnModuleDestroy {
             takeUntil(this.destroy$),
             filter(e => (e as any).constructor === type),
             mergeMap(event => this.awaitActiveTransactions(event)),
+            filter(notNullOrUndefined)
         ) as Observable<T>;
     }
 
@@ -109,7 +111,7 @@ export class EventBus implements OnModuleDestroy {
      * * https://github.com/vendure-ecommerce/vendure/issues/520
      * * https://github.com/vendure-ecommerce/vendure/issues/1107
      */
-    private async awaitActiveTransactions<T extends VendureEvent>(event: T): Promise<T> {
+    private async awaitActiveTransactions<T extends VendureEvent>(event: T): Promise<T | undefined> {
         const entry = Object.entries(event).find(([_, value]) => value instanceof RequestContext);
 
         if (!entry) {
@@ -123,7 +125,9 @@ export class EventBus implements OnModuleDestroy {
             return event;
         }
 
-        return this.transactionSubscriber.awaitRelease(transactionManager.queryRunner).then(() => {
+        try {
+            await this.transactionSubscriber.awaitCommit(transactionManager.queryRunner);
+
             // Copy context and remove transaction manager
             // This will prevent queries to released query runner
             const newContext = ctx.copy();
@@ -133,6 +137,15 @@ export class EventBus implements OnModuleDestroy {
             (event as any)[key] = newContext
 
             return event;
-        });
+        } catch (e: any) {
+            if (e instanceof TransactionSubscriberError) {
+                // Expected commit, but rollback or something else happened.
+                // This is still reliable behavior, return undefined
+                // as event should not be exposed from this transaction
+                return;
+            }
+
+            throw e;
+        }
     }
 }

+ 25 - 4
packages/core/src/job-queue/subscribable-job.ts

@@ -24,6 +24,28 @@ export type JobUpdate<T extends JobData<T>> = Pick<
     'id' | 'state' | 'progress' | 'result' | 'error' | 'data'
 >;
 
+/**
+ * @description
+ * Job update options, that you can specify by calling {@link SubscribableJob.updates updates()} method.
+ * 
+ * @docsCategory JobQueue
+ * @docsPage types
+ */
+export type JobUpdateOptions = { 
+    /**
+     * Polling interval. Defaults to 200ms
+     */
+    pollInterval?: number; 
+    /**
+     * Polling timeout in milliseconds. Defaults to 1 hour
+     */
+    timeoutMs?: number;
+    /**
+     * Observable sequence will end with an error if true. Default to false
+     */
+    errorOnFail?: boolean; 
+};
+
 /**
  * @description
  * This is a type of Job object that allows you to subscribe to updates to the Job. It is returned
@@ -52,10 +74,9 @@ export class SubscribableJob<T extends JobData<T> = any> extends Job<T> {
      * Returns an Observable stream of updates to the Job. Works by polling the current JobQueueStrategy's `findOne()` method
      * to obtain updates. If this updates are not subscribed to, then no polling occurs.
      *
-     * The polling interval defaults to 200ms, but can be configured by passing in an options argument. Polling will also timeout
-     * after 1 hour, but this timeout can also be configured by passing the `timeoutMs` option.
+     * Polling interval, timeout and other options may be configured with an options arguments {@link JobUpdateOptions}.
      */
-    updates(options?: { pollInterval?: number; timeoutMs?: number }): Observable<JobUpdate<T>> {
+    updates(options?: JobUpdateOptions): Observable<JobUpdate<T>> {
         const pollInterval = Math.max(50, options?.pollInterval ?? 200);
         const timeoutMs = Math.max(pollInterval, options?.timeoutMs ?? ms('1h'));
         const strategy = this.jobQueueStrategy;
@@ -90,7 +111,7 @@ export class SubscribableJob<T extends JobData<T> = any> extends Job<T> {
                     true,
                 ),
                 tap(job => {
-                    if (job.state === JobState.FAILED) {
+                    if (job.state === JobState.FAILED && (options?.errorOnFail ?? true)) {
                         throw new Error(job.error);
                     }
                 }),

+ 25 - 4
packages/core/src/service/services/asset.service.ts

@@ -17,6 +17,7 @@ import { notNullOrUndefined } from '@vendure/common/lib/shared-utils';
 import { unique } from '@vendure/common/lib/unique';
 import { ReadStream as FSReadStream } from 'fs';
 import { ReadStream } from 'fs-extra';
+import { IncomingMessage } from 'http';
 import mime from 'mime-types';
 import path from 'path';
 import { Readable, Stream } from 'stream';
@@ -433,26 +434,46 @@ export class AssetService {
      * Create an Asset from a file stream, for example to create an Asset during data import.
      */
     async createFromFileStream(stream: ReadStream, ctx?: RequestContext): Promise<CreateAssetResult>;
-    async createFromFileStream(stream: Readable, filePath: string): Promise<CreateAssetResult>;
+    async createFromFileStream(
+        stream: Readable,
+        filePath: string,
+        ctx?: RequestContext,
+    ): Promise<CreateAssetResult>;
     async createFromFileStream(
         stream: ReadStream | Readable,
         maybeFilePathOrCtx?: string | RequestContext,
+        maybeCtx?: RequestContext,
     ): Promise<CreateAssetResult> {
+        const { assetImportStrategy } = this.configService.importExportOptions;
         const filePathFromArgs =
             maybeFilePathOrCtx instanceof RequestContext ? undefined : maybeFilePathOrCtx;
         const filePath =
             stream instanceof ReadStream || stream instanceof FSReadStream ? stream.path : filePathFromArgs;
         if (typeof filePath === 'string') {
-            const filename = path.basename(filePath);
-            const mimetype = mime.lookup(filename) || 'application/octet-stream';
+            const filename = path.basename(filePath).split('?')[0];
+            const mimetype = this.getMimeType(stream, filename);
             const ctx =
-                maybeFilePathOrCtx instanceof RequestContext ? maybeFilePathOrCtx : RequestContext.empty();
+                maybeFilePathOrCtx instanceof RequestContext
+                    ? maybeFilePathOrCtx
+                    : maybeCtx instanceof RequestContext
+                    ? maybeCtx
+                    : RequestContext.empty();
             return this.createAssetInternal(ctx, stream, filename, mimetype);
         } else {
             throw new InternalServerError(`error.path-should-be-a-string-got-buffer`);
         }
     }
 
+    private getMimeType(stream: Readable, filename: string): string {
+        if (stream instanceof IncomingMessage) {
+            const contentType = stream.headers['content-type'];
+            if (contentType) {
+                return contentType;
+            }
+        }
+        return mime.lookup(filename) || 'application/octet-stream';
+    }
+
     /**
      * @description
      * Unconditionally delete given assets.

+ 5 - 4
yarn.lock

@@ -17732,10 +17732,10 @@ typedarray@^0.0.6:
   resolved "https://registry.npmjs.org/typedarray/-/typedarray-0.0.6.tgz#867ac74e3864187b1d3d47d996a78ec5c8830777"
   integrity sha1-hnrHTjhkGHsdPUfZlqeOxciDB3c=
 
-typeorm@0.2.41:
-  version "0.2.41"
-  resolved "https://registry.npmjs.org/typeorm/-/typeorm-0.2.41.tgz#88758101ac158dc0a0a903d70eaacea2974281cc"
-  integrity sha512-/d8CLJJxKPgsnrZWiMyPI0rz2MFZnBQrnQ5XP3Vu3mswv2WPexb58QM6BEtmRmlTMYN5KFWUz8SKluze+wS9xw==
+typeorm@0.2.45:
+  version "0.2.45"
+  resolved "https://registry.yarnpkg.com/typeorm/-/typeorm-0.2.45.tgz#e5bbb3af822dc4646bad96cfa48cd22fa4687cea"
+  integrity sha512-c0rCO8VMJ3ER7JQ73xfk0zDnVv0WDjpsP6Q1m6CVKul7DB9iVdWLRjPzc8v2eaeBuomsbZ2+gTaYr8k1gm3bYA==
   dependencies:
     "@sqltools/formatter" "^1.2.2"
     app-root-path "^3.0.0"
@@ -17750,6 +17750,7 @@ typeorm@0.2.41:
     reflect-metadata "^0.1.13"
     sha.js "^2.4.11"
     tslib "^2.1.0"
+    uuid "^8.3.2"
     xml2js "^0.4.23"
     yargs "^17.0.1"
     zen-observable-ts "^1.0.0"