transaction-wrapper.ts 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. import { from, lastValueFrom, Observable, of } from 'rxjs';
  2. import { retryWhen, take, tap } from 'rxjs/operators';
  3. import { Connection, EntityManager, QueryRunner } from 'typeorm';
  4. import { TransactionAlreadyStartedError } from 'typeorm/error/TransactionAlreadyStartedError';
  5. import { RequestContext } from '../api/common/request-context';
  6. import { TransactionIsolationLevel, TransactionMode } from '../api/decorators/transaction.decorator';
  7. import { TRANSACTION_MANAGER_KEY } from '../common/constants';
  8. /**
  9. * @description
  10. * This helper class is used to wrap operations in a TypeORM transaction in order to ensure
  11. * atomic operations on the database.
  12. */
  13. export class TransactionWrapper {
  14. /**
  15. * @description
  16. * Executes the `work` function within the context of a transaction. If the `work` function
  17. * resolves / completes, then all the DB operations it contains will be committed. If it
  18. * throws an error or rejects, then all DB operations will be rolled back.
  19. *
  20. * @note
  21. * This function does not mutate your context. Instead, this function makes a copy and passes
  22. * context to work function.
  23. */
  24. async executeInTransaction<T>(
  25. originalCtx: RequestContext,
  26. work: (ctx: RequestContext) => Observable<T> | Promise<T>,
  27. mode: TransactionMode,
  28. isolationLevel: TransactionIsolationLevel | undefined,
  29. connection: Connection,
  30. ): Promise<T> {
  31. // Copy to make sure original context will remain valid after transaction completes
  32. const ctx = originalCtx.copy();
  33. const entityManager: EntityManager | undefined = (ctx as any)[TRANSACTION_MANAGER_KEY];
  34. const queryRunner = entityManager ?.queryRunner || connection.createQueryRunner();
  35. if (mode === 'auto') {
  36. await this.startTransaction(queryRunner, isolationLevel);
  37. }
  38. (ctx as any)[TRANSACTION_MANAGER_KEY] = queryRunner.manager;
  39. try {
  40. const maxRetries = 5;
  41. const result = await lastValueFrom(
  42. from(work(ctx)).pipe(
  43. retryWhen(errors =>
  44. errors.pipe(
  45. tap(err => {
  46. if (!this.isRetriableError(err)) {
  47. throw err;
  48. }
  49. }),
  50. take(maxRetries),
  51. ),
  52. ),
  53. ),
  54. );
  55. if (queryRunner.isTransactionActive) {
  56. await queryRunner.commitTransaction();
  57. }
  58. return result;
  59. } catch (error) {
  60. if (queryRunner.isTransactionActive) {
  61. await queryRunner.rollbackTransaction();
  62. }
  63. throw error;
  64. } finally {
  65. if (!queryRunner.isTransactionActive
  66. && queryRunner.isReleased === false) {
  67. // There is a check for an active transaction
  68. // because this could be a nested transaction (savepoint).
  69. await queryRunner.release();
  70. }
  71. }
  72. }
  73. /**
  74. * Attempts to start a DB transaction, with retry logic in the case that a transaction
  75. * is already started for the connection (which is mainly a problem with SQLite/Sql.js)
  76. */
  77. private async startTransaction(queryRunner: QueryRunner, isolationLevel: TransactionIsolationLevel | undefined) {
  78. const maxRetries = 25;
  79. let attempts = 0;
  80. let lastError: any;
  81. // Returns false if a transaction is already in progress
  82. async function attemptStartTransaction(): Promise<boolean> {
  83. try {
  84. await queryRunner.startTransaction(isolationLevel);
  85. return true;
  86. } catch (err: any) {
  87. lastError = err;
  88. if (err instanceof TransactionAlreadyStartedError) {
  89. return false;
  90. }
  91. throw err;
  92. }
  93. }
  94. while (attempts < maxRetries) {
  95. const result = await attemptStartTransaction();
  96. if (result) {
  97. return;
  98. }
  99. attempts++;
  100. // insert an increasing delay before retrying
  101. await new Promise(resolve => setTimeout(resolve, attempts * 20));
  102. }
  103. throw lastError;
  104. }
  105. /**
  106. * If the resolver function throws an error, there are certain cases in which
  107. * we want to retry the whole thing again - notably in the case of a deadlock
  108. * situation, which can usually be retried with success.
  109. */
  110. private isRetriableError(err: any): boolean {
  111. const mysqlDeadlock = err.code === 'ER_LOCK_DEADLOCK';
  112. const postgresDeadlock = err.code === 'deadlock_detected';
  113. return mysqlDeadlock || postgresDeadlock;
  114. }
  115. }