Browse Source

feat(core): Implement cache invalidation by tags

Relates to #3043
Michael Bromley 1 year ago
parent
commit
382e314a7d

+ 66 - 0
packages/core/e2e/cache-service-default.e2e-spec.ts

@@ -0,0 +1,66 @@
+import { CacheService, DefaultCachePlugin, mergeConfig } from '@vendure/core';
+import { createTestEnvironment } from '@vendure/testing';
+import path from 'path';
+import { afterAll, beforeAll, describe, it } from 'vitest';
+
+import { initialData } from '../../../e2e-common/e2e-initial-data';
+import { TEST_SETUP_TIMEOUT_MS, testConfig } from '../../../e2e-common/test-config';
+import { TestingCacheTtlProvider } from '../src/cache/cache-ttl-provider';
+
+import {
+    deletesAKey,
+    evictsTheOldestKeyWhenCacheIsFull,
+    getReturnsUndefinedForNonExistentKey,
+    invalidatesALargeNumberOfKeysByTag,
+    invalidatesByMultipleTags,
+    invalidatesBySingleTag,
+    setsAKey,
+    setsAKeyWithTtl,
+} from './fixtures/cache-service-shared-tests';
+
+describe('CacheService with DefaultCachePlugin (sql)', () => {
+    const ttlProvider = new TestingCacheTtlProvider();
+
+    let cacheService: CacheService;
+    const { server, adminClient } = createTestEnvironment(
+        mergeConfig(testConfig(), {
+            plugins: [
+                DefaultCachePlugin.init({
+                    cacheSize: 5,
+                    cacheTtlProvider: ttlProvider,
+                }),
+            ],
+        }),
+    );
+
+    beforeAll(async () => {
+        await server.init({
+            initialData,
+            productsCsvPath: path.join(__dirname, 'fixtures/e2e-products-minimal.csv'),
+            customerCount: 1,
+        });
+        await adminClient.asSuperAdmin();
+        cacheService = server.app.get(CacheService);
+    }, TEST_SETUP_TIMEOUT_MS);
+
+    afterAll(async () => {
+        await server.destroy();
+    });
+
+    it('get returns undefined for non-existent key', () =>
+        getReturnsUndefinedForNonExistentKey(cacheService));
+
+    it('sets a key', () => setsAKey(cacheService));
+
+    it('deletes a key', () => deletesAKey(cacheService));
+
+    it('sets a key with ttl', () => setsAKeyWithTtl(cacheService, ttlProvider));
+
+    it('evicts the oldest key when cache is full', () => evictsTheOldestKeyWhenCacheIsFull(cacheService));
+
+    it('invalidates by single tag', () => invalidatesBySingleTag(cacheService));
+
+    it('invalidates by multiple tags', () => invalidatesByMultipleTags(cacheService));
+
+    it('invalidates a large number of keys by tag', () => invalidatesALargeNumberOfKeysByTag(cacheService));
+});

+ 67 - 0
packages/core/e2e/cache-service-in-memory.e2e-spec.ts

@@ -0,0 +1,67 @@
+import { CacheService, mergeConfig } from '@vendure/core';
+import { createTestEnvironment } from '@vendure/testing';
+import path from 'path';
+import { afterAll, beforeAll, describe, it } from 'vitest';
+
+import { initialData } from '../../../e2e-common/e2e-initial-data';
+import { TEST_SETUP_TIMEOUT_MS, testConfig } from '../../../e2e-common/test-config';
+import { TestingCacheTtlProvider } from '../src/cache/cache-ttl-provider';
+import { InMemoryCacheStrategy } from '../src/config/system/in-memory-cache-strategy';
+
+import {
+    deletesAKey,
+    evictsTheOldestKeyWhenCacheIsFull,
+    getReturnsUndefinedForNonExistentKey,
+    invalidatesALargeNumberOfKeysByTag,
+    invalidatesByMultipleTags,
+    invalidatesBySingleTag,
+    setsAKey,
+    setsAKeyWithTtl,
+} from './fixtures/cache-service-shared-tests';
+
+describe('CacheService in-memory', () => {
+    const ttlProvider = new TestingCacheTtlProvider();
+
+    let cacheService: CacheService;
+    const { server, adminClient } = createTestEnvironment(
+        mergeConfig(testConfig(), {
+            systemOptions: {
+                cacheStrategy: new InMemoryCacheStrategy({
+                    cacheSize: 5,
+                    cacheTtlProvider: ttlProvider,
+                }),
+            },
+        }),
+    );
+
+    beforeAll(async () => {
+        await server.init({
+            initialData,
+            productsCsvPath: path.join(__dirname, 'fixtures/e2e-products-minimal.csv'),
+            customerCount: 1,
+        });
+        await adminClient.asSuperAdmin();
+        cacheService = server.app.get(CacheService);
+    }, TEST_SETUP_TIMEOUT_MS);
+
+    afterAll(async () => {
+        await server.destroy();
+    });
+
+    it('get returns undefined for non-existent key', () =>
+        getReturnsUndefinedForNonExistentKey(cacheService));
+
+    it('sets a key', () => setsAKey(cacheService));
+
+    it('deletes a key', () => deletesAKey(cacheService));
+
+    it('sets a key with ttl', () => setsAKeyWithTtl(cacheService, ttlProvider));
+
+    it('evicts the oldest key when cache is full', () => evictsTheOldestKeyWhenCacheIsFull(cacheService));
+
+    it('invalidates by single tag', () => invalidatesBySingleTag(cacheService));
+
+    it('invalidates by multiple tags', () => invalidatesByMultipleTags(cacheService));
+
+    it('invalidates a large number of keys by tag', () => invalidatesALargeNumberOfKeysByTag(cacheService));
+});

+ 89 - 0
packages/core/e2e/fixtures/cache-service-shared-tests.ts

@@ -0,0 +1,89 @@
+import { CacheService } from '@vendure/core';
+import { expect } from 'vitest';
+
+import { TestingCacheTtlProvider } from '../../src/cache/cache-ttl-provider';
+
+export async function getReturnsUndefinedForNonExistentKey(cacheService: CacheService) {
+    const result = await cacheService.get('non-existent-key');
+    expect(result).toBeUndefined();
+}
+
+export async function setsAKey(cacheService: CacheService) {
+    await cacheService.set('test-key', 'test-value');
+    const result = await cacheService.get('test-key');
+    expect(result).toBe('test-value');
+}
+
+export async function deletesAKey(cacheService: CacheService) {
+    await cacheService.set('test-key', 'test-value');
+    await cacheService.delete('test-key');
+    const result = await cacheService.get('test-key');
+
+    expect(result).toBeUndefined();
+}
+
+export async function setsAKeyWithTtl(cacheService: CacheService, ttlProvider: TestingCacheTtlProvider) {
+    ttlProvider.setTime(new Date().getTime());
+    await cacheService.set('test-key', 'test-value', { ttl: 1000 });
+    const result = await cacheService.get('test-key');
+    expect(result).toBe('test-value');
+
+    ttlProvider.incrementTime(2000);
+
+    const result2 = await cacheService.get('test-key');
+
+    expect(result2).toBeUndefined();
+}
+
+export async function evictsTheOldestKeyWhenCacheIsFull(cacheService: CacheService) {
+    await cacheService.set('key1', 'value1');
+    await cacheService.set('key2', 'value2');
+    await cacheService.set('key3', 'value3');
+    await cacheService.set('key4', 'value4');
+    await cacheService.set('key5', 'value5');
+
+    const result1 = await cacheService.get('key1');
+    expect(result1).toBe('value1');
+
+    await cacheService.set('key6', 'value6');
+
+    const result2 = await cacheService.get('key1');
+    expect(result2).toBeUndefined();
+}
+
+export async function invalidatesBySingleTag(cacheService: CacheService) {
+    await cacheService.set('taggedKey1', 'value1', { tags: ['tag1'] });
+    await cacheService.set('taggedKey2', 'value2', { tags: ['tag2'] });
+
+    expect(await cacheService.get('taggedKey1')).toBe('value1');
+    expect(await cacheService.get('taggedKey2')).toBe('value2');
+
+    await cacheService.invalidateTags(['tag1']);
+
+    expect(await cacheService.get('taggedKey1')).toBeUndefined();
+    expect(await cacheService.get('taggedKey2')).toBe('value2');
+}
+
+export async function invalidatesByMultipleTags(cacheService: CacheService) {
+    await cacheService.set('taggedKey1', 'value1', { tags: ['tag1'] });
+    await cacheService.set('taggedKey2', 'value2', { tags: ['tag2'] });
+
+    expect(await cacheService.get('taggedKey1')).toBe('value1');
+    expect(await cacheService.get('taggedKey2')).toBe('value2');
+
+    await cacheService.invalidateTags(['tag1', 'tag2']);
+
+    expect(await cacheService.get('taggedKey1')).toBeUndefined();
+    expect(await cacheService.get('taggedKey2')).toBeUndefined();
+}
+
+export async function invalidatesALargeNumberOfKeysByTag(cacheService: CacheService) {
+    for (let i = 0; i < 100; i++) {
+        await cacheService.set(`taggedKey${i}`, `value${i}`, { tags: ['tag'] });
+    }
+    await cacheService.invalidateTags(['tag']);
+
+    for (let i = 0; i < 100; i++) {
+        expect(await cacheService.get(`taggedKey${i}`)).toBeUndefined();
+    }
+}

+ 51 - 0
packages/core/src/cache/cache-ttl-provider.ts

@@ -0,0 +1,51 @@
+/**
+ * @description
+ * This interface is used to provide the current time in milliseconds.
+ * The reason it is abstracted in this way is so that the cache
+ * implementations can be more easily tested.
+ *
+ * In an actual application you would not need to change the default.
+ */
+export interface CacheTtlProvider {
+    /**
+     * @description
+     * Returns the current timestamp in milliseconds.
+     */
+    getTime(): number;
+}
+
+/**
+ * @description
+ * The default implementation of the {@link CacheTtlProvider} which
+ * simply returns the current time.
+ */
+export class DefaultCacheTtlProvider implements CacheTtlProvider {
+    /**
+     * @description
+     * Returns the current timestamp in milliseconds.
+     */
+    getTime(): number {
+        return new Date().getTime();
+    }
+}
+
+/**
+ * @description
+ * A testing implementation of the {@link CacheTtlProvider} which
+ * allows the time to be set manually.
+ */
+export class TestingCacheTtlProvider implements CacheTtlProvider {
+    private time = 0;
+
+    setTime(timestampInMs: number) {
+        this.time = timestampInMs;
+    }
+
+    incrementTime(ms: number) {
+        this.time += ms;
+    }
+
+    getTime(): number {
+        return this.time;
+    }
+}

+ 17 - 0
packages/core/src/cache/cache.service.ts

@@ -71,4 +71,21 @@ export class CacheService {
             Logger.error(`Could not delete key [${key}] from CacheService`, undefined, e.stack);
         }
     }
+
+    /**
+     * @description
+     * Deletes all items from the cache which contain at least one matching tag.
+     */
+    async invalidateTags(tags: string[]): Promise<void> {
+        try {
+            await this.cacheStrategy.invalidateTags(tags);
+            Logger.debug(`Invalidated tags [${tags.join(', ')}] from CacheService`);
+        } catch (e: any) {
+            Logger.error(
+                `Could not invalidate tags [${tags.join(', ')}] from CacheService`,
+                undefined,
+                e.stack,
+            );
+        }
+    }
 }

+ 1 - 0
packages/core/src/cache/index.ts

@@ -1 +1,2 @@
 export * from './request-context-cache.service';
+export * from './cache.service';

+ 12 - 0
packages/core/src/config/system/cache-strategy.ts

@@ -15,6 +15,12 @@ export interface SetCacheKeyOptions {
      * this is equivalent to having an infinite ttl.
      */
     ttl?: number;
+    /**
+     * @description
+     * An array of tags which can be used to group cache keys together.
+     * This can be useful for bulk deletion of related keys.
+     */
+    tags?: string[];
 }
 
 /**
@@ -49,4 +55,10 @@ export interface CacheStrategy extends InjectableStrategy {
      * Deletes an item from the cache.
      */
     delete(key: string): Promise<void>;
+
+    /**
+     * @description
+     * Deletes all items from the cache which contain at least one matching tag.
+     */
+    invalidateTags(tags: string[]): Promise<void>;
 }

+ 27 - 4
packages/core/src/config/system/in-memory-cache-strategy.ts

@@ -1,5 +1,7 @@
 import { JsonCompatible } from '@vendure/common/lib/shared-types';
 
+import { CacheTtlProvider, DefaultCacheTtlProvider } from '../../cache/cache-ttl-provider';
+
 import { CacheStrategy, SetCacheKeyOptions } from './cache-strategy';
 
 export interface CacheItem<T> {
@@ -18,19 +20,21 @@ export interface CacheItem<T> {
  */
 export class InMemoryCacheStrategy implements CacheStrategy {
     protected cache = new Map<string, CacheItem<any>>();
+    protected cacheTags = new Map<string, Set<string>>();
     protected cacheSize = 10_000;
+    protected ttlProvider: CacheTtlProvider;
 
-    constructor(config?: { cacheSize?: number }) {
+    constructor(config?: { cacheSize?: number; cacheTtlProvider?: CacheTtlProvider }) {
         if (config?.cacheSize) {
             this.cacheSize = config.cacheSize;
         }
+        this.ttlProvider = config?.cacheTtlProvider || new DefaultCacheTtlProvider();
     }
 
     async get<T extends JsonCompatible<T>>(key: string): Promise<T | undefined> {
         const hit = this.cache.get(key);
         if (hit) {
-            const now = new Date().getTime();
-            if (!hit.expires || (hit.expires && now < hit.expires)) {
+            if (!hit.expires || (hit.expires && this.ttlProvider.getTime() < hit.expires)) {
                 return hit.value;
             } else {
                 this.cache.delete(key);
@@ -49,14 +53,33 @@ export class InMemoryCacheStrategy implements CacheStrategy {
         }
         this.cache.set(key, {
             value,
-            expires: options?.ttl ? new Date().getTime() + options.ttl : undefined,
+            expires: options?.ttl ? this.ttlProvider.getTime() + options.ttl : undefined,
         });
+        if (options?.tags) {
+            for (const tag of options.tags) {
+                const tagged = this.cacheTags.get(tag) || new Set<string>();
+                tagged.add(key);
+                this.cacheTags.set(tag, tagged);
+            }
+        }
     }
 
     async delete(key: string) {
         this.cache.delete(key);
     }
 
+    async invalidateTags(tags: string[]) {
+        for (const tag of tags) {
+            const tagged = this.cacheTags.get(tag);
+            if (tagged) {
+                for (const key of tagged) {
+                    this.cache.delete(key);
+                }
+                this.cacheTags.delete(tag);
+            }
+        }
+    }
+
     private first() {
         return this.cache.keys().next().value;
     }

+ 3 - 0
packages/core/src/plugin/default-cache-plugin/cache-item.entity.ts

@@ -9,6 +9,9 @@ export class CacheItem extends VendureEntity {
         super(input);
     }
 
+    @Column({ precision: 3 })
+    insertedAt: Date;
+
     @Index('cache_item_key')
     @Column({ unique: true })
     key: string;

+ 25 - 0
packages/core/src/plugin/default-cache-plugin/cache-tag.entity.ts

@@ -0,0 +1,25 @@
+import { DeepPartial } from '@vendure/common/lib/shared-types';
+import { Column, Entity, Index, ManyToOne, RelationId, Unique } from 'typeorm';
+
+import { VendureEntity } from '../../entity/base/base.entity';
+import { EntityId } from '../../entity/index';
+
+import { CacheItem } from './cache-item.entity';
+
+@Entity()
+@Unique(['tag', 'itemId'])
+export class CacheTag extends VendureEntity {
+    constructor(input: DeepPartial<CacheTag>) {
+        super(input);
+    }
+
+    @Index('cache_tag_tag')
+    @Column()
+    tag: string;
+
+    @ManyToOne(() => CacheItem, { onDelete: 'CASCADE' })
+    item: CacheItem;
+
+    @EntityId()
+    itemId: string;
+}

+ 14 - 1
packages/core/src/plugin/default-cache-plugin/default-cache-plugin.ts

@@ -1,21 +1,34 @@
+import { CacheTtlProvider } from '../../cache/cache-ttl-provider';
 import { PluginCommonModule } from '../plugin-common.module';
 import { VendurePlugin } from '../vendure-plugin';
 
 import { CacheItem } from './cache-item.entity';
+import { CacheTag } from './cache-tag.entity';
 import { PLUGIN_INIT_OPTIONS } from './constants';
 import { SqlCacheStrategy } from './sql-cache-strategy';
 
 export interface DefaultCachePluginInitOptions {
+    /**
+     * @description
+     * The maximum number of items to store in the cache. Once the cache reaches this size,
+     * the least-recently-used items will be evicted to make room for new items.
+     */
     cacheSize?: number;
+    /**
+     * Optionally provide a custom CacheTtlProvider to control the TTL of cache items.
+     * This is useful for testing.
+     */
+    cacheTtlProvider?: CacheTtlProvider;
 }
 
 @VendurePlugin({
     imports: [PluginCommonModule],
-    entities: [CacheItem],
+    entities: [CacheItem, CacheTag],
     providers: [{ provide: PLUGIN_INIT_OPTIONS, useFactory: () => DefaultCachePlugin.options }],
     configuration: config => {
         config.systemOptions.cacheStrategy = new SqlCacheStrategy({
             cacheSize: DefaultCachePlugin.options.cacheSize,
+            cacheTtlProvider: DefaultCachePlugin.options.cacheTtlProvider,
         });
         return config;
     },

+ 63 - 9
packages/core/src/plugin/default-cache-plugin/sql-cache-strategy.ts

@@ -1,11 +1,13 @@
 import { JsonCompatible } from '@vendure/common/lib/shared-types';
 
+import { CacheTtlProvider, DefaultCacheTtlProvider } from '../../cache/cache-ttl-provider';
 import { Injector } from '../../common/index';
 import { ConfigService, Logger } from '../../config/index';
 import { CacheStrategy, SetCacheKeyOptions } from '../../config/system/cache-strategy';
 import { TransactionalConnection } from '../../connection/index';
 
 import { CacheItem } from './cache-item.entity';
+import { CacheTag } from './cache-tag.entity';
 
 /**
  * A {@link CacheStrategy} that stores the cache in memory using a simple
@@ -18,11 +20,13 @@ import { CacheItem } from './cache-item.entity';
  */
 export class SqlCacheStrategy implements CacheStrategy {
     protected cacheSize = 10_000;
+    protected ttlProvider: CacheTtlProvider;
 
-    constructor(config?: { cacheSize?: number }) {
+    constructor(config?: { cacheSize?: number; cacheTtlProvider?: CacheTtlProvider }) {
         if (config?.cacheSize) {
             this.cacheSize = config.cacheSize;
         }
+        this.ttlProvider = config?.cacheTtlProvider || new DefaultCacheTtlProvider();
     }
 
     protected connection: TransactionalConnection;
@@ -41,8 +45,7 @@ export class SqlCacheStrategy implements CacheStrategy {
         });
 
         if (hit) {
-            const now = new Date().getTime();
-            if (!hit.expiresAt || (hit.expiresAt && now < hit.expiresAt.getTime())) {
+            if (!hit.expiresAt || (hit.expiresAt && this.ttlProvider.getTime() < hit.expiresAt.getTime())) {
                 try {
                     return JSON.parse(hit.value);
                 } catch (e: any) {
@@ -58,15 +61,15 @@ export class SqlCacheStrategy implements CacheStrategy {
 
     async set<T extends JsonCompatible<T>>(key: string, value: T, options?: SetCacheKeyOptions) {
         const cacheSize = await this.connection.rawConnection.getRepository(CacheItem).count();
-        if (cacheSize > this.cacheSize) {
+        if (cacheSize >= this.cacheSize) {
             // evict oldest
             const subQuery1 = this.connection.rawConnection
                 .getRepository(CacheItem)
                 .createQueryBuilder('item')
                 .select('item.id', 'item_id')
-                .orderBy('item.updatedAt', 'DESC')
+                .orderBy('item.insertedAt', 'DESC')
                 .limit(1000)
-                .offset(this.cacheSize);
+                .offset(Math.max(this.cacheSize - 1, 1));
             const subQuery2 = this.connection.rawConnection
                 .createQueryBuilder()
                 .select('t.item_id')
@@ -81,17 +84,34 @@ export class SqlCacheStrategy implements CacheStrategy {
             try {
                 await qb.execute();
             } catch (e: any) {
-                Logger.error(`An error occured when attempting to prune the cache: ${e.message as string}`);
+                Logger.error(`An error occurred when attempting to prune the cache: ${e.message as string}`);
             }
         }
-        await this.connection.rawConnection.getRepository(CacheItem).upsert(
+        const item = await this.connection.rawConnection.getRepository(CacheItem).upsert(
             new CacheItem({
                 key,
+                insertedAt: new Date(),
                 value: JSON.stringify(value),
-                expiresAt: options?.ttl ? new Date(new Date().getTime() + options.ttl) : undefined,
+                expiresAt: options?.ttl ? new Date(this.ttlProvider.getTime() + options.ttl) : undefined,
             }),
             ['key'],
         );
+
+        if (options?.tags) {
+            for (const tag of options.tags) {
+                try {
+                    await this.connection.rawConnection.getRepository(CacheTag).upsert(
+                        {
+                            tag,
+                            item: item.identifiers[0],
+                        } as any,
+                        ['tag', 'itemId'],
+                    );
+                } catch (e: any) {
+                    Logger.error(`Error inserting tag`, e.message);
+                }
+            }
+        }
     }
 
     async delete(key: string) {
@@ -99,4 +119,38 @@ export class SqlCacheStrategy implements CacheStrategy {
             key,
         });
     }
+
+    async invalidateTags(tags: string[]) {
+        await this.connection.withTransaction(async ctx => {
+            const itemIds = await this.connection
+                .getRepository(ctx, CacheTag)
+                .createQueryBuilder('cache_tag')
+                .select('cache_tag.itemId')
+                .where('cache_tag.tag IN (:...tags)', { tags })
+                .groupBy('cache_tag.itemId')
+                .groupBy('cache_tag.id')
+                .getMany();
+
+            await this.connection
+                .getRepository(ctx, CacheTag)
+                .createQueryBuilder('cache_tag')
+                .delete()
+                .where('cache_tag.tag IN (:...tags)', { tags })
+                .execute();
+
+            if (itemIds.length) {
+                const ids = itemIds.map(i => i.itemId);
+                const batchSize = 1000;
+
+                for (let i = 0; i < itemIds.length; i += batchSize) {
+                    const batch = ids.slice(i, batchSize);
+                    try {
+                        await this.connection.getRepository(ctx, CacheItem).delete(batch);
+                    } catch (e: any) {
+                        Logger.error(`Error deleting items`, e.message);
+                    }
+                }
+            }
+        });
+    }
 }