Browse Source

test(core): Add e2e tests for task scheduler

Michael Bromley 9 months ago
parent
commit
c466ab6b7f

+ 138 - 0
packages/core/e2e/default-scheduler-plugin.e2e-spec.ts

@@ -0,0 +1,138 @@
+import { DefaultSchedulerPlugin, mergeConfig, ScheduledTask } from '@vendure/core';
+import { createTestEnvironment } from '@vendure/testing';
+import gql from 'graphql-tag';
+import path from 'path';
+import { afterAll, beforeAll, describe, expect, it, vi } from 'vitest';
+
+import { initialData } from '../../../e2e-common/e2e-initial-data';
+import { TEST_SETUP_TIMEOUT_MS, testConfig } from '../../../e2e-common/test-config';
+
+import {
+    GetTasksQuery,
+    RunTaskMutation,
+    RunTaskMutationVariables,
+    UpdateTagMutationVariables,
+    UpdateTaskMutation,
+    UpdateTaskMutationVariables,
+} from './graphql/generated-e2e-admin-types';
+import { awaitRunningJobs } from './utils/await-running-jobs';
+
+describe('Default scheduler plugin', () => {
+    const taskSpy = vi.fn();
+
+    const { server, adminClient } = createTestEnvironment(
+        mergeConfig(testConfig(), {
+            schedulerOptions: {
+                tasks: [
+                    new ScheduledTask({
+                        id: 'test-job',
+                        description: "A test job that doesn't do anything",
+                        schedule: cron => cron.every(5).minutes(),
+                        async execute(injector) {
+                            taskSpy();
+                            return { success: true };
+                        },
+                    }),
+                ],
+                runTasksInWorkerOnly: false,
+            },
+            plugins: [DefaultSchedulerPlugin.init({ manualTriggerCheckInterval: 50 })],
+        }),
+    );
+
+    beforeAll(async () => {
+        await server.init({
+            initialData,
+            productsCsvPath: path.join(__dirname, 'fixtures/e2e-products-full.csv'),
+            customerCount: 1,
+        });
+        await adminClient.asSuperAdmin();
+        // We have extra time here because a lot of jobs are
+        // triggered from all the product updates
+        await awaitRunningJobs(adminClient, 10_000, 1000);
+    }, TEST_SETUP_TIMEOUT_MS);
+
+    afterAll(async () => {
+        await awaitRunningJobs(adminClient);
+        await server.destroy();
+    });
+
+    it('get tasks', async () => {
+        const { scheduledTasks } = await adminClient.query<GetTasksQuery>(GET_TASKS);
+        expect(scheduledTasks.length).toBe(1);
+        expect(scheduledTasks[0].id).toBe('test-job');
+        expect(scheduledTasks[0].description).toBe("A test job that doesn't do anything");
+        expect(scheduledTasks[0].schedule).toBe('*/5 * * * *');
+        expect(scheduledTasks[0].scheduleDescription).toBe('Every 5 minutes');
+        expect(scheduledTasks[0].enabled).toBe(true);
+    });
+
+    it('disable task', async () => {
+        const { updateScheduledTask } = await adminClient.query<
+            UpdateTaskMutation,
+            UpdateTaskMutationVariables
+        >(UPDATE_TASK, {
+            input: {
+                id: 'test-job',
+                enabled: false,
+            },
+        });
+        expect(updateScheduledTask.enabled).toBe(false);
+    });
+
+    it('enable task', async () => {
+        const { updateScheduledTask } = await adminClient.query<
+            UpdateTaskMutation,
+            UpdateTaskMutationVariables
+        >(UPDATE_TASK, {
+            input: {
+                id: 'test-job',
+                enabled: true,
+            },
+        });
+        expect(updateScheduledTask.enabled).toBe(true);
+    });
+
+    it('run task', async () => {
+        expect(taskSpy).toHaveBeenCalledTimes(0);
+
+        const { runScheduledTask } = await adminClient.query<RunTaskMutation, RunTaskMutationVariables>(
+            RUN_TASK,
+            { id: 'test-job' },
+        );
+        expect(runScheduledTask.success).toBe(true);
+
+        await new Promise(resolve => setTimeout(resolve, 100));
+        expect(taskSpy).toHaveBeenCalledTimes(1);
+    });
+});
+
+export const GET_TASKS = gql`
+    query GetTasks {
+        scheduledTasks {
+            id
+            description
+            schedule
+            scheduleDescription
+            lastResult
+            enabled
+        }
+    }
+`;
+
+export const UPDATE_TASK = gql`
+    mutation UpdateTask($input: UpdateScheduledTaskInput!) {
+        updateScheduledTask(input: $input) {
+            id
+            enabled
+        }
+    }
+`;
+
+export const RUN_TASK = gql`
+    mutation RunTask($id: String!) {
+        runScheduledTask(id: $id) {
+            success
+        }
+    }
+`;

+ 2 - 4
packages/core/e2e/default-search-plugin-uuids.e2e-spec.ts

@@ -1,11 +1,11 @@
 /* eslint-disable @typescript-eslint/no-non-null-assertion */
 import { DefaultJobQueuePlugin, DefaultSearchPlugin, mergeConfig, UuidIdStrategy } from '@vendure/core';
-import { createTestEnvironment, registerInitializer, SqljsInitializer } from '@vendure/testing';
+import { createTestEnvironment } from '@vendure/testing';
 import path from 'path';
 import { afterAll, beforeAll, describe, expect, it } from 'vitest';
 
 import { initialData } from '../../../e2e-common/e2e-initial-data';
-import { testConfig, TEST_SETUP_TIMEOUT_MS } from '../../../e2e-common/test-config';
+import { TEST_SETUP_TIMEOUT_MS, testConfig } from '../../../e2e-common/test-config';
 
 import {
     FacetValueFragment,
@@ -21,8 +21,6 @@ import { GET_FACET_LIST } from './graphql/shared-definitions';
 import { SEARCH_PRODUCTS_SHOP } from './graphql/shop-definitions';
 import { awaitRunningJobs } from './utils/await-running-jobs';
 
-registerInitializer('sqljs', new SqljsInitializer(path.join(__dirname, '__data__'), 1000));
-
 describe('Default search plugin with UUIDs', () => {
     const { server, adminClient, shopClient } = createTestEnvironment(
         mergeConfig(testConfig(), {

+ 44 - 52
packages/core/e2e/default-search-plugin.e2e-spec.ts

@@ -6,75 +6,69 @@ import {
     facetValueCollectionFilter,
     mergeConfig,
 } from '@vendure/core';
-import {
-    createTestEnvironment,
-    E2E_DEFAULT_CHANNEL_TOKEN,
-    registerInitializer,
-    SimpleGraphQLClient,
-    SqljsInitializer,
-} from '@vendure/testing';
+import { createTestEnvironment, E2E_DEFAULT_CHANNEL_TOKEN, SimpleGraphQLClient } from '@vendure/testing';
 import gql from 'graphql-tag';
 import path from 'path';
 import { afterAll, beforeAll, describe, expect, it } from 'vitest';
 
 import { initialData } from '../../../e2e-common/e2e-initial-data';
-import { testConfig, TEST_SETUP_TIMEOUT_MS } from '../../../e2e-common/test-config';
+import { TEST_SETUP_TIMEOUT_MS, testConfig } from '../../../e2e-common/test-config';
 
 import { SEARCH_PRODUCTS_ADMIN } from './graphql/admin-definitions';
 import {
+    AssignProductsToChannelMutation,
+    AssignProductsToChannelMutationVariables,
+    AssignProductVariantsToChannelMutation,
+    AssignProductVariantsToChannelMutationVariables,
     ChannelFragment,
-    CurrencyCode,
-    LanguageCode,
-    SearchInput,
-    SearchResultSortParameter,
-    SortOrder,
-    SearchProductsAdminQuery,
-    SearchProductsAdminQueryVariables,
-    SearchFacetValuesQuery,
-    SearchFacetValuesQueryVariables,
-    UpdateProductMutation,
-    UpdateProductMutationVariables,
-    SearchCollectionsQuery,
-    SearchCollectionsQueryVariables,
-    SearchGetPricesQuery,
-    SearchGetPricesQueryVariables,
-    CreateFacetMutation,
-    CreateFacetMutationVariables,
-    UpdateProductVariantsMutation,
-    UpdateProductVariantsMutationVariables,
-    DeleteProductVariantMutation,
-    DeleteProductVariantMutationVariables,
-    DeleteProductMutation,
-    DeleteProductMutationVariables,
-    UpdateCollectionMutation,
-    UpdateCollectionMutationVariables,
+    CreateChannelMutation,
+    CreateChannelMutationVariables,
     CreateCollectionMutation,
     CreateCollectionMutationVariables,
-    UpdateTaxRateMutation,
-    UpdateTaxRateMutationVariables,
-    SearchGetAssetsQuery,
-    SearchGetAssetsQueryVariables,
-    UpdateAssetMutation,
-    UpdateAssetMutationVariables,
-    DeleteAssetMutation,
-    DeleteAssetMutationVariables,
-    ReindexMutation,
+    CreateFacetMutation,
+    CreateFacetMutationVariables,
     CreateProductMutation,
     CreateProductMutationVariables,
     CreateProductVariantsMutation,
     CreateProductVariantsMutationVariables,
-    CreateChannelMutation,
-    CreateChannelMutationVariables,
-    AssignProductsToChannelMutation,
-    AssignProductsToChannelMutationVariables,
+    CurrencyCode,
+    DeleteAssetMutation,
+    DeleteAssetMutationVariables,
+    DeleteProductMutation,
+    DeleteProductMutationVariables,
+    DeleteProductVariantMutation,
+    DeleteProductVariantMutationVariables,
+    LanguageCode,
+    ReindexMutation,
     RemoveProductsFromChannelMutation,
     RemoveProductsFromChannelMutationVariables,
-    AssignProductVariantsToChannelMutation,
-    AssignProductVariantsToChannelMutationVariables,
     RemoveProductVariantsFromChannelMutation,
     RemoveProductVariantsFromChannelMutationVariables,
+    SearchCollectionsQuery,
+    SearchCollectionsQueryVariables,
+    SearchFacetValuesQuery,
+    SearchFacetValuesQueryVariables,
+    SearchGetAssetsQuery,
+    SearchGetAssetsQueryVariables,
+    SearchGetPricesQuery,
+    SearchGetPricesQueryVariables,
+    SearchInput,
+    SearchProductsAdminQuery,
+    SearchProductsAdminQueryVariables,
+    SearchResultSortParameter,
+    SortOrder,
+    UpdateAssetMutation,
+    UpdateAssetMutationVariables,
     UpdateChannelMutation,
     UpdateChannelMutationVariables,
+    UpdateCollectionMutation,
+    UpdateCollectionMutationVariables,
+    UpdateProductMutation,
+    UpdateProductMutationVariables,
+    UpdateProductVariantsMutation,
+    UpdateProductVariantsMutationVariables,
+    UpdateTaxRateMutation,
+    UpdateTaxRateMutationVariables,
 } from './graphql/generated-e2e-admin-types';
 import {
     LogicalOperator,
@@ -82,8 +76,8 @@ import {
     SearchProductsShopQueryVariables,
 } from './graphql/generated-e2e-shop-types';
 import {
-    ASSIGN_PRODUCTVARIANT_TO_CHANNEL,
     ASSIGN_PRODUCT_TO_CHANNEL,
+    ASSIGN_PRODUCTVARIANT_TO_CHANNEL,
     CREATE_CHANNEL,
     CREATE_COLLECTION,
     CREATE_FACET,
@@ -92,8 +86,8 @@ import {
     DELETE_ASSET,
     DELETE_PRODUCT,
     DELETE_PRODUCT_VARIANT,
-    REMOVE_PRODUCTVARIANT_FROM_CHANNEL,
     REMOVE_PRODUCT_FROM_CHANNEL,
+    REMOVE_PRODUCTVARIANT_FROM_CHANNEL,
     UPDATE_ASSET,
     UPDATE_CHANNEL,
     UPDATE_COLLECTION,
@@ -104,8 +98,6 @@ import {
 import { SEARCH_PRODUCTS_SHOP } from './graphql/shop-definitions';
 import { awaitRunningJobs } from './utils/await-running-jobs';
 
-registerInitializer('sqljs', new SqljsInitializer(path.join(__dirname, '__data__'), 1000));
-
 interface SearchProductsShopQueryVariablesExt extends SearchProductsShopQueryVariables {
     input: SearchProductsShopQueryVariables['input'] & {
         // This input field is dynamically added only when the `indexStockStatus` init option

+ 141 - 0
packages/core/e2e/graphql/generated-e2e-admin-types.ts

@@ -7050,6 +7050,31 @@ export type AddNoteToCustomerMutation = {
     };
 };
 
+export type GetTasksQueryVariables = Exact<{ [key: string]: never }>;
+
+export type GetTasksQuery = {
+    scheduledTasks: Array<{
+        id: string;
+        description: string;
+        schedule: string;
+        scheduleDescription: string;
+        lastResult?: any | null;
+        enabled: boolean;
+    }>;
+};
+
+export type UpdateTaskMutationVariables = Exact<{
+    input: UpdateScheduledTaskInput;
+}>;
+
+export type UpdateTaskMutation = { updateScheduledTask: { id: string; enabled: boolean } };
+
+export type RunTaskMutationVariables = Exact<{
+    id: Scalars['String']['input'];
+}>;
+
+export type RunTaskMutation = { runScheduledTask: { success: boolean } };
+
 export type ReindexMutationVariables = Exact<{ [key: string]: never }>;
 
 export type ReindexMutation = { reindex: { id: string } };
@@ -18064,6 +18089,122 @@ export const AddNoteToCustomerDocument = {
         },
     ],
 } as unknown as DocumentNode<AddNoteToCustomerMutation, AddNoteToCustomerMutationVariables>;
+export const GetTasksDocument = {
+    kind: 'Document',
+    definitions: [
+        {
+            kind: 'OperationDefinition',
+            operation: 'query',
+            name: { kind: 'Name', value: 'GetTasks' },
+            selectionSet: {
+                kind: 'SelectionSet',
+                selections: [
+                    {
+                        kind: 'Field',
+                        name: { kind: 'Name', value: 'scheduledTasks' },
+                        selectionSet: {
+                            kind: 'SelectionSet',
+                            selections: [
+                                { kind: 'Field', name: { kind: 'Name', value: 'id' } },
+                                { kind: 'Field', name: { kind: 'Name', value: 'description' } },
+                                { kind: 'Field', name: { kind: 'Name', value: 'schedule' } },
+                                { kind: 'Field', name: { kind: 'Name', value: 'scheduleDescription' } },
+                                { kind: 'Field', name: { kind: 'Name', value: 'lastResult' } },
+                                { kind: 'Field', name: { kind: 'Name', value: 'enabled' } },
+                            ],
+                        },
+                    },
+                ],
+            },
+        },
+    ],
+} as unknown as DocumentNode<GetTasksQuery, GetTasksQueryVariables>;
+export const UpdateTaskDocument = {
+    kind: 'Document',
+    definitions: [
+        {
+            kind: 'OperationDefinition',
+            operation: 'mutation',
+            name: { kind: 'Name', value: 'UpdateTask' },
+            variableDefinitions: [
+                {
+                    kind: 'VariableDefinition',
+                    variable: { kind: 'Variable', name: { kind: 'Name', value: 'input' } },
+                    type: {
+                        kind: 'NonNullType',
+                        type: {
+                            kind: 'NamedType',
+                            name: { kind: 'Name', value: 'UpdateScheduledTaskInput' },
+                        },
+                    },
+                },
+            ],
+            selectionSet: {
+                kind: 'SelectionSet',
+                selections: [
+                    {
+                        kind: 'Field',
+                        name: { kind: 'Name', value: 'updateScheduledTask' },
+                        arguments: [
+                            {
+                                kind: 'Argument',
+                                name: { kind: 'Name', value: 'input' },
+                                value: { kind: 'Variable', name: { kind: 'Name', value: 'input' } },
+                            },
+                        ],
+                        selectionSet: {
+                            kind: 'SelectionSet',
+                            selections: [
+                                { kind: 'Field', name: { kind: 'Name', value: 'id' } },
+                                { kind: 'Field', name: { kind: 'Name', value: 'enabled' } },
+                            ],
+                        },
+                    },
+                ],
+            },
+        },
+    ],
+} as unknown as DocumentNode<UpdateTaskMutation, UpdateTaskMutationVariables>;
+export const RunTaskDocument = {
+    kind: 'Document',
+    definitions: [
+        {
+            kind: 'OperationDefinition',
+            operation: 'mutation',
+            name: { kind: 'Name', value: 'RunTask' },
+            variableDefinitions: [
+                {
+                    kind: 'VariableDefinition',
+                    variable: { kind: 'Variable', name: { kind: 'Name', value: 'id' } },
+                    type: {
+                        kind: 'NonNullType',
+                        type: { kind: 'NamedType', name: { kind: 'Name', value: 'String' } },
+                    },
+                },
+            ],
+            selectionSet: {
+                kind: 'SelectionSet',
+                selections: [
+                    {
+                        kind: 'Field',
+                        name: { kind: 'Name', value: 'runScheduledTask' },
+                        arguments: [
+                            {
+                                kind: 'Argument',
+                                name: { kind: 'Name', value: 'id' },
+                                value: { kind: 'Variable', name: { kind: 'Name', value: 'id' } },
+                            },
+                        ],
+                        selectionSet: {
+                            kind: 'SelectionSet',
+                            selections: [{ kind: 'Field', name: { kind: 'Name', value: 'success' } }],
+                        },
+                    },
+                ],
+            },
+        },
+    ],
+} as unknown as DocumentNode<RunTaskMutation, RunTaskMutationVariables>;
 export const ReindexDocument = {
     kind: 'Document',
     definitions: [

+ 1 - 0
packages/core/src/config/default-config.ts

@@ -198,6 +198,7 @@ export const defaultConfig: RuntimeVendureConfig = {
     schedulerOptions: {
         schedulerStrategy: new NoopSchedulerStrategy(),
         tasks: [cleanSessionsTask],
+        runTasksInWorkerOnly: true,
     },
     customFields: {
         Address: [],

+ 12 - 0
packages/core/src/config/vendure-config.ts

@@ -985,6 +985,18 @@ export interface SchedulerOptions {
      * The tasks to be executed.
      */
     tasks?: ScheduledTask[];
+
+    /**
+     * @description
+     * Whether to run tasks only in the worker process. Generally this should
+     * be left as true, since tasks may involve expensive operations that should
+     * not be allowed to interfere with the server responsiveness.
+     *
+     * This option mainly exists for testing purposes.
+     *
+     * @default true
+     */
+    runTasksInWorkerOnly?: boolean;
 }
 
 /**

+ 22 - 7
packages/core/src/plugin/default-scheduler-plugin/default-scheduler-strategy.ts

@@ -4,6 +4,7 @@ import ms from 'ms';
 
 import { Injector } from '../../common';
 import { assertFound } from '../../common/utils';
+import { ConfigService } from '../../config/config.service';
 import { Logger } from '../../config/logger/vendure-logger';
 import { TransactionalConnection } from '../../connection';
 import { ProcessContext } from '../../process-context';
@@ -36,7 +37,11 @@ export class DefaultSchedulerStrategy implements SchedulerStrategy {
         this.pluginOptions = injector.get(DEFAULT_SCHEDULER_PLUGIN_OPTIONS);
         this.injector = injector;
 
-        if (injector.get(ProcessContext).isWorker) {
+        const runTriggerCheck =
+            injector.get(ConfigService).schedulerOptions.runTasksInWorkerOnly === false ||
+            injector.get(ProcessContext).isWorker;
+
+        if (runTriggerCheck) {
             this.intervalRef = setInterval(
                 () => this.checkForManuallyTriggeredTasks(),
                 this.pluginOptions.manualTriggerCheckInterval as number,
@@ -131,7 +136,8 @@ export class DefaultSchedulerStrategy implements SchedulerStrategy {
         };
     }
 
-    getTasks(): Promise<TaskReport[]> {
+    async getTasks(): Promise<TaskReport[]> {
+        await this.ensureAllTasksAreRegistered();
         return this.connection.rawConnection
             .getRepository(ScheduledTaskRecord)
             .createQueryBuilder('task')
@@ -141,7 +147,8 @@ export class DefaultSchedulerStrategy implements SchedulerStrategy {
             });
     }
 
-    getTask(id: string): Promise<TaskReport | undefined> {
+    async getTask(id: string): Promise<TaskReport | undefined> {
+        await this.ensureTaskIsRegistered(id);
         return this.connection.rawConnection
             .getRepository(ScheduledTaskRecord)
             .createQueryBuilder('task')
@@ -205,18 +212,26 @@ export class DefaultSchedulerStrategy implements SchedulerStrategy {
         };
     }
 
-    private async ensureTaskIsRegistered(task: ScheduledTask) {
-        if (!this.tasks.get(task.id)?.isRegistered) {
+    private async ensureAllTasksAreRegistered() {
+        for (const task of this.tasks.values()) {
+            await this.ensureTaskIsRegistered(task.task);
+        }
+    }
+
+    private async ensureTaskIsRegistered(taskOrId: ScheduledTask | string) {
+        const taskId = typeof taskOrId === 'string' ? taskOrId : taskOrId.id;
+        const task = this.tasks.get(taskId);
+        if (task && !task.isRegistered) {
             await this.connection.rawConnection
                 .getRepository(ScheduledTaskRecord)
                 .createQueryBuilder()
                 .insert()
                 .into(ScheduledTaskRecord)
-                .values({ taskId: task.id })
+                .values({ taskId })
                 .orIgnore()
                 .execute();
 
-            this.tasks.set(task.id, { task, isRegistered: true });
+            this.tasks.set(taskId, { task: task.task, isRegistered: true });
         }
     }
 }

+ 1 - 1
packages/core/src/plugin/default-scheduler-plugin/default-scheduler.plugin.ts

@@ -46,7 +46,7 @@ import { DefaultSchedulerPluginOptions } from './types';
     providers: [
         {
             provide: DEFAULT_SCHEDULER_PLUGIN_OPTIONS,
-            useValue: DefaultSchedulerPlugin.options,
+            useFactory: () => DefaultSchedulerPlugin.options,
         },
     ],
     compatibility: '>0.0.0',

+ 14 - 4
packages/core/src/scheduler/scheduler.service.ts

@@ -1,4 +1,4 @@
-import { Injectable, OnApplicationBootstrap } from '@nestjs/common';
+import { Injectable, OnApplicationBootstrap, OnApplicationShutdown } from '@nestjs/common';
 import { Success, UpdateScheduledTaskInput } from '@vendure/common/lib/generated-types';
 import CronTime from 'cron-time-generator';
 import { Cron } from 'croner';
@@ -32,8 +32,9 @@ export interface TaskInfo {
  * @docsCategory scheduled-tasks
  */
 @Injectable()
-export class SchedulerService implements OnApplicationBootstrap {
+export class SchedulerService implements OnApplicationBootstrap, OnApplicationShutdown {
     private jobs: Map<string, { task: ScheduledTask; job: Cron }> = new Map();
+    private shouldRunTasks = false;
     constructor(
         private configService: ConfigService,
         private processContext: ProcessContext,
@@ -48,6 +49,9 @@ export class SchedulerService implements OnApplicationBootstrap {
             );
             return;
         }
+        this.shouldRunTasks =
+            this.configService.schedulerOptions.runTasksInWorkerOnly === false ||
+            this.processContext.isWorker;
         const scheduledTasks = this.configService.schedulerOptions.tasks ?? [];
 
         for (const task of scheduledTasks) {
@@ -56,7 +60,7 @@ export class SchedulerService implements OnApplicationBootstrap {
             if (!pattern) {
                 Logger.warn(`Invalid cron pattern for task ${task.id}`);
             } else {
-                if (this.processContext.isWorker) {
+                if (this.shouldRunTasks) {
                     const schedule = cronstrue.toString(pattern);
                     Logger.info(`Registered scheduled task: ${task.id} - ${schedule}`);
                 }
@@ -66,6 +70,12 @@ export class SchedulerService implements OnApplicationBootstrap {
         }
     }
 
+    onApplicationShutdown(signal?: string) {
+        for (const job of this.jobs.values()) {
+            job.job.stop();
+        }
+    }
+
     /**
      * @description
      * Returns a list of all the scheduled tasks and their current status.
@@ -152,7 +162,7 @@ export class SchedulerService implements OnApplicationBootstrap {
                 protect: task.options.preventOverlap ? protectCallback : undefined,
             },
             () => {
-                if (this.processContext.isWorker) {
+                if (this.shouldRunTasks) {
                     // Only execute the cron task on the worker process
                     // so that any expensive logic does not affect
                     // the responsiveness of server processes