diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/S3EventsMetaSelector.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/S3EventsMetaSelector.java index 68ac7aba5cb13..13de715540447 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/S3EventsMetaSelector.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/S3EventsMetaSelector.java @@ -153,7 +153,9 @@ public Pair, String> getNextEventsFromQueue(AmazonSQS sqs, for (Map eventRecord : eventRecords) { filteredEventRecords.add(new ObjectMapper().writeValueAsString(eventRecord).replace("%3D", "=")); } - return new ImmutablePair<>(filteredEventRecords, String.valueOf(newCheckpointTime)); + // Return the old checkpoint if no messages to consume from queue. + String newCheckpoint = newCheckpointTime == 0 ? lastCheckpointStr.orElse(null) : String.valueOf(newCheckpointTime); + return new ImmutablePair<>(filteredEventRecords, newCheckpoint); } catch (JSONException | IOException e) { throw new HoodieException("Unable to read from SQS: ", e); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestS3EventsMetaSelector.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestS3EventsMetaSelector.java index 2208543c08d1d..f38e89b217bf9 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestS3EventsMetaSelector.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestS3EventsMetaSelector.java @@ -27,11 +27,14 @@ import org.apache.hudi.utilities.testutils.CloudObjectTestUtils; import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.model.GetQueueAttributesRequest; +import com.amazonaws.services.sqs.model.GetQueueAttributesResult; import com.amazonaws.services.sqs.model.Message; import org.apache.hadoop.fs.Path; import org.json.JSONObject; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mock; @@ -43,8 +46,12 @@ import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.Config.S3_SOURCE_QUEUE_REGION; import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.Config.S3_SOURCE_QUEUE_URL; +import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.SQS_ATTR_APPROX_MESSAGES; import static org.apache.hudi.utilities.sources.helpers.TestCloudObjectsSelector.REGION_NAME; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; public class TestS3EventsMetaSelector extends HoodieClientTestHarness { @@ -102,4 +109,21 @@ public void testNextEventsFromQueueShouldReturnsEventsFromQueue(Class clazz) .getString("key")); assertEquals("1627376736755", eventFromQueue.getRight()); } + + @Test + public void testEventsFromQueueNoMessages() { + S3EventsMetaSelector selector = new S3EventsMetaSelector(props); + when(sqs.getQueueAttributes(any(GetQueueAttributesRequest.class))) + .thenReturn( + new GetQueueAttributesResult() + .addAttributesEntry(SQS_ATTR_APPROX_MESSAGES, "0")); + + List processed = new ArrayList<>(); + Pair, String> eventFromQueue = + selector.getNextEventsFromQueue(sqs, Option.empty(), processed); + + assertEquals(0, eventFromQueue.getLeft().size()); + assertEquals(0, processed.size()); + assertNull(eventFromQueue.getRight()); + } }