From 15b4a6ea721c534be4919e4c49bf58b23dd93560 Mon Sep 17 00:00:00 2001 From: rmahindra123 Date: Thu, 16 Nov 2023 01:26:35 -0800 Subject: [PATCH 1/2] Fix sqs delete and deltasync close --- .../sources/helpers/CloudObjectsSelector.java | 12 ++++++++---- .../hudi/utilities/streamer/ErrorTableUtils.java | 2 +- .../hudi/utilities/streamer/HoodieStreamer.java | 2 -- .../apache/hudi/utilities/streamer/StreamSync.java | 5 ++--- 4 files changed, 11 insertions(+), 10 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelector.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelector.java index efe2913255f38..8c447d93a0ffd 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelector.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelector.java @@ -200,9 +200,12 @@ protected List> createListPartitions(List singleList, int * Delete batch of messages from queue. */ protected void deleteBatchOfMessages(SqsClient sqs, String queueUrl, List messagesToBeDeleted) { - DeleteMessageBatchRequest deleteBatchReq = - DeleteMessageBatchRequest.builder().queueUrl(queueUrl).build(); - List deleteEntries = new ArrayList<>(deleteBatchReq.entries()); + if (messagesToBeDeleted.isEmpty()) { + return; + } + DeleteMessageBatchRequest.Builder builder = DeleteMessageBatchRequest.builder().queueUrl(queueUrl); + List deleteEntries = new ArrayList<>(); + for (Message message : messagesToBeDeleted) { deleteEntries.add( DeleteMessageBatchRequestEntry.builder() @@ -210,7 +213,8 @@ protected void deleteBatchOfMessages(SqsClient sqs, String queueUrl, List deleteFailures = deleteResponse.failed().stream() .map(BatchResultErrorEntry::id) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorTableUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorTableUtils.java index 694990cf1fa0d..8907a1b664783 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorTableUtils.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorTableUtils.java @@ -64,7 +64,7 @@ public static Option getErrorTableWriter(HoodieStreamer.Co public static HoodieErrorTableConfig.ErrorWriteFailureStrategy getErrorWriteFailureStrategy( TypedProperties props) { - String writeFailureStrategy = props.getString(ERROR_TABLE_WRITE_FAILURE_STRATEGY.key()); + String writeFailureStrategy = props.getString(ERROR_TABLE_WRITE_FAILURE_STRATEGY.key(), ERROR_TABLE_WRITE_FAILURE_STRATEGY.defaultValue()); return HoodieErrorTableConfig.ErrorWriteFailureStrategy.valueOf(writeFailureStrategy); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java index 5604a6240c71c..7bae06db82282 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java @@ -844,8 +844,6 @@ public void ingestOnce() { streamSync.syncOnce(); } catch (IOException e) { throw new HoodieIngestionException(String.format("Ingestion via %s failed with exception.", this.getClass()), e); - } finally { - close(); } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java index 6626084aa2bdf..65abcc542b7f4 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java @@ -119,12 +119,11 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Set; import java.util.function.Function; +import java.util.stream.Collectors; import scala.Tuple2; import scala.collection.JavaConversions; @@ -975,7 +974,7 @@ private String getSyncClassShortName(String syncClassName) { } public void runMetaSync() { - Set syncClientToolClasses = new HashSet<>(Arrays.asList(cfg.syncClientToolClassNames.split(","))); + List syncClientToolClasses = Arrays.stream(cfg.syncClientToolClassNames.split(",")).distinct().collect(Collectors.toList()); // for backward compatibility if (cfg.enableHiveSync) { cfg.enableMetaSync = true; From 8ff6f254f37ad8b30e6a1f74e2e5011f1c29c2cb Mon Sep 17 00:00:00 2001 From: rmahindra123 Date: Mon, 20 Nov 2023 16:37:15 -0800 Subject: [PATCH 2/2] Reverted close change --- .../java/org/apache/hudi/utilities/streamer/HoodieStreamer.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java index 7bae06db82282..5604a6240c71c 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java @@ -844,6 +844,8 @@ public void ingestOnce() { streamSync.syncOnce(); } catch (IOException e) { throw new HoodieIngestionException(String.format("Ingestion via %s failed with exception.", this.getClass()), e); + } finally { + close(); } }