Skip to content

Commit

Permalink
MSEARCH-794: ecs: save records only in central tenant's db schema
Browse files Browse the repository at this point in the history
  • Loading branch information
mukhiddin-yusuf committed Aug 21, 2024
1 parent 2031540 commit bc27ac1
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import java.sql.Timestamp;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
Expand All @@ -12,7 +13,6 @@
import lombok.extern.log4j.Log4j2;
import org.apache.commons.collections4.CollectionUtils;
import org.folio.search.configuration.properties.ReindexConfigurationProperties;
import org.folio.search.exception.FolioIntegrationException;
import org.folio.search.integration.InventoryService;
import org.folio.search.model.reindex.MergeRangeEntity;
import org.folio.search.model.types.InventoryRecordType;
Expand All @@ -27,17 +27,14 @@ public class ReindexMergeRangeIndexService {

private final Map<ReindexEntityType, MergeRangeRepository> repositories;
private final InventoryService inventoryService;
private final ReindexStatusService statusService;
private final ReindexConfigurationProperties reindexConfig;

public ReindexMergeRangeIndexService(List<MergeRangeRepository> repositories,
InventoryService inventoryService,
ReindexStatusService statusService,
ReindexConfigurationProperties reindexConfig) {
this.repositories = repositories.stream()
.collect(Collectors.toMap(MergeRangeRepository::entityType, Function.identity()));
this.inventoryService = inventoryService;
this.statusService = statusService;
this.reindexConfig = reindexConfig;
}

Expand All @@ -47,23 +44,22 @@ public void deleteAllRangeRecords() {
repositories.get(ReindexEntityType.INSTANCE).truncateMergeRanges();
}

public void createMergeRanges(String tenantId) {
var repository = repositories.get(ReindexEntityType.INSTANCE);
public void saveMergeRanges(List<MergeRangeEntity> ranges) {
repositories.values().iterator().next().saveMergeRanges(ranges);
}

public List<MergeRangeEntity> createMergeRanges(String tenantId) {
List<MergeRangeEntity> mergeRangeEntities = new ArrayList<>();
for (var recordType : InventoryRecordType.values()) {
try {
var recordsCount = inventoryService.fetchInventoryRecordsCount(recordType);
var rangeSize = reindexConfig.getMergeRangeSize();
var ranges = constructMergeRangeRecords(recordsCount, rangeSize, recordType, tenantId);
if (CollectionUtils.isNotEmpty(ranges)) {
log.info("Creating [{} {}] ranges for [tenant: {}]", ranges.size(), recordType, tenantId);
repository.saveMergeRanges(ranges);
}
} catch (FolioIntegrationException e) {
log.warn("Skip creating merge ranges for [tenant: {}]. Exception: {}", tenantId, e.getMessage());
statusService.updateReindexMergeFailed(List.of(asEntityType(recordType)));
var recordsCount = inventoryService.fetchInventoryRecordsCount(recordType);
var rangeSize = reindexConfig.getMergeRangeSize();
var ranges = constructMergeRangeRecords(recordsCount, rangeSize, recordType, tenantId);
if (CollectionUtils.isNotEmpty(ranges)) {
log.info("Constructed [{} {}] ranges for [tenant: {}]", ranges.size(), recordType, tenantId);
mergeRangeEntities.addAll(ranges);
}
}

return mergeRangeEntities;
}

public List<MergeRangeEntity> fetchMergeRanges(ReindexEntityType entityType) {
Expand Down
45 changes: 23 additions & 22 deletions src/main/java/org/folio/search/service/reindex/ReindexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@

import static org.folio.search.service.reindex.ReindexConstants.MERGE_RANGE_ENTITY_TYPES;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.stream.Stream;
import lombok.extern.log4j.Log4j2;
import org.folio.search.exception.FolioIntegrationException;
import org.apache.commons.collections4.CollectionUtils;
import org.folio.search.exception.RequestValidationException;
import org.folio.search.integration.InventoryService;
import org.folio.search.model.reindex.MergeRangeEntity;
import org.folio.search.service.consortium.ConsortiumTenantService;
import org.folio.spring.service.SystemUserScopedExecutionService;
import org.springframework.stereotype.Service;
Expand Down Expand Up @@ -50,8 +54,13 @@ public CompletableFuture<Void> initFullReindex(String tenantId) {
statusService.recreateMergeStatusRecords();

var future = CompletableFuture.runAsync(() -> {
mergeRangeService.createMergeRanges(tenantId);
processForConsortium(tenantId);
var rangesForAllTenants = Stream.of(
mergeRangeService.createMergeRanges(tenantId),
processForConsortium(tenantId)
)
.flatMap(List::stream)
.toList();
mergeRangeService.saveMergeRanges(rangesForAllTenants);
}, reindexExecutor)
.thenRun(this::publishRecordsRange)
.handle((unused, throwable) -> {
Expand All @@ -66,32 +75,24 @@ public CompletableFuture<Void> initFullReindex(String tenantId) {
return future;
}

private void processForConsortium(String tenantId) {
try {
var memberTenants = consortiumService.getConsortiumTenants(tenantId);
for (var memberTenant : memberTenants) {
executionService.executeAsyncSystemUserScoped(memberTenant, () ->
mergeRangeService.createMergeRanges(memberTenant));
}
} catch (FolioIntegrationException e) {
log.warn("Skip creating merge ranges for [tenant: {}]. Exception: {}", tenantId, e.getMessage());
statusService.updateReindexMergeFailed();
private List<MergeRangeEntity> processForConsortium(String tenantId) {
List<MergeRangeEntity> mergeRangeEntities = new ArrayList<>();
var memberTenants = consortiumService.getConsortiumTenants(tenantId);
for (var memberTenant : memberTenants) {
executionService.executeAsyncSystemUserScoped(memberTenant, () ->
mergeRangeEntities.addAll(mergeRangeService.createMergeRanges(memberTenant)));
}
return mergeRangeEntities;
}

private void publishRecordsRange() {
for (var entityType : MERGE_RANGE_ENTITY_TYPES) {
var rangeEntities = mergeRangeService.fetchMergeRanges(entityType);
statusService.updateReindexMergeStarted(entityType, rangeEntities.size());

for (var rangeEntity : rangeEntities) {
try {
if (CollectionUtils.isNotEmpty(rangeEntities)) {
log.info("Publishing {} {} range entities", rangeEntities.size(), entityType);
statusService.updateReindexMergeStarted(entityType, rangeEntities.size());
for (var rangeEntity : rangeEntities) {
inventoryService.publishReindexRecordsRange(rangeEntity);
} catch (FolioIntegrationException e) {
log.error("Failed to publish records range entity [rangeEntity: {}]. Exception: {}",
rangeEntity, e.getMessage());
statusService.updateReindexMergeFailed();
return;
}
}
}
Expand Down

0 comments on commit bc27ac1

Please sign in to comment.