// transaction-wrapper.ts
import { from, Observable, of } from 'rxjs';
import { retryWhen, take, tap } from 'rxjs/operators';
import { Connection, EntityManager, QueryRunner } from 'typeorm';
import { TransactionAlreadyStartedError } from 'typeorm/error/TransactionAlreadyStartedError';

import { RequestContext } from '../api/common/request-context';
import { TransactionMode } from '../api/decorators/transaction.decorator';
import { TRANSACTION_MANAGER_KEY } from '../common/constants';
  8. /**
  9. * @description
  10. * This helper class is used to wrap operations in a TypeORM transaction in order to ensure
  11. * atomic operations on the database.
  12. */
  13. export class TransactionWrapper {
  14. /**
  15. * @description
  16. * Executes the `work` function within the context of a transaction. If the `work` function
  17. * resolves / completes, then all the DB operations it contains will be committed. If it
  18. * throws an error or rejects, then all DB operations will be rolled back.
  19. *
  20. * @note
  21. * This function does not mutate your context. Instead, this function makes a copy and passes
  22. * context to work function.
  23. */
  24. async executeInTransaction<T>(
  25. originalCtx: RequestContext,
  26. work: (ctx: RequestContext) => Observable<T> | Promise<T>,
  27. mode: TransactionMode,
  28. connection: Connection,
  29. ): Promise<T> {
  30. // Copy to make sure original context will remain valid after transaction completes
  31. const ctx = originalCtx.copy();
  32. const entityManager: EntityManager | undefined = (ctx as any)[TRANSACTION_MANAGER_KEY];
  33. const queryRunner = entityManager?.queryRunner || connection.createQueryRunner();
  34. if (mode === 'auto') {
  35. await this.startTransaction(queryRunner);
  36. }
  37. (ctx as any)[TRANSACTION_MANAGER_KEY] = queryRunner.manager;
  38. try {
  39. const maxRetries = 5;
  40. const result = await from(work(ctx))
  41. .pipe(
  42. retryWhen(errors =>
  43. errors.pipe(
  44. tap(err => {
  45. if (!this.isRetriableError(err)) {
  46. throw err;
  47. }
  48. }),
  49. take(maxRetries),
  50. ),
  51. ),
  52. )
  53. .toPromise();
  54. if (queryRunner.isTransactionActive) {
  55. await queryRunner.commitTransaction();
  56. }
  57. return result;
  58. } catch (error) {
  59. if (queryRunner.isTransactionActive) {
  60. await queryRunner.rollbackTransaction();
  61. }
  62. throw error;
  63. } finally {
  64. if (!queryRunner.isTransactionActive
  65. && queryRunner.isReleased === false) {
  66. // There is a check for an active transaction
  67. // because this could be a nested transaction (savepoint).
  68. await queryRunner.release();
  69. }
  70. }
  71. }
  72. /**
  73. * Attempts to start a DB transaction, with retry logic in the case that a transaction
  74. * is already started for the connection (which is mainly a problem with SQLite/Sql.js)
  75. */
  76. private async startTransaction(queryRunner: QueryRunner) {
  77. const maxRetries = 25;
  78. let attempts = 0;
  79. let lastError: any;
  80. // Returns false if a transaction is already in progress
  81. async function attemptStartTransaction(): Promise<boolean> {
  82. try {
  83. await queryRunner.startTransaction();
  84. return true;
  85. } catch (err) {
  86. lastError = err;
  87. if (err instanceof TransactionAlreadyStartedError) {
  88. return false;
  89. }
  90. throw err;
  91. }
  92. }
  93. while (attempts < maxRetries) {
  94. const result = await attemptStartTransaction();
  95. if (result) {
  96. return;
  97. }
  98. attempts++;
  99. // insert an increasing delay before retrying
  100. await new Promise(resolve => setTimeout(resolve, attempts * 20));
  101. }
  102. throw lastError;
  103. }
  104. /**
  105. * If the resolver function throws an error, there are certain cases in which
  106. * we want to retry the whole thing again - notably in the case of a deadlock
  107. * situation, which can usually be retried with success.
  108. */
  109. private isRetriableError(err: any): boolean {
  110. const mysqlDeadlock = err.code === 'ER_LOCK_DEADLOCK';
  111. const postgresDeadlock = err.code === 'deadlock_detected';
  112. return mysqlDeadlock || postgresDeadlock;
  113. }
  114. }