newRecordKeysSorted = new PriorityQueue<>();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieUnboundedCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieUnboundedCreateHandle.java
index 9ab44d0f62f1b..ebbc7a5c28ea1 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieUnboundedCreateHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieUnboundedCreateHandle.java
@@ -28,11 +28,14 @@
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import javax.annotation.concurrent.NotThreadSafe;
+
/**
* A HoodieCreateHandle which writes all data into a single file.
*
* Please use this with caution. This can end up creating very large files if not used correctly.
*/
+@NotThreadSafe
public class HoodieUnboundedCreateHandle extends HoodieCreateHandle {
private static final Logger LOG = LogManager.getLogger(HoodieUnboundedCreateHandle.class);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
index 957a0ff52e91d..f2536378807d6 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
@@ -31,13 +31,18 @@
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
+import javax.annotation.concurrent.NotThreadSafe;
+
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
/**
* HoodieParquetWriter extends the ParquetWriter to help limit the size of underlying file. Provides a way to check if
* the current file can take more records with the canWrite()
+ *
+ * ATTENTION: HoodieParquetWriter is not thread safe and developer should take care of the order of write and close
*/
+@NotThreadSafe
public class HoodieParquetWriter
extends ParquetWriter implements HoodieFileWriter {
@@ -107,4 +112,9 @@ public void writeAvro(String key, IndexedRecord object) throws IOException {
writeSupport.add(key);
}
}
+
+ @Override
+ public void close() throws IOException {
+ super.close();
+ }
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
index 2b4a5d1608eec..2b42ba8aba2bf 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
@@ -99,13 +99,16 @@ public void runMerge(HoodieTable>, HoodieData computeNext() {
} finally {
if (null != bufferedIteratorExecutor) {
bufferedIteratorExecutor.shutdownNow();
+ bufferedIteratorExecutor.awaitTermination();
}
}
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java
index 38d4e60f648ec..31312655251ab 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java
@@ -102,13 +102,16 @@ public void runMerge(HoodieTable>, List, List
} catch (Exception e) {
throw new HoodieException(e);
} finally {
+ // HUDI-2875: mergeHandle is not thread safe, we should totally terminate record inputting
+ // and executor firstly and then close mergeHandle.
if (reader != null) {
reader.close();
}
- mergeHandle.close();
if (null != wrapper) {
wrapper.shutdownNow();
+ wrapper.awaitTermination();
}
+ mergeHandle.close();
}
}
}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java
index f91dd5019a275..9821aedc875cd 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java
@@ -74,6 +74,7 @@ protected List computeNext() {
} finally {
if (null != bufferedIteratorExecutor) {
bufferedIteratorExecutor.shutdownNow();
+ bufferedIteratorExecutor.awaitTermination();
}
}
}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java
index 7878d857761ea..46dd30a7cb773 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java
@@ -102,13 +102,16 @@ public void runMerge(HoodieTable>, List, List
} catch (Exception e) {
throw new HoodieException(e);
} finally {
+ // HUDI-2875: mergeHandle is not thread safe, we should totally terminate record inputting
+ // and executor firstly and then close mergeHandle.
if (reader != null) {
reader.close();
}
- mergeHandle.close();
if (null != wrapper) {
wrapper.shutdownNow();
+ wrapper.awaitTermination();
}
+ mergeHandle.close();
}
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
index a8a9e49c01c00..df5bd2d3f458c 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
@@ -95,6 +95,7 @@ protected List computeNext() {
} finally {
if (null != bufferedIteratorExecutor) {
bufferedIteratorExecutor.shutdownNow();
+ bufferedIteratorExecutor.awaitTermination();
}
}
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java
index e3d0e9b3c69d4..96ac794dcbc82 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java
@@ -80,10 +80,11 @@ void executeBootstrap(HoodieBootstrapHandle, ?, ?, ?> bootstrapHandle, Path so
} catch (Exception e) {
throw new HoodieException(e);
} finally {
- bootstrapHandle.close();
if (null != wrapper) {
wrapper.shutdownNow();
+ wrapper.awaitTermination();
}
+ bootstrapHandle.close();
}
}
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
index d07ea771bc557..5f45629ba8023 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
@@ -68,9 +68,9 @@ Schema getAvroSchema(Path sourceFilePath) throws IOException {
void executeBootstrap(HoodieBootstrapHandle, ?, ?, ?> bootstrapHandle,
Path sourceFilePath, KeyGeneratorInterface keyGenerator, String partitionPath, Schema avroSchema) throws Exception {
BoundedInMemoryExecutor wrapper = null;
+ ParquetReader reader =
+ AvroParquetReader.builder(sourceFilePath).withConf(table.getHadoopConf()).build();
try {
- ParquetReader reader =
- AvroParquetReader.builder(sourceFilePath).withConf(table.getHadoopConf()).build();
wrapper = new BoundedInMemoryExecutor(config.getWriteBufferLimitBytes(),
new ParquetReaderIterator(reader), new BootstrapRecordConsumer(bootstrapHandle), inp -> {
String recKey = keyGenerator.getKey(inp).getRecordKey();
@@ -84,10 +84,12 @@ void executeBootstrap(HoodieBootstrapHandle, ?, ?, ?> bootstrapHandle,
} catch (Exception e) {
throw new HoodieException(e);
} finally {
- bootstrapHandle.close();
+ reader.close();
if (null != wrapper) {
wrapper.shutdownNow();
+ wrapper.awaitTermination();
}
+ bootstrapHandle.close();
}
}
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
index 91f9cbc96e6ed..a714d60d0033a 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
@@ -28,6 +28,7 @@
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.spark.TaskContext;
import org.apache.spark.TaskContext$;
@@ -35,6 +36,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.util.Iterator;
import java.util.List;
import scala.Tuple2;
@@ -105,6 +107,7 @@ protected Integer getResult() {
} finally {
if (executor != null) {
executor.shutdownNow();
+ executor.awaitTermination();
}
}
}
@@ -152,7 +155,49 @@ protected Integer getResult() {
} finally {
if (executor != null) {
executor.shutdownNow();
+ executor.awaitTermination();
}
}
}
+
+ @Test
+ public void testExecutorTermination() {
+ HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
+ when(hoodieWriteConfig.getWriteBufferLimitBytes()).thenReturn(1024);
+ Iterator unboundedRecordIter = new Iterator() {
+ @Override
+ public boolean hasNext() {
+ return true;
+ }
+
+ @Override
+ public GenericRecord next() {
+ return dataGen.generateGenericRecord();
+ }
+ };
+
+ BoundedInMemoryQueueConsumer, Integer> consumer =
+ new BoundedInMemoryQueueConsumer, Integer>() {
+ @Override
+ protected void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult record) {
+ }
+
+ @Override
+ protected void finish() {
+ }
+
+ @Override
+ protected Integer getResult() {
+ return 0;
+ }
+ };
+
+ BoundedInMemoryExecutor>, Integer> executor =
+ new BoundedInMemoryExecutor(hoodieWriteConfig.getWriteBufferLimitBytes(), unboundedRecordIter,
+ consumer, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA),
+ getPreExecuteRunnable());
+ executor.shutdownNow();
+ boolean terminatedGracefully = executor.awaitTermination();
+ assertTrue(terminatedGracefully);
+ }
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java
index d1e5e66083196..46ef5dc40caf8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java
@@ -37,6 +37,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -48,7 +49,7 @@
public class BoundedInMemoryExecutor {
private static final Logger LOG = LogManager.getLogger(BoundedInMemoryExecutor.class);
-
+ private static final long TERMINATE_WAITING_TIME_SECS = 60L;
// Executor service used for launching write thread.
private final ExecutorService producerExecutorService;
// Executor service used for launching read thread.
@@ -168,6 +169,27 @@ public boolean isRemaining() {
public void shutdownNow() {
producerExecutorService.shutdownNow();
consumerExecutorService.shutdownNow();
+ // close queue to force producer stop
+ queue.close();
+ }
+
+ public boolean awaitTermination() {
+ // if current thread has been interrupted before awaitTermination was called, we still give
+ // executor a chance to proceeding. So clear the interrupt flag and reset it if needed before return.
+ boolean interruptedBefore = Thread.interrupted();
+ boolean producerTerminated = false;
+ boolean consumerTerminated = false;
+ try {
+ producerTerminated = producerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS, TimeUnit.SECONDS);
+ consumerTerminated = consumerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS, TimeUnit.SECONDS);
+ } catch (InterruptedException ie) {
+ // fail silently for any other interruption
+ }
+ // reset interrupt flag if needed
+ if (interruptedBefore) {
+ Thread.currentThread().interrupt();
+ }
+ return producerTerminated && consumerTerminated;
}
public BoundedInMemoryQueue getQueue() {
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index 3e147b7fdd47c..dcb9fc639c34f 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -860,12 +860,14 @@ public boolean deleteExistingKeyIfPresent(HoodieKey key) {
return false;
}
+ public GenericRecord generateGenericRecord() {
+ return generateGenericRecord(genPseudoRandomUUID(rand).toString(), "0",
+ genPseudoRandomUUID(rand).toString(), genPseudoRandomUUID(rand).toString(), rand.nextLong());
+ }
+
public List generateGenericRecords(int numRecords) {
List list = new ArrayList<>();
- IntStream.range(0, numRecords).forEach(i -> {
- list.add(generateGenericRecord(genPseudoRandomUUID(rand).toString(), "0",
- genPseudoRandomUUID(rand).toString(), genPseudoRandomUUID(rand).toString(), rand.nextLong()));
- });
+ IntStream.range(0, numRecords).forEach(i -> list.add(generateGenericRecord()));
return list;
}