1
0

transaction-wrapper.ts 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. import { from, lastValueFrom, Observable } from 'rxjs';
  2. import { retryWhen, take, tap } from 'rxjs/operators';
  3. import { DataSource, EntityManager, QueryRunner } from 'typeorm';
  4. import { TransactionAlreadyStartedError } from 'typeorm/error/TransactionAlreadyStartedError';
  5. import { RequestContext } from '../api/common/request-context';
  6. import { TransactionIsolationLevel, TransactionMode } from '../api/decorators/transaction.decorator';
  7. import { TRANSACTION_MANAGER_KEY } from '../common/constants';
  8. /**
  9. * @description
  10. * This helper class is used to wrap operations in a TypeORM transaction in order to ensure
  11. * atomic operations on the database.
  12. */
  13. export class TransactionWrapper {
  14. /**
  15. * @description
  16. * Executes the `work` function within the context of a transaction. If the `work` function
  17. * resolves / completes, then all the DB operations it contains will be committed. If it
  18. * throws an error or rejects, then all DB operations will be rolled back.
  19. *
  20. * @note
  21. * This function does not mutate your context. Instead, this function makes a copy and passes
  22. * context to work function.
  23. */
  24. async executeInTransaction<T>(
  25. originalCtx: RequestContext,
  26. work: (ctx: RequestContext) => Observable<T> | Promise<T>,
  27. mode: TransactionMode,
  28. isolationLevel: TransactionIsolationLevel | undefined,
  29. connection: DataSource,
  30. ): Promise<T> {
  31. // Copy to make sure original context will remain valid after transaction completes
  32. const ctx = originalCtx.copy();
  33. const entityManager: EntityManager | undefined = (ctx as any)[TRANSACTION_MANAGER_KEY];
  34. let queryRunner = entityManager?.queryRunner;
  35. if (!queryRunner || queryRunner.isReleased) {
  36. queryRunner = connection.createQueryRunner();
  37. }
  38. if (mode === 'auto') {
  39. await this.startTransaction(queryRunner, isolationLevel);
  40. }
  41. (ctx as any)[TRANSACTION_MANAGER_KEY] = queryRunner.manager;
  42. try {
  43. const maxRetries = 5;
  44. const result = await lastValueFrom(
  45. from(work(ctx)).pipe(
  46. retryWhen(errors =>
  47. errors.pipe(
  48. tap(err => {
  49. if (!this.isRetriableError(err)) {
  50. throw err;
  51. }
  52. }),
  53. take(maxRetries),
  54. ),
  55. ),
  56. ),
  57. );
  58. if (queryRunner.isTransactionActive) {
  59. await queryRunner.commitTransaction();
  60. }
  61. return result;
  62. } catch (error) {
  63. if (queryRunner.isTransactionActive) {
  64. await queryRunner.rollbackTransaction();
  65. }
  66. throw error;
  67. } finally {
  68. if (!queryRunner.isTransactionActive && queryRunner.isReleased === false) {
  69. // There is a check for an active transaction
  70. // because this could be a nested transaction (savepoint).
  71. await queryRunner.release();
  72. }
  73. }
  74. }
  75. /**
  76. * Attempts to start a DB transaction, with retry logic in the case that a transaction
  77. * is already started for the connection (which is mainly a problem with SQLite/Sql.js)
  78. */
  79. private async startTransaction(
  80. queryRunner: QueryRunner,
  81. isolationLevel: TransactionIsolationLevel | undefined,
  82. ) {
  83. const maxRetries = 25;
  84. let attempts = 0;
  85. let lastError: any;
  86. // Returns false if a transaction is already in progress
  87. async function attemptStartTransaction(): Promise<boolean> {
  88. try {
  89. await queryRunner.startTransaction(isolationLevel);
  90. return true;
  91. } catch (err: any) {
  92. lastError = err;
  93. if (err instanceof TransactionAlreadyStartedError) {
  94. return false;
  95. }
  96. throw err;
  97. }
  98. }
  99. while (attempts < maxRetries) {
  100. const result = await attemptStartTransaction();
  101. if (result) {
  102. return;
  103. }
  104. attempts++;
  105. // insert an increasing delay before retrying
  106. await new Promise(resolve => setTimeout(resolve, attempts * 20));
  107. }
  108. throw lastError;
  109. }
  110. /**
  111. * If the resolver function throws an error, there are certain cases in which
  112. * we want to retry the whole thing again - notably in the case of a deadlock
  113. * situation, which can usually be retried with success.
  114. */
  115. private isRetriableError(err: any): boolean {
  116. const mysqlDeadlock = err.code === 'ER_LOCK_DEADLOCK';
  117. const postgresDeadlock = err.code === 'deadlock_detected';
  118. return mysqlDeadlock || postgresDeadlock;
  119. }
  120. }