entities.data-processor.ts 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. import { Injector, Logger, TransactionalConnection, VendureEntity } from '@vendure/core';
  2. import { EntityMetadata } from 'typeorm';
  3. import { Serializable } from 'node:child_process';
  4. import { loggerCtx } from '../constants';
  5. import { EntityDataMapperService } from '../entity-data-mapper/entity-data-mapper.service';
  6. import { EntitySearchIndexItem } from '../types';
  7. import { BaseDataProcessor } from './base-data-processor';
  8. export class EntitiesDataProcessor extends BaseDataProcessor {
  9. private connection: TransactionalConnection;
  10. private entityDataMapperService: EntityDataMapperService;
  11. private entityMetadatas: EntityMetadata[] = [];
  12. private batchSize = 100;
  13. init(injector: Injector) {
  14. super.init(injector);
  15. this.connection = injector.get(TransactionalConnection);
  16. this.entityDataMapperService = injector.get(EntityDataMapperService);
  17. // Get all entity metadata from TypeORM DataSource
  18. this.entityMetadatas = this.connection.rawConnection.entityMetadatas;
  19. Logger.info(`Found ${this.entityMetadatas.length} entities to process for search indexing`);
  20. }
  21. getBatchSize(): number {
  22. return this.batchSize;
  23. }
  24. async getTotalResults(metadata: Record<string, Serializable> | undefined): Promise<number> {
  25. if (!metadata?.entityName) {
  26. Logger.error(`Entity target is required`, loggerCtx);
  27. return 0;
  28. }
  29. const entityMetadata = this.entityMetadatas.find(em => em.targetName === metadata.entityName);
  30. if (!entityMetadata) {
  31. Logger.error(`Could not find entity metadata for ${metadata.entityName}`, loggerCtx);
  32. return 0;
  33. }
  34. try {
  35. const repository = this.connection.rawConnection.getRepository(entityMetadata.target);
  36. return await repository.count();
  37. } catch (error) {
  38. Logger.error(`Failed to count entities for ${metadata.entityName}: ${error}`);
  39. return 0;
  40. }
  41. }
  42. async *processBatch(
  43. skip: number,
  44. limit: number,
  45. // eslint-disable-next-line @typescript-eslint/no-unused-vars
  46. metadata: Record<string, any> | undefined,
  47. ): AsyncGenerator<void> {
  48. let processedCount = 0;
  49. let currentSkip = skip;
  50. for (const entityMetadata of this.entityMetadatas) {
  51. try {
  52. const repository = this.connection.rawConnection.getRepository(entityMetadata.target);
  53. const totalInEntity = await repository.count();
  54. // Calculate how many to skip/take from this entity
  55. if (currentSkip >= totalInEntity) {
  56. currentSkip -= totalInEntity;
  57. continue;
  58. }
  59. const takeFromEntity = Math.min(limit - processedCount, totalInEntity - currentSkip);
  60. const entities = (await repository.find({
  61. skip: currentSkip,
  62. take: takeFromEntity,
  63. })) as VendureEntity[];
  64. const searchIndexItems: EntitySearchIndexItem[] = [];
  65. for (const entity of entities) {
  66. const searchIndexItem = await this.mapEntityToSearchIndexItem(entity, entityMetadata);
  67. if (searchIndexItem) {
  68. searchIndexItems.push(searchIndexItem);
  69. }
  70. }
  71. if (searchIndexItems.length > 0) {
  72. await this.searchIndexingStrategy.persist(searchIndexItems);
  73. }
  74. processedCount += entities.length;
  75. currentSkip = 0; // Reset for next entity type
  76. yield;
  77. if (processedCount >= limit) {
  78. break;
  79. }
  80. } catch (error) {
  81. Logger.error(`Failed to process batch for entity ${entityMetadata.name}: ${error}`);
  82. }
  83. }
  84. }
  85. async processOne(id: string, metadata: Record<string, Serializable> | undefined): Promise<void> {
  86. if (!metadata?.entityName) {
  87. Logger.error(`Entity metadata is required`, loggerCtx);
  88. return;
  89. }
  90. const entityName = metadata.entityName as string;
  91. const entityMetadata = this.entityMetadatas.find(m => m.name === entityName);
  92. if (!entityMetadata) {
  93. Logger.warn(`Entity type ${entityName} not found in registered entities`, loggerCtx);
  94. return;
  95. }
  96. try {
  97. const repository = this.connection.rawConnection.getRepository(entityMetadata.target);
  98. const entity = (await repository.findOne({ where: { id } })) as VendureEntity | null;
  99. if (entity) {
  100. const searchIndexItem = await this.mapEntityToSearchIndexItem(entity, entityMetadata);
  101. if (searchIndexItem) {
  102. await this.searchIndexingStrategy.persist([searchIndexItem]);
  103. }
  104. } else {
  105. Logger.warn(`Entity ${entityName} with id ${id} not found`);
  106. // Remove from index if entity no longer exists
  107. await this.searchIndexingStrategy.remove(`entity_${entityName}_${id}`);
  108. }
  109. } catch (error) {
  110. Logger.error(`Failed to process entity ${entityName} with id ${id}: ${error}`);
  111. }
  112. return;
  113. }
  114. private async mapEntityToSearchIndexItem(
  115. entity: VendureEntity,
  116. metadata: EntityMetadata,
  117. ): Promise<EntitySearchIndexItem | null> {
  118. const mappedData = await this.entityDataMapperService.map(metadata.name, entity);
  119. return {
  120. ...mappedData,
  121. entityId: entity.id,
  122. entityName: metadata.name,
  123. };
  124. }
  125. }