diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_8_0/5150--delete-expunge-over-10k-resources-deletes-only-10k.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_8_0/5150--delete-expunge-over-10k-resources-deletes-only-10k.yaml
new file mode 100644
index 000000000000..64666cbcf693
--- /dev/null
+++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_8_0/5150--delete-expunge-over-10k-resources-deletes-only-10k.yaml
@@ -0,0 +1,5 @@
+---
+type: fix
+issue: 5150
+title: "When running a $delete-expunge with over 10,000 resources, only the first 10,000 resources were deleted.
+  This is now fixed."
diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/Batch2SupportConfig.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/Batch2SupportConfig.java
index 7fbf12097034..0ea5cae51758 100644
--- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/Batch2SupportConfig.java
+++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/Batch2SupportConfig.java
@@ -19,16 +19,21 @@
  */
 package ca.uhn.fhir.jpa.config;
 
+import ca.uhn.fhir.context.FhirContext;
 import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
+import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
 import ca.uhn.fhir.jpa.api.svc.IBatch2DaoSvc;
 import ca.uhn.fhir.jpa.api.svc.IDeleteExpungeSvc;
 import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
 import ca.uhn.fhir.jpa.dao.IFulltextSearchSvc;
 import ca.uhn.fhir.jpa.dao.data.IResourceLinkDao;
+import ca.uhn.fhir.jpa.dao.data.IResourceTableDao;
 import ca.uhn.fhir.jpa.dao.expunge.ResourceTableFKProvider;
+import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
 import ca.uhn.fhir.jpa.delete.batch2.DeleteExpungeSqlBuilder;
 import ca.uhn.fhir.jpa.delete.batch2.DeleteExpungeSvcImpl;
 import ca.uhn.fhir.jpa.reindex.Batch2DaoSvcImpl;
+import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Bean;
 
@@ -37,8 +42,20 @@ public class Batch2SupportConfig {
 
 	@Bean
-	public IBatch2DaoSvc batch2DaoSvc() {
-		return new Batch2DaoSvcImpl();
+	public IBatch2DaoSvc batch2DaoSvc(
+			IResourceTableDao theResourceTableDao,
+			MatchUrlService theMatchUrlService,
+			DaoRegistry theDaoRegistry,
+			FhirContext theFhirContext,
+			IHapiTransactionService theTransactionService,
+			JpaStorageSettings theJpaStorageSettings) {
+		return new Batch2DaoSvcImpl(
+				theResourceTableDao,
+				theMatchUrlService,
+				theDaoRegistry,
+				theFhirContext,
+				theTransactionService,
+				theJpaStorageSettings);
	}
 
 	@Bean
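The `@Bean` method above switches `Batch2DaoSvcImpl` from field `@Autowired` wiring to constructor injection. A minimal sketch of the pattern, with illustrative names rather than HAPI classes:

```java
// Constructor injection: dependencies are explicit and final, so the class can
// be built in a plain unit test without starting a Spring context.
interface ExampleDao {
	String findById(long theId);
}

class ExampleSvc {
	private final ExampleDao myDao; // assigned once, never null after construction

	ExampleSvc(ExampleDao theDao) {
		myDao = theDao;
	}

	String describe(long theId) {
		return "resource: " + myDao.findById(theId);
	}
}

// In a test, no context bootstrap is needed:
// ExampleSvc svc = new ExampleSvc(mock(ExampleDao.class));
```

This is what lets the new `Batch2DaoSvcImplTest` further down construct the service directly with a spied `DaoRegistry`, which the old field-injected version would not have allowed.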
diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/reindex/Batch2DaoSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/reindex/Batch2DaoSvcImpl.java
index f195fc25475e..1a8e3e638831 100644
--- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/reindex/Batch2DaoSvcImpl.java
+++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/reindex/Batch2DaoSvcImpl.java
@@ -21,7 +21,9 @@
 import ca.uhn.fhir.context.FhirContext;
 import ca.uhn.fhir.context.RuntimeResourceDefinition;
+import ca.uhn.fhir.i18n.Msg;
 import ca.uhn.fhir.interceptor.model.RequestPartitionId;
+import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
 import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
 import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
 import ca.uhn.fhir.jpa.api.pid.EmptyResourcePidList;
@@ -39,93 +41,121 @@
 import ca.uhn.fhir.rest.api.SortSpec;
 import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
 import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
-import ca.uhn.fhir.rest.param.DateRangeParam;
-import ca.uhn.fhir.util.DateRangeUtil;
-import org.springframework.beans.factory.annotation.Autowired;
+import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
 import org.springframework.data.domain.Pageable;
 import org.springframework.data.domain.Slice;
 
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import static org.apache.commons.collections4.CollectionUtils.isNotEmpty;
-
 public class Batch2DaoSvcImpl implements IBatch2DaoSvc {
+	private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(Batch2DaoSvcImpl.class);
+
+	private final IResourceTableDao myResourceTableDao;
 
-	@Autowired
-	private IResourceTableDao myResourceTableDao;
+	private final MatchUrlService myMatchUrlService;
 
-	@Autowired
-	private MatchUrlService myMatchUrlService;
+	private final DaoRegistry myDaoRegistry;
 
-	@Autowired
-	private DaoRegistry myDaoRegistry;
+	private final FhirContext myFhirContext;
 
-	@Autowired
-	private FhirContext myFhirContext;
+	private final IHapiTransactionService myTransactionService;
 
-	@Autowired
-	private IHapiTransactionService myTransactionService;
+	private final JpaStorageSettings myJpaStorageSettings;
 
 	@Override
 	public boolean isAllResourceTypeSupported() {
 		return true;
 	}
 
+	public Batch2DaoSvcImpl(
+			IResourceTableDao theResourceTableDao,
+			MatchUrlService theMatchUrlService,
+			DaoRegistry theDaoRegistry,
+			FhirContext theFhirContext,
+			IHapiTransactionService theTransactionService,
+			JpaStorageSettings theJpaStorageSettings) {
+		myResourceTableDao = theResourceTableDao;
+		myMatchUrlService = theMatchUrlService;
+		myDaoRegistry = theDaoRegistry;
+		myFhirContext = theFhirContext;
+		myTransactionService = theTransactionService;
+		myJpaStorageSettings = theJpaStorageSettings;
+	}
+
 	@Override
 	public IResourcePidList fetchResourceIdsPage(
-			Date theStart,
-			Date theEnd,
-			@Nonnull Integer thePageSize,
-			@Nullable RequestPartitionId theRequestPartitionId,
-			@Nullable String theUrl) {
+			Date theStart, Date theEnd, @Nullable RequestPartitionId theRequestPartitionId, @Nullable String theUrl) {
 		return myTransactionService
 				.withSystemRequest()
 				.withRequestPartitionId(theRequestPartitionId)
 				.execute(() -> {
 					if (theUrl == null) {
-						return fetchResourceIdsPageNoUrl(theStart, theEnd, thePageSize, theRequestPartitionId);
+						return fetchResourceIdsPageNoUrl(theStart, theEnd, theRequestPartitionId);
 					} else {
-						return fetchResourceIdsPageWithUrl(
-								theStart, theEnd, thePageSize, theUrl, theRequestPartitionId);
+						return fetchResourceIdsPageWithUrl(theEnd, theUrl, theRequestPartitionId);
 					}
 				});
 	}
 
-	private IResourcePidList fetchResourceIdsPageWithUrl(
-			Date theStart, Date theEnd, int thePageSize, String theUrl, RequestPartitionId theRequestPartitionId) {
+	@Nonnull
+	private HomogeneousResourcePidList fetchResourceIdsPageWithUrl(
+			Date theEnd, @Nonnull String theUrl, @Nullable RequestPartitionId theRequestPartitionId) {
+		if (!theUrl.contains("?")) {
+			throw new InternalErrorException(Msg.code(2422) + "this should never happen: URL is missing a '?'");
+		}
+
+		final Integer internalSynchronousSearchSize = myJpaStorageSettings.getInternalSynchronousSearchSize();
+
+		if (internalSynchronousSearchSize == null || internalSynchronousSearchSize <= 0) {
+			throw new InternalErrorException(Msg.code(2423)
+					+ "this should never happen: internalSynchronousSearchSize is null or less than or equal to 0");
+		}
+
+		List<IResourcePersistentId> currentIds = fetchResourceIdsPageWithUrl(0, theUrl, theRequestPartitionId);
+		ourLog.debug("FIRST currentIds: {}", currentIds.size());
+		final List<IResourcePersistentId> allIds = new ArrayList<>(currentIds);
+
+		while (internalSynchronousSearchSize < currentIds.size()) {
+			// Ensure the offset is set to the last ID in the cumulative List, otherwise, we'll be stuck in an infinite
+			// loop here:
+			currentIds = fetchResourceIdsPageWithUrl(allIds.size(), theUrl, theRequestPartitionId);
+			ourLog.debug("NEXT currentIds: {}", currentIds.size());
+
+			allIds.addAll(currentIds);
+		}
+
+		final String resourceType = theUrl.substring(0, theUrl.indexOf('?'));
+
+		return new HomogeneousResourcePidList(resourceType, allIds, theEnd, theRequestPartitionId);
+	}
+
+	private List<IResourcePersistentId> fetchResourceIdsPageWithUrl(
+			int theOffset, String theUrl, RequestPartitionId theRequestPartitionId) {
 		String resourceType = theUrl.substring(0, theUrl.indexOf('?'));
 		RuntimeResourceDefinition def = myFhirContext.getResourceDefinition(resourceType);
 
 		SearchParameterMap searchParamMap = myMatchUrlService.translateMatchUrl(theUrl, def);
-		searchParamMap.setSort(new SortSpec(Constants.PARAM_LASTUPDATED, SortOrderEnum.ASC));
-		DateRangeParam chunkDateRange =
-				DateRangeUtil.narrowDateRange(searchParamMap.getLastUpdated(), theStart, theEnd);
-		searchParamMap.setLastUpdated(chunkDateRange);
-		searchParamMap.setCount(thePageSize);
+		searchParamMap.setSort(new SortSpec(Constants.PARAM_ID, SortOrderEnum.ASC));
+		searchParamMap.setOffset(theOffset);
+		searchParamMap.setLoadSynchronousUpTo(myJpaStorageSettings.getInternalSynchronousSearchSize() + 1);
 
 		IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(resourceType);
 		SystemRequestDetails request = new SystemRequestDetails();
 		request.setRequestPartitionId(theRequestPartitionId);
-		List<IResourcePersistentId> ids = dao.searchForIds(searchParamMap, request);
-
-		Date lastDate = null;
-		if (isNotEmpty(ids)) {
-			IResourcePersistentId lastResourcePersistentId = ids.get(ids.size() - 1);
-			lastDate = dao.readByPid(lastResourcePersistentId, true).getMeta().getLastUpdated();
-		}
 
-		return new HomogeneousResourcePidList(resourceType, ids, lastDate, theRequestPartitionId);
+		return dao.searchForIds(searchParamMap, request);
 	}
 
 	@Nonnull
 	private IResourcePidList fetchResourceIdsPageNoUrl(
-			Date theStart, Date theEnd, int thePagesize, RequestPartitionId theRequestPartitionId) {
-		Pageable page = Pageable.ofSize(thePagesize);
+			Date theStart, Date theEnd, RequestPartitionId theRequestPartitionId) {
+		final Pageable page = Pageable.unpaged();
 		Slice<Object[]> slice;
 		if (theRequestPartitionId == null || theRequestPartitionId.isAllPartitions()) {
 			slice = myResourceTableDao.findIdsTypesAndUpdateTimesOfResourcesWithinUpdatedRangeOrderedFromOldest(
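The loop in `fetchResourceIdsPageWithUrl` above is the heart of the 5150 fix: instead of a single synchronous search that silently stops at the internal synchronous search size (the 10,000 cap behind the changelog's symptom), the service pages by `_id`-sorted offset until a short page signals the end. A simplified sketch of the same control flow, using plain `Long` IDs and a `fetchPage` callback standing in for the offset-based search:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntFunction;

// Simplified sketch of the offset-paging loop (assumption: fetchPage loads up
// to pageSize + 1 IDs starting at the given offset, mirroring
// setLoadSynchronousUpTo(internalSynchronousSearchSize + 1)).
class OffsetPagingSketch {

	static List<Long> fetchAll(int pageSize, IntFunction<List<Long>> fetchPage) {
		List<Long> currentIds = fetchPage.apply(0);
		List<Long> allIds = new ArrayList<>(currentIds);

		// A page holding more than pageSize entries was truncated: keep paging.
		while (pageSize < currentIds.size()) {
			// Advance the offset by the cumulative count; re-reading from a
			// stale offset would return the same rows forever (infinite loop).
			currentIds = fetchPage.apply(allIds.size());
			allIds.addAll(currentIds);
		}
		return allIds;
	}
}
```

Requesting `pageSize + 1` makes truncation detectable from the page length alone, and the switch from a `_lastUpdated` sort to `PARAM_ID` gives the offset a stable ordering, so rows are neither skipped nor repeated between pages.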
diff --git a/hapi-fhir-jpaserver-mdm/src/main/java/ca/uhn/fhir/jpa/mdm/svc/GoldenResourceSearchSvcImpl.java b/hapi-fhir-jpaserver-mdm/src/main/java/ca/uhn/fhir/jpa/mdm/svc/GoldenResourceSearchSvcImpl.java
index 51712154b23c..794e19946370 100644
--- a/hapi-fhir-jpaserver-mdm/src/main/java/ca/uhn/fhir/jpa/mdm/svc/GoldenResourceSearchSvcImpl.java
+++ b/hapi-fhir-jpaserver-mdm/src/main/java/ca/uhn/fhir/jpa/mdm/svc/GoldenResourceSearchSvcImpl.java
@@ -82,7 +82,6 @@ private IResourcePidList fetchResourceIdsPageWithResourceType(
 		DateRangeParam chunkDateRange =
 				DateRangeUtil.narrowDateRange(searchParamMap.getLastUpdated(), theStart, theEnd);
 		searchParamMap.setLastUpdated(chunkDateRange);
-		searchParamMap.setCount(thePageSize); // request this many pids
 		searchParamMap.add(
 				"_tag", new TokenParam(MdmConstants.SYSTEM_GOLDEN_RECORD_STATUS, MdmConstants.CODE_GOLDEN_RECORD));
diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/provider/r4/MultitenantBatchOperationR4Test.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/provider/r4/MultitenantBatchOperationR4Test.java
index d0b6638dccee..ef1acc0bf6c7 100644
--- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/provider/r4/MultitenantBatchOperationR4Test.java
+++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/provider/r4/MultitenantBatchOperationR4Test.java
@@ -32,10 +32,8 @@
 import java.util.stream.Collectors;
 
 import static ca.uhn.fhir.jpa.model.util.JpaConstants.DEFAULT_PARTITION_NAME;
-import static org.awaitility.Awaitility.await;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.in;
 import static org.hamcrest.Matchers.isA;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
@@ -108,7 +106,7 @@ public void testDeleteExpungeOperation() {
 		String jobId = BatchHelperR4.jobIdFromBatch2Parameters(response);
 		myBatch2JobHelper.awaitJobCompletion(jobId);
 
-		assertThat(interceptor.requestPartitionIds, hasSize(5));
+		assertThat(interceptor.requestPartitionIds, hasSize(4));
 		RequestPartitionId partitionId = interceptor.requestPartitionIds.get(0);
 		assertEquals(TENANT_B_ID, partitionId.getFirstPartitionIdOrNull());
 		assertEquals(TENANT_B, partitionId.getFirstPartitionNameOrNull());
diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/Batch2DaoSvcImplTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/Batch2DaoSvcImplTest.java
new file mode 100644
index 000000000000..d4f5dbf07d2d
--- /dev/null
+++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/Batch2DaoSvcImplTest.java
@@ -0,0 +1,144 @@
+package ca.uhn.fhir.jpa.reindex;
+
+import ca.uhn.fhir.interceptor.model.RequestPartitionId;
+import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
+import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
+import ca.uhn.fhir.jpa.api.pid.IResourcePidList;
+import ca.uhn.fhir.jpa.api.svc.IBatch2DaoSvc;
+import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
+import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
+import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
+import ca.uhn.fhir.model.primitive.IdDt;
+import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
+import org.hl7.fhir.instance.model.api.IIdType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import javax.annotation.Nonnull;
+import java.time.LocalDate;
+import java.time.Month;
+import java.time.ZoneId;
+import java.util.Date;
+import java.util.List;
+import java.util.stream.IntStream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+class Batch2DaoSvcImplTest extends BaseJpaR4Test {
+
+	private static final Date PREVIOUS_MILLENNIUM = toDate(LocalDate.of(1999, Month.DECEMBER, 31));
+	private static final Date TOMORROW = toDate(LocalDate.now().plusDays(1));
+	private static final String URL_PATIENT_EXPUNGE_TRUE = "Patient?_expunge=true";
+	private static final String PATIENT = "Patient";
+	private static final int INTERNAL_SYNCHRONOUS_SEARCH_SIZE = 10;
+
+	@Autowired
+	private JpaStorageSettings myJpaStorageSettings;
+	@Autowired
+	private MatchUrlService myMatchUrlService;
+	@Autowired
+	private IHapiTransactionService myIHapiTransactionService;
+
+	private DaoRegistry mySpiedDaoRegistry;
+
+	private IBatch2DaoSvc mySubject;
+
+	@BeforeEach
+	void beforeEach() {
+		myJpaStorageSettings.setInternalSynchronousSearchSize(INTERNAL_SYNCHRONOUS_SEARCH_SIZE);
+
+		mySpiedDaoRegistry = spy(myDaoRegistry);
+
+		mySubject = new Batch2DaoSvcImpl(myResourceTableDao, myMatchUrlService, mySpiedDaoRegistry, myFhirContext, myIHapiTransactionService, myJpaStorageSettings);
+	}
+
+	// TODO: LD this test won't work with the nonUrl variant yet: error: No existing transaction found for transaction marked with propagation 'mandatory'
+
+	@Test
+	void fetchResourcesByUrlEmptyUrl() {
+		final InternalErrorException exception = assertThrows(InternalErrorException.class, () -> mySubject.fetchResourceIdsPage(PREVIOUS_MILLENNIUM, TOMORROW, 800, RequestPartitionId.defaultPartition(), ""));
+
+		assertEquals("HAPI-2422: this should never happen: URL is missing a '?'", exception.getMessage());
+	}
+
+	@Test
+	void fetchResourcesByUrlSingleQuestionMark() {
+		final InternalErrorException exception = assertThrows(InternalErrorException.class, () -> mySubject.fetchResourceIdsPage(PREVIOUS_MILLENNIUM, TOMORROW, 800, RequestPartitionId.defaultPartition(), "?"));
+
+		assertEquals("HAPI-2223: theResourceName must not be blank", exception.getMessage());
+	}
+
+	@Test
+	void fetchResourcesByUrlNonsensicalResource() {
+		final InternalErrorException exception = assertThrows(InternalErrorException.class, () -> mySubject.fetchResourceIdsPage(PREVIOUS_MILLENNIUM, TOMORROW, 800, RequestPartitionId.defaultPartition(), "Banana?_expunge=true"));
+
+		assertEquals("HAPI-2223: HAPI-1684: Unknown resource name \"Banana\" (this name is not known in FHIR version \"R4\")", exception.getMessage());
+	}
+
+	@ParameterizedTest
+	@ValueSource(ints = {0, 9, 10, 11, 21, 22, 23, 45})
+	void fetchResourcesByUrl(int expectedNumResults) {
+		final List<IIdType> patientIds = IntStream.range(0, expectedNumResults)
+			.mapToObj(num -> createPatient())
+			.toList();
+
+		final IResourcePidList resourcePidList = mySubject.fetchResourceIdsPage(PREVIOUS_MILLENNIUM, TOMORROW, 800, RequestPartitionId.defaultPartition(), URL_PATIENT_EXPUNGE_TRUE);
+
+		final List<IIdType> actualPatientIds =
+			resourcePidList.getTypedResourcePids()
+				.stream()
+				.map(typePid -> new IdDt(typePid.resourceType, (Long) typePid.id.getId()))
+				.toList();
+		assertIdsEqual(patientIds, actualPatientIds);
+
+		verify(mySpiedDaoRegistry, times(getExpectedNumOfInvocations(expectedNumResults))).getResourceDao(PATIENT);
+	}
+
+	@ParameterizedTest
+	@ValueSource(ints = {0, 9, 10, 11, 21, 22, 23, 45})
+	void fetchResourcesNoUrl(int expectedNumResults) {
+		final int pageSizeWellBelowThreshold = 2;
+		final List<IIdType> patientIds = IntStream.range(0, expectedNumResults)
+			.mapToObj(num -> createPatient())
+			.toList();
+
+		final IResourcePidList resourcePidList = mySubject.fetchResourceIdsPage(PREVIOUS_MILLENNIUM, TOMORROW, pageSizeWellBelowThreshold, RequestPartitionId.defaultPartition(), null);
+
+		final List<IIdType> actualPatientIds =
+			resourcePidList.getTypedResourcePids()
+				.stream()
+				.map(typePid -> new IdDt(typePid.resourceType, (Long) typePid.id.getId()))
+				.toList();
+		assertIdsEqual(patientIds, actualPatientIds);
+	}
+
+	private int getExpectedNumOfInvocations(int expectedNumResults) {
+		final int maxResultsPerQuery = INTERNAL_SYNCHRONOUS_SEARCH_SIZE + 1;
+		final int division = expectedNumResults / maxResultsPerQuery;
+		return division + 1;
+	}
+
+	private static void assertIdsEqual(List<IIdType> expectedResourceIds, List<IIdType> actualResourceIds) {
+		assertEquals(expectedResourceIds.size(), actualResourceIds.size());
+
+		for (int index = 0; index < expectedResourceIds.size(); index++) {
+			final IIdType expectedIdType = expectedResourceIds.get(index);
+			final IIdType actualIdType = actualResourceIds.get(index);
+
+			assertEquals(expectedIdType.getResourceType(), actualIdType.getResourceType());
+			assertEquals(expectedIdType.getIdPartAsLong(), actualIdType.getIdPartAsLong());
+		}
+	}
+
+	@Nonnull
+	private static Date toDate(LocalDate theLocalDate) {
+		return Date.from(theLocalDate.atStartOfDay(ZoneId.systemDefault()).toInstant());
+	}
+}
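`getExpectedNumOfInvocations` encodes the paging contract: each query loads up to `INTERNAL_SYNCHRONOUS_SEARCH_SIZE + 1` IDs, and the loop always issues one final short query that proves it has reached the end. A worked example for the `45` case in the `@ValueSource`:

```java
class InvocationMathExample {
	public static void main(String[] args) {
		// Every query loads up to INTERNAL_SYNCHRONOUS_SEARCH_SIZE + 1 = 11 IDs,
		// so 45 resources arrive as pages of 11, 11, 11, 11, then a short page of 1.
		int expectedNumResults = 45;
		int maxResultsPerQuery = 10 + 1; // search size + 1
		int queries = expectedNumResults / maxResultsPerQuery + 1;
		System.out.println(queries); // 5 -> five getResourceDao("Patient") calls
	}
}
```

The same arithmetic covers the boundary values in the parameter list: 10 results fit in one page (one query), while 11 results fill a page completely and force a second, empty query.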
diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/ResourceReindexSvcImplTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/ResourceReindexSvcImplTest.java
index 01b2afeeadfd..56c09865ea01 100644
--- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/ResourceReindexSvcImplTest.java
+++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/ResourceReindexSvcImplTest.java
@@ -3,16 +3,13 @@
 import ca.uhn.fhir.jpa.api.pid.IResourcePidList;
 import ca.uhn.fhir.jpa.api.pid.TypedResourcePid;
 import ca.uhn.fhir.jpa.api.svc.IBatch2DaoSvc;
-import ca.uhn.fhir.jpa.model.dao.JpaPid;
 import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
-import org.hl7.fhir.r4.model.DateType;
-import org.hl7.fhir.r4.model.InstantType;
+import org.hl7.fhir.instance.model.api.IIdType;
 import org.junit.jupiter.api.MethodOrderer;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestMethodOrder;
 import org.springframework.beans.factory.annotation.Autowired;
 
-import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 
@@ -111,26 +108,26 @@ public void testFetchResourceIdsPage_WithUrl_WithData() {
 
 		// Setup
 
-		createPatient(withActiveFalse());
+		final Long patientId0 = createPatient(withActiveFalse()).getIdPartAsLong();
 		sleepUntilTimeChanges();
 
 		// Start of resources within range
 		Date start = new Date();
 		sleepUntilTimeChanges();
-		Long id0 = createPatient(withActiveFalse()).getIdPartAsLong();
+		Long patientId1 = createPatient(withActiveFalse()).getIdPartAsLong();
 		createObservation(withObservationCode("http://foo", "bar"));
 		createObservation(withObservationCode("http://foo", "bar"));
 		sleepUntilTimeChanges();
 		Date beforeLastInRange = new Date();
 		sleepUntilTimeChanges();
-		Long id1 = createPatient(withActiveFalse()).getIdPartAsLong();
+		Long patientId2 = createPatient(withActiveFalse()).getIdPartAsLong();
 		sleepUntilTimeChanges();
 		Date end = new Date();
 		sleepUntilTimeChanges();
 		// End of resources within range
 
 		createObservation(withObservationCode("http://foo", "bar"));
-		createPatient(withActiveFalse());
+		final Long patientId3 = createPatient(withActiveFalse()).getIdPartAsLong();
 		sleepUntilTimeChanges();
 
 		// Execute
@@ -140,13 +137,17 @@ public void testFetchResourceIdsPage_WithUrl_WithData() {
 
 		// Verify
 
-		assertEquals(2, page.size());
+		assertEquals(4, page.size());
 		List<TypedResourcePid> typedResourcePids = page.getTypedResourcePids();
-		assertThat(page.getTypedResourcePids(), contains(new TypedResourcePid("Patient", id0), new TypedResourcePid("Patient", id1)));
+		assertThat(page.getTypedResourcePids(),
+				contains(new TypedResourcePid("Patient", patientId0),
+						new TypedResourcePid("Patient", patientId1),
+						new TypedResourcePid("Patient", patientId2),
+						new TypedResourcePid("Patient", patientId3)));
 		assertTrue(page.getLastDate().after(beforeLastInRange));
-		assertTrue(page.getLastDate().before(end));
+		assertTrue(page.getLastDate().before(end) || page.getLastDate().equals(end));
 
-		assertEquals(3, myCaptureQueriesListener.logSelectQueries().size());
+		assertEquals(1, myCaptureQueriesListener.logSelectQueries().size());
 		assertEquals(0, myCaptureQueriesListener.countInsertQueries());
 		assertEquals(0, myCaptureQueriesListener.countUpdateQueries());
 		assertEquals(0, myCaptureQueriesListener.countDeleteQueries());
diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/step/ResourceIdListStep.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/step/ResourceIdListStep.java
index 716886f1b8c2..c5b4f289ebcd 100644
--- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/step/ResourceIdListStep.java
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/step/ResourceIdListStep.java
@@ -30,18 +30,17 @@
 import ca.uhn.fhir.batch2.jobs.parameters.PartitionedJobParameters;
 import ca.uhn.fhir.interceptor.model.RequestPartitionId;
 import ca.uhn.fhir.jpa.api.pid.IResourcePidList;
-import ca.uhn.fhir.jpa.api.pid.TypedResourcePid;
-import ca.uhn.fhir.system.HapiSystemProperties;
 import ca.uhn.fhir.util.Logs;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.UnmodifiableIterator;
 import org.slf4j.Logger;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
-import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
 
 public class ResourceIdListStep
@@ -75,11 +74,8 @@ public RunOutcome run(
 
 		ourLog.info("Beginning scan for reindex IDs in range {} to {}", start, end);
 
-		Date nextStart = start;
 		RequestPartitionId requestPartitionId =
 				theStepExecutionDetails.getParameters().getRequestPartitionId();
-		Set<TypedPidJson> idBuffer = new LinkedHashSet<>();
-		long previousLastTime = 0L;
 
 		int totalIdsFound = 0;
 		int chunkCount = 0;
@@ -88,55 +84,29 @@
 			// we won't go over MAX_BATCH_OF_IDS
 			maxBatchId = Math.min(batchSize.intValue(), maxBatchId);
 		}
-		while (true) {
-			IResourcePidList nextChunk = myIdChunkProducer.fetchResourceIdsPage(
-					nextStart, end, pageSize, requestPartitionId, theStepExecutionDetails.getData());
-
-			if (nextChunk.isEmpty()) {
-				ourLog.info("No data returned");
-				break;
-			}
-
-			// If we get the same last time twice in a row, we've clearly reached the end
-			if (nextChunk.getLastDate().getTime() == previousLastTime) {
-				ourLog.info("Matching final timestamp of {}, loading is completed", new Date(previousLastTime));
-				break;
-			}
-
-			ourLog.info("Found {} IDs from {} to {}", nextChunk.size(), nextStart, nextChunk.getLastDate());
-			if (nextChunk.size() < 10 && HapiSystemProperties.isTestModeEnabled()) {
-				// TODO: I've added this in order to troubleshoot MultitenantBatchOperationR4Test
-				// which is failing intermittently. If that stops, makes sense to remove this
-				ourLog.info(" * PIDS: {}", nextChunk);
-			}
-
-			for (TypedResourcePid typedResourcePid : nextChunk.getTypedResourcePids()) {
-				TypedPidJson nextId = new TypedPidJson(typedResourcePid);
-				idBuffer.add(nextId);
-			}
-
-			previousLastTime = nextChunk.getLastDate().getTime();
-			nextStart = nextChunk.getLastDate();
-
-			while (idBuffer.size() > maxBatchId) {
-				List<TypedPidJson> submissionIds = new ArrayList<>();
-				for (Iterator<TypedPidJson> iter = idBuffer.iterator(); iter.hasNext(); ) {
-					submissionIds.add(iter.next());
-					iter.remove();
-					if (submissionIds.size() == maxBatchId) {
-						break;
-					}
-				}
-
-				totalIdsFound += submissionIds.size();
-				chunkCount++;
-				submitWorkChunk(submissionIds, nextChunk.getRequestPartitionId(), theDataSink);
-			}
+
+		final IResourcePidList nextChunk = myIdChunkProducer.fetchResourceIdsPage(
+				start, end, pageSize, requestPartitionId, theStepExecutionDetails.getData());
+
+		if (nextChunk.isEmpty()) {
+			ourLog.info("No data returned");
 		}
-		totalIdsFound += idBuffer.size();
-		chunkCount++;
-		submitWorkChunk(idBuffer, requestPartitionId, theDataSink);
+
+		ourLog.debug("Found {} IDs from {} to {}", nextChunk.size(), start, nextChunk.getLastDate());
+
+		final Set<TypedPidJson> idBuffer = nextChunk.getTypedResourcePids().stream()
+				.map(TypedPidJson::new)
+				.collect(Collectors.toCollection(LinkedHashSet::new));
+
+		final UnmodifiableIterator<List<TypedPidJson>> partition =
+				Iterators.partition(idBuffer.iterator(), maxBatchId);
+
+		while (partition.hasNext()) {
+			final List<TypedPidJson> submissionIds = partition.next();
+
+			totalIdsFound += submissionIds.size();
+			chunkCount++;
+			submitWorkChunk(submissionIds, nextChunk.getRequestPartitionId(), theDataSink);
+		}
 
 		ourLog.info("Submitted {} chunks with {} resource IDs", chunkCount, totalIdsFound);
 		return RunOutcome.SUCCESS;
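The rewritten step drops the hand-rolled date-windowing loop and buffer-drain logic in favor of Guava's `Iterators.partition`, which wraps an iterator and yields consecutive `List` chunks of at most the given size, with the last chunk possibly shorter. A small standalone demonstration:

```java
import com.google.common.collect.Iterators;
import com.google.common.collect.UnmodifiableIterator;

import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class PartitionDemo {
	public static void main(String[] args) {
		// LinkedHashSet preserves insertion order while deduplicating,
		// matching the idBuffer in ResourceIdListStep.
		Set<Integer> ids = new LinkedHashSet<>(List.of(1, 2, 3, 4, 5, 6, 7));

		UnmodifiableIterator<List<Integer>> chunks = Iterators.partition(ids.iterator(), 3);
		while (chunks.hasNext()) {
			System.out.println(chunks.next()); // [1, 2, 3] then [4, 5, 6] then [7]
		}
	}
}
```

This simplification is only safe because the producer now returns the entire ID range in a single `fetchResourceIdsPage` call, so chunking becomes one pass over a complete, ordered set rather than an incremental drain across timestamp windows.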
diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/jobs/step/LoadIdsStepTest.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/jobs/step/LoadIdsStepTest.java
index ed333913bbed..ac06d4ba3ced 100644
--- a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/jobs/step/LoadIdsStepTest.java
+++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/jobs/step/LoadIdsStepTest.java
@@ -6,7 +6,6 @@
 import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
 import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlListJobParameters;
 import ca.uhn.fhir.batch2.model.JobInstance;
-import ca.uhn.fhir.jpa.api.pid.EmptyResourcePidList;
 import ca.uhn.fhir.jpa.api.pid.HomogeneousResourcePidList;
 import ca.uhn.fhir.jpa.api.pid.IResourcePidList;
 import ca.uhn.fhir.jpa.api.svc.IBatch2DaoSvc;
@@ -39,8 +38,6 @@ public class LoadIdsStepTest {
 
 	public static final Date DATE_1 = new InstantType("2022-01-01T00:00:00Z").getValue();
 	public static final Date DATE_2 = new InstantType("2022-01-02T00:00:00Z").getValue();
-	public static final Date DATE_3 = new InstantType("2022-01-03T00:00:00Z").getValue();
-	public static final Date DATE_4 = new InstantType("2022-01-04T00:00:00Z").getValue();
 	public static final Date DATE_END = new InstantType("2022-02-01T00:00:00Z").getValue();
 
 	@Mock
@@ -73,24 +70,20 @@ public void testGenerateSteps() {
 		when(myBatch2DaoSvc.fetchResourceIdsPage(eq(DATE_1), eq(DATE_END), eq(DEFAULT_PAGE_SIZE), isNull(), isNull()))
 				.thenReturn(createIdChunk(0L, 20000L, DATE_2));
-		when(myBatch2DaoSvc.fetchResourceIdsPage(eq(DATE_2), eq(DATE_END), eq(DEFAULT_PAGE_SIZE), isNull(), isNull()))
-				.thenReturn(createIdChunk(20000L, 40000L, DATE_3));
-		when(myBatch2DaoSvc.fetchResourceIdsPage(eq(DATE_3), eq(DATE_END), eq(DEFAULT_PAGE_SIZE), isNull(), isNull()))
-				.thenReturn(createIdChunk(40000L, 40040L, DATE_4));
-		when(myBatch2DaoSvc.fetchResourceIdsPage(eq(DATE_4), eq(DATE_END), eq(DEFAULT_PAGE_SIZE), isNull(), isNull()))
-				.thenReturn(new EmptyResourcePidList());
 
 		mySvc.run(details, mySink);
 
-		verify(mySink, times(81)).accept(myChunkIdsCaptor.capture());
-		for (int i = 0; i < 80; i++) {
+		final int expectedLoops = 40;
+		verify(mySink, times(40)).accept(myChunkIdsCaptor.capture());
+
+		final List<ResourceIdListWorkChunkJson> allCapturedValues = myChunkIdsCaptor.getAllValues();
+		for (int i = 0; i < expectedLoops; i++) {
 			String expected = createIdChunk(i * 500, (i * 500) + 500).toString();
-			String actual = myChunkIdsCaptor.getAllValues().get(i).toString();
+			String actual = allCapturedValues.get(i).toString();
 			assertEquals(expected, actual);
 		}
-		assertEquals(createIdChunk(40000, 40040).toString(),
-				myChunkIdsCaptor.getAllValues().get(80).toString());
-
+		final ResourceIdListWorkChunkJson expectedIdChunk = createIdChunk(19500, 20000);
+		assertEquals(expectedIdChunk.toString(), allCapturedValues.get(expectedLoops - 1).toString());
 	}
 
 	@Nonnull
diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/jobs/step/ResourceIdListStepTest.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/jobs/step/ResourceIdListStepTest.java
index 222ace385670..1fb51f4cc8d6 100644
--- a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/jobs/step/ResourceIdListStepTest.java
+++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/jobs/step/ResourceIdListStepTest.java
@@ -56,7 +56,7 @@ void beforeEach() {
 	}
 
 	@ParameterizedTest
-	@ValueSource(ints = {1, 100, 500, 501, 2345})
+	@ValueSource(ints = {1, 100, 500, 501, 2345, 10500})
 	void testResourceIdListBatchSizeLimit(int theListSize) {
 		List<IResourcePersistentId> idList = generateIdList(theListSize);
 		when(myStepExecutionDetails.getData()).thenReturn(myData);
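The reworked `LoadIdsStepTest` expectations follow directly from the single-page fetch: where the old test mocked four successive pages and expected 81 chunks, one mocked call now returns all 20,000 IDs, which the step partitions into fixed-size work chunks. The arithmetic, with the chunk size of 500 that the test's `createIdChunk(i * 500, (i * 500) + 500)` calls imply:

```java
class ChunkMathExample {
	public static void main(String[] args) {
		// One mocked page now returns every ID at once; ResourceIdListStep then
		// partitions it into work chunks of at most 500 IDs each.
		int totalIds = 20_000;
		int chunkSize = 500;
		System.out.println(totalIds / chunkSize); // 40 chunks; the last covers IDs 19500..20000
	}
}
```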
diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/svc/IBatch2DaoSvc.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/svc/IBatch2DaoSvc.java
index 07d366fd2d1b..d6759d53bde1 100644
--- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/svc/IBatch2DaoSvc.java
+++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/svc/IBatch2DaoSvc.java
@@ -19,6 +19,7 @@
  */
 package ca.uhn.fhir.jpa.api.svc;
 
+import ca.uhn.fhir.i18n.Msg;
 import ca.uhn.fhir.interceptor.model.RequestPartitionId;
 import ca.uhn.fhir.jpa.api.pid.IResourcePidList;
 
@@ -34,18 +35,37 @@ public interface IBatch2DaoSvc {
 	boolean isAllResourceTypeSupported();
 
 	/**
+	 * Fetches a page of resource IDs for all resource types. The page size is up to the discretion of the implementation.
+	 *
+	 * @param theStart The start of the date range, must be inclusive.
+	 * @param theEnd The end of the date range, should be exclusive.
+	 * @param theRequestPartitionId The request partition ID (may be null on non-partitioned systems)
+	 * @param theUrl The search URL, or null to return IDs for all resources across all resource types. Null will only be supplied if {@link #isAllResourceTypeSupported()} returns true.
+	 */
+	default IResourcePidList fetchResourceIdsPage(
+			Date theStart, Date theEnd, @Nullable RequestPartitionId theRequestPartitionId, @Nullable String theUrl) {
+		throw new UnsupportedOperationException(Msg.code(2425) + "Not implemented unless explicitly overridden");
+	}
+
+	// TODO: LD: eliminate this call in all other implementors
+	/**
+	 * @deprecated Please call {@link #fetchResourceIdsPage(Date, Date, RequestPartitionId, String)} instead.
+	 * <p>
 	 * Fetches a page of resource IDs for all resource types. The page size is up to the discretion of the implementation.
 	 *
 	 * @param theStart The start of the date range, must be inclusive.
 	 * @param theEnd The end of the date range, should be exclusive.
 	 * @param thePageSize The number of records to query in each pass.
-	 * @param theRequestPartitionId The request partition ID (may be null on nonpartitioned systems)
+	 * @param theRequestPartitionId The request partition ID (may be null on non-partitioned systems)
	 * @param theUrl The search URL, or null to return IDs for all resources across all resource types. Null will only be supplied if {@link #isAllResourceTypeSupported()} returns true.
 	 */
-	IResourcePidList fetchResourceIdsPage(
+	@Deprecated
+	default IResourcePidList fetchResourceIdsPage(
 			Date theStart,
 			Date theEnd,
 			@Nonnull Integer thePageSize,
 			@Nullable RequestPartitionId theRequestPartitionId,
-			@Nullable String theUrl);
+			@Nullable String theUrl) {
+		return fetchResourceIdsPage(theStart, theEnd, theRequestPartitionId, theUrl);
+	}
 }
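The interface change above is a standard recipe for evolving a widely implemented interface without breaking existing implementors: the new signature arrives as a `default` method that throws, and the old abstract method becomes a deprecated `default` that delegates to it, dropping the now-unused page size. A generic sketch of the pattern, with illustrative names rather than the HAPI API:

```java
interface ExampleFetcher {

	// New, preferred signature: real implementations override this one.
	default String fetch(String theQuery) {
		throw new UnsupportedOperationException("not implemented unless explicitly overridden");
	}

	/**
	 * @deprecated call {@link #fetch(String)} instead.
	 */
	@Deprecated
	default String fetch(String theQuery, int thePageSize) {
		// Old call sites keep compiling; the page size is simply ignored now.
		return fetch(theQuery);
	}
}
```

The trade-off is that nothing forces an implementor to override the new method, so a stale implementation compiles but fails at runtime, which is presumably why the new default throws a coded `UnsupportedOperationException` (Msg.code 2425) rather than returning an empty page.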