From d794f4fbf9d8ce4c90507e4de36121ac1fc2fd4b Mon Sep 17 00:00:00 2001
From: qianchutao <72595723+qianchutao@users.noreply.github.com>
Date: Fri, 6 May 2022 00:33:06 +0800
Subject: [PATCH 01/52] [MINOR] Optimize code logic (#5499)
---
.../hudi/utilities/deltastreamer/HoodieDeltaStreamer.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 56124b82afc06..824c7375fa07a 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -625,8 +625,8 @@ public DeltaSyncService(Config cfg, JavaSparkContext jssc, FileSystem fs, Config
ValidationUtils.checkArgument(baseFileFormat.equals(cfg.baseFileFormat) || cfg.baseFileFormat == null,
"Hoodie table's base file format is of type " + baseFileFormat + " but passed in CLI argument is "
+ cfg.baseFileFormat);
- cfg.baseFileFormat = meta.getTableConfig().getBaseFileFormat().toString();
- this.cfg.baseFileFormat = cfg.baseFileFormat;
+ cfg.baseFileFormat = baseFileFormat;
+ this.cfg.baseFileFormat = baseFileFormat;
} else {
tableType = HoodieTableType.valueOf(cfg.tableType);
if (cfg.baseFileFormat == null) {
From abb4893b25df47328d01890f0d01fbc4e5d99135 Mon Sep 17 00:00:00 2001
From: guanziyue <30882822+guanziyue@users.noreply.github.com>
Date: Fri, 6 May 2022 04:49:34 +0800
Subject: [PATCH 02/52] [HUDI-2875] Make HoodieParquetWriter Thread safe and
memory executor exit gracefully (#4264)
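This change annotates the write handles as @NotThreadSafe and makes callers shut down the bounded in-memory executor and wait for it to terminate before closing the handle. A minimal, self-contained sketch of that ordering using plain java.util.concurrent follows; the class and variable names are illustrative, not Hudi APIs.

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Illustrative only: mirrors the shutdown ordering enforced around the non-thread-safe handles. */
public class ShutdownOrderingSketch {
  public static void main(String[] args) throws IOException, InterruptedException {
    ExecutorService producer = Executors.newSingleThreadExecutor();
    ExecutorService consumer = Executors.newSingleThreadExecutor();
    Closeable writeHandle = () -> System.out.println("handle closed"); // stand-in for a write handle

    producer.submit(() -> { /* feed records into a bounded queue */ });
    consumer.submit(() -> { /* drain the queue and write through the handle */ });

    // 1) interrupt producer and consumer (the patch also closes the in-memory queue here)
    producer.shutdownNow();
    consumer.shutdownNow();
    // 2) wait for both worker threads to actually exit before touching the handle
    boolean terminated = producer.awaitTermination(60, TimeUnit.SECONDS)
        && consumer.awaitTermination(60, TimeUnit.SECONDS);
    // 3) only then close the handle, so no other thread can still be writing through it
    writeHandle.close();
    System.out.println("terminated gracefully: " + terminated);
  }
}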
---
.../apache/hudi/io/HoodieConcatHandle.java | 3 ++
.../apache/hudi/io/HoodieCreateHandle.java | 3 ++
.../org/apache/hudi/io/HoodieMergeHandle.java | 3 ++
.../hudi/io/HoodieSortedMergeHandle.java | 3 ++
.../hudi/io/HoodieUnboundedCreateHandle.java | 3 ++
.../hudi/io/storage/HoodieParquetWriter.java | 10 +++++
.../action/commit/HoodieMergeHelper.java | 5 ++-
.../execution/FlinkLazyInsertIterable.java | 1 +
.../table/action/commit/FlinkMergeHelper.java | 5 ++-
.../execution/JavaLazyInsertIterable.java | 1 +
.../table/action/commit/JavaMergeHelper.java | 5 ++-
.../execution/SparkLazyInsertIterable.java | 1 +
.../OrcBootstrapMetadataHandler.java | 3 +-
.../ParquetBootstrapMetadataHandler.java | 8 ++--
.../TestBoundedInMemoryExecutorInSpark.java | 45 +++++++++++++++++++
.../util/queue/BoundedInMemoryExecutor.java | 24 +++++++++-
.../testutils/HoodieTestDataGenerator.java | 10 +++--
17 files changed, 121 insertions(+), 12 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieConcatHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieConcatHandle.java
index 022f600b5e078..ca245e0c391ba 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieConcatHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieConcatHandle.java
@@ -35,6 +35,8 @@
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import javax.annotation.concurrent.NotThreadSafe;
+
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
@@ -66,6 +68,7 @@
* Users should ensure there are no duplicates when "insert" operation is used and if the respective config is enabled. So, above scenario should not
* happen and every batch should have new records to be inserted. Above example is for illustration purposes only.
*/
+@NotThreadSafe
public class HoodieConcatHandle extends HoodieMergeHandle {
private static final Logger LOG = LogManager.getLogger(HoodieConcatHandle.class);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
index 41d583668a933..43a8c12324136 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
@@ -42,12 +42,15 @@
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import javax.annotation.concurrent.NotThreadSafe;
+
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+@NotThreadSafe
public class HoodieCreateHandle extends HoodieWriteHandle {
private static final Logger LOG = LogManager.getLogger(HoodieCreateHandle.class);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 3363571ddf0cb..2e2a894f5e96c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -54,6 +54,8 @@
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import javax.annotation.concurrent.NotThreadSafe;
+
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
@@ -91,6 +93,7 @@
*
*
*/
+@NotThreadSafe
public class HoodieMergeHandle extends HoodieWriteHandle {
private static final Logger LOG = LogManager.getLogger(HoodieMergeHandle.class);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
index d6c1d1be40f36..7dce31a4c349b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
@@ -32,6 +32,8 @@
import org.apache.avro.generic.GenericRecord;
+import javax.annotation.concurrent.NotThreadSafe;
+
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
@@ -45,6 +47,7 @@
* The implementation performs a merge-sort by comparing the key of the record being written to the list of
* keys in newRecordKeys (sorted in-memory).
*/
+@NotThreadSafe
public class HoodieSortedMergeHandle extends HoodieMergeHandle {
private final Queue newRecordKeysSorted = new PriorityQueue<>();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieUnboundedCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieUnboundedCreateHandle.java
index 9ab44d0f62f1b..ebbc7a5c28ea1 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieUnboundedCreateHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieUnboundedCreateHandle.java
@@ -28,11 +28,14 @@
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import javax.annotation.concurrent.NotThreadSafe;
+
/**
* A HoodieCreateHandle which writes all data into a single file.
*
* Please use this with caution. This can end up creating very large files if not used correctly.
*/
+@NotThreadSafe
public class HoodieUnboundedCreateHandle extends HoodieCreateHandle {
private static final Logger LOG = LogManager.getLogger(HoodieUnboundedCreateHandle.class);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
index 5b3c69ddf943e..095cacc144a9a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
@@ -30,13 +30,18 @@
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
+import javax.annotation.concurrent.NotThreadSafe;
+
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
/**
* HoodieParquetWriter extends the ParquetWriter to help limit the size of underlying file. Provides a way to check if
* the current file can take more records with the canWrite()
+ *
+ * ATTENTION: HoodieParquetWriter is not thread safe; callers must take care of the ordering of write and close calls
*/
+@NotThreadSafe
public class HoodieParquetWriter
extends ParquetWriter implements HoodieFileWriter {
@@ -106,4 +111,9 @@ public void writeAvro(String key, IndexedRecord object) throws IOException {
writeSupport.add(key);
}
}
+
+ @Override
+ public void close() throws IOException {
+ super.close();
+ }
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
index 04dd29c63c5b4..3e2d8abdd7466 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
@@ -148,13 +148,16 @@ public void runMerge(HoodieTable>, HoodieData computeNext() {
} finally {
if (null != bufferedIteratorExecutor) {
bufferedIteratorExecutor.shutdownNow();
+ bufferedIteratorExecutor.awaitTermination();
}
}
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java
index 38d4e60f648ec..31312655251ab 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java
@@ -102,13 +102,16 @@ public void runMerge(HoodieTable>, List, List
} catch (Exception e) {
throw new HoodieException(e);
} finally {
+ // HUDI-2875: mergeHandle is not thread safe; fully stop feeding records and terminate the
+ // executor first, and only then close mergeHandle.
if (reader != null) {
reader.close();
}
- mergeHandle.close();
if (null != wrapper) {
wrapper.shutdownNow();
+ wrapper.awaitTermination();
}
+ mergeHandle.close();
}
}
}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java
index f91dd5019a275..9821aedc875cd 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java
@@ -74,6 +74,7 @@ protected List computeNext() {
} finally {
if (null != bufferedIteratorExecutor) {
bufferedIteratorExecutor.shutdownNow();
+ bufferedIteratorExecutor.awaitTermination();
}
}
}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java
index 7878d857761ea..46dd30a7cb773 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java
@@ -102,13 +102,16 @@ public void runMerge(HoodieTable>, List, List
} catch (Exception e) {
throw new HoodieException(e);
} finally {
+ // HUDI-2875: mergeHandle is not thread safe; fully stop feeding records and terminate the
+ // executor first, and only then close mergeHandle.
if (reader != null) {
reader.close();
}
- mergeHandle.close();
if (null != wrapper) {
wrapper.shutdownNow();
+ wrapper.awaitTermination();
}
+ mergeHandle.close();
}
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
index a8a9e49c01c00..df5bd2d3f458c 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
@@ -95,6 +95,7 @@ protected List computeNext() {
} finally {
if (null != bufferedIteratorExecutor) {
bufferedIteratorExecutor.shutdownNow();
+ bufferedIteratorExecutor.awaitTermination();
}
}
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java
index e3d0e9b3c69d4..96ac794dcbc82 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java
@@ -80,10 +80,11 @@ void executeBootstrap(HoodieBootstrapHandle, ?, ?, ?> bootstrapHandle, Path so
} catch (Exception e) {
throw new HoodieException(e);
} finally {
- bootstrapHandle.close();
if (null != wrapper) {
wrapper.shutdownNow();
+ wrapper.awaitTermination();
}
+ bootstrapHandle.close();
}
}
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
index d07ea771bc557..5f45629ba8023 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
@@ -68,9 +68,9 @@ Schema getAvroSchema(Path sourceFilePath) throws IOException {
void executeBootstrap(HoodieBootstrapHandle, ?, ?, ?> bootstrapHandle,
Path sourceFilePath, KeyGeneratorInterface keyGenerator, String partitionPath, Schema avroSchema) throws Exception {
BoundedInMemoryExecutor wrapper = null;
+ ParquetReader reader =
+ AvroParquetReader.builder(sourceFilePath).withConf(table.getHadoopConf()).build();
try {
- ParquetReader reader =
- AvroParquetReader.builder(sourceFilePath).withConf(table.getHadoopConf()).build();
wrapper = new BoundedInMemoryExecutor(config.getWriteBufferLimitBytes(),
new ParquetReaderIterator(reader), new BootstrapRecordConsumer(bootstrapHandle), inp -> {
String recKey = keyGenerator.getKey(inp).getRecordKey();
@@ -84,10 +84,12 @@ void executeBootstrap(HoodieBootstrapHandle, ?, ?, ?> bootstrapHandle,
} catch (Exception e) {
throw new HoodieException(e);
} finally {
- bootstrapHandle.close();
+ reader.close();
if (null != wrapper) {
wrapper.shutdownNow();
+ wrapper.awaitTermination();
}
+ bootstrapHandle.close();
}
}
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
index 91f9cbc96e6ed..a714d60d0033a 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
@@ -28,6 +28,7 @@
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.spark.TaskContext;
import org.apache.spark.TaskContext$;
@@ -35,6 +36,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.util.Iterator;
import java.util.List;
import scala.Tuple2;
@@ -105,6 +107,7 @@ protected Integer getResult() {
} finally {
if (executor != null) {
executor.shutdownNow();
+ executor.awaitTermination();
}
}
}
@@ -152,7 +155,49 @@ protected Integer getResult() {
} finally {
if (executor != null) {
executor.shutdownNow();
+ executor.awaitTermination();
}
}
}
+
+ @Test
+ public void testExecutorTermination() {
+ HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
+ when(hoodieWriteConfig.getWriteBufferLimitBytes()).thenReturn(1024);
+ Iterator unboundedRecordIter = new Iterator() {
+ @Override
+ public boolean hasNext() {
+ return true;
+ }
+
+ @Override
+ public GenericRecord next() {
+ return dataGen.generateGenericRecord();
+ }
+ };
+
+ BoundedInMemoryQueueConsumer, Integer> consumer =
+ new BoundedInMemoryQueueConsumer, Integer>() {
+ @Override
+ protected void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult record) {
+ }
+
+ @Override
+ protected void finish() {
+ }
+
+ @Override
+ protected Integer getResult() {
+ return 0;
+ }
+ };
+
+ BoundedInMemoryExecutor>, Integer> executor =
+ new BoundedInMemoryExecutor(hoodieWriteConfig.getWriteBufferLimitBytes(), unboundedRecordIter,
+ consumer, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA),
+ getPreExecuteRunnable());
+ executor.shutdownNow();
+ boolean terminatedGracefully = executor.awaitTermination();
+ assertTrue(terminatedGracefully);
+ }
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java
index d1e5e66083196..46ef5dc40caf8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java
@@ -37,6 +37,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -48,7 +49,7 @@
public class BoundedInMemoryExecutor {
private static final Logger LOG = LogManager.getLogger(BoundedInMemoryExecutor.class);
-
+ private static final long TERMINATE_WAITING_TIME_SECS = 60L;
// Executor service used for launching write thread.
private final ExecutorService producerExecutorService;
// Executor service used for launching read thread.
@@ -168,6 +169,27 @@ public boolean isRemaining() {
public void shutdownNow() {
producerExecutorService.shutdownNow();
consumerExecutorService.shutdownNow();
+ // close queue to force producer stop
+ queue.close();
+ }
+
+ public boolean awaitTermination() {
+ // If the current thread was interrupted before awaitTermination was called, we still give the
+ // executor a chance to proceed. So clear the interrupt flag and reset it if needed before returning.
+ boolean interruptedBefore = Thread.interrupted();
+ boolean producerTerminated = false;
+ boolean consumerTerminated = false;
+ try {
+ producerTerminated = producerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS, TimeUnit.SECONDS);
+ consumerTerminated = consumerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS, TimeUnit.SECONDS);
+ } catch (InterruptedException ie) {
+ // fail silently for any other interruption
+ }
+ // reset interrupt flag if needed
+ if (interruptedBefore) {
+ Thread.currentThread().interrupt();
+ }
+ return producerTerminated && consumerTerminated;
}
public BoundedInMemoryQueue getQueue() {
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index cb4f5570743a6..e05d5f6f3e088 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -860,12 +860,14 @@ public boolean deleteExistingKeyIfPresent(HoodieKey key) {
return false;
}
+ public GenericRecord generateGenericRecord() {
+ return generateGenericRecord(genPseudoRandomUUID(rand).toString(), "0",
+ genPseudoRandomUUID(rand).toString(), genPseudoRandomUUID(rand).toString(), rand.nextLong());
+ }
+
public List generateGenericRecords(int numRecords) {
List list = new ArrayList<>();
- IntStream.range(0, numRecords).forEach(i -> {
- list.add(generateGenericRecord(genPseudoRandomUUID(rand).toString(), "0",
- genPseudoRandomUUID(rand).toString(), genPseudoRandomUUID(rand).toString(), rand.nextLong()));
- });
+ IntStream.range(0, numRecords).forEach(i -> list.add(generateGenericRecord()));
return list;
}
From 248b0591b031ef5d017bc4f710b9b0496e751c39 Mon Sep 17 00:00:00 2001
From: Jin Xing
Date: Fri, 6 May 2022 15:29:47 +0800
Subject: [PATCH 03/52] [HUDI-4042] Support truncate-partition for Spark-3.2
(#5506)
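The rewritten rule below maps Spark's TruncatePartition plan on a Hudi table to TruncateHoodieTableCommand, so a per-partition truncate works under Spark 3.2. A small illustrative snippet follows; the table name, partition column, and session setup are assumptions, and the Hudi Spark session extension must be enabled.

import org.apache.spark.sql.SparkSession;

public class TruncateHudiPartitionExample {
  public static void main(String[] args) {
    // Assumes a local Spark 3.2 session with the Hudi extension configured.
    SparkSession spark = SparkSession.builder()
        .appName("hudi-truncate-partition")
        .master("local[1]")
        .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
        .getOrCreate();

    // Illustrative table/partition names: truncates a single partition of a Hudi table.
    spark.sql("TRUNCATE TABLE hudi_trips PARTITION (dt = '2022-05-06')");

    spark.stop();
  }
}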
---
.../hudi/analysis/HoodieSpark3Analysis.scala | 23 ++++++++++---------
1 file changed, 12 insertions(+), 11 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark3Analysis.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark3Analysis.scala
index e20f934592e45..4c77733b144aa 100644
--- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark3Analysis.scala
+++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark3Analysis.scala
@@ -179,21 +179,22 @@ case class HoodieSpark3ResolveReferences(sparkSession: SparkSession) extends Rul
case class HoodieSpark3PostAnalysisRule(sparkSession: SparkSession) extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = {
plan match {
- case ShowPartitions(child, specOpt, _)
- if child.isInstanceOf[ResolvedTable] &&
- child.asInstanceOf[ResolvedTable].table.isInstanceOf[HoodieInternalV2Table] =>
- ShowHoodieTablePartitionsCommand(child.asInstanceOf[ResolvedTable].identifier.asTableIdentifier, specOpt.map(s => s.asInstanceOf[UnresolvedPartitionSpec].spec))
+ case ShowPartitions(ResolvedTable(_, idt, _: HoodieInternalV2Table, _), specOpt, _) =>
+ ShowHoodieTablePartitionsCommand(
+ idt.asTableIdentifier, specOpt.map(s => s.asInstanceOf[UnresolvedPartitionSpec].spec))
// Rewrite TruncateTableCommand to TruncateHoodieTableCommand
- case TruncateTable(child)
- if child.isInstanceOf[ResolvedTable] &&
- child.asInstanceOf[ResolvedTable].table.isInstanceOf[HoodieInternalV2Table] =>
- new TruncateHoodieTableCommand(child.asInstanceOf[ResolvedTable].identifier.asTableIdentifier, None)
+ case TruncateTable(ResolvedTable(_, idt, _: HoodieInternalV2Table, _)) =>
+ TruncateHoodieTableCommand(idt.asTableIdentifier, None)
- case DropPartitions(child, specs, ifExists, purge)
- if child.resolved && child.isInstanceOf[ResolvedTable] && child.asInstanceOf[ResolvedTable].table.isInstanceOf[HoodieInternalV2Table] =>
+ case TruncatePartition(
+ ResolvedTable(_, idt, _: HoodieInternalV2Table, _),
+ partitionSpec: UnresolvedPartitionSpec) =>
+ TruncateHoodieTableCommand(idt.asTableIdentifier, Some(partitionSpec.spec))
+
+ case DropPartitions(ResolvedTable(_, idt, _: HoodieInternalV2Table, _), specs, ifExists, purge) =>
AlterHoodieTableDropPartitionCommand(
- child.asInstanceOf[ResolvedTable].identifier.asTableIdentifier,
+ idt.asTableIdentifier,
specs.seq.map(f => f.asInstanceOf[UnresolvedPartitionSpec]).map(s => s.spec),
ifExists,
purge,
From c319ee9cea78544406e30dd36cc603fd0a0283db Mon Sep 17 00:00:00 2001
From: Raymond Xu <2701446+xushiyan@users.noreply.github.com>
Date: Fri, 6 May 2022 05:52:06 -0700
Subject: [PATCH 04/52] [HUDI-4017] Improve spark sql coverage in CI (#5512)
Add GitHub Actions tasks to run Spark SQL UTs under Spark 3.1 and 3.2.
---
.github/workflows/bot.yml | 8 ++++++++
.../hudi/functional/TestSqlStatement.scala | 4 ++--
.../SpaceCurveOptimizeBenchmark.scala | 4 ++--
...ase.scala => HoodieSparkSqlTestBase.scala} | 2 +-
.../spark/sql/hudi/TestAlterTable.scala | 2 +-
.../hudi/TestAlterTableDropPartition.scala | 2 +-
.../spark/sql/hudi/TestCompactionTable.scala | 2 +-
.../spark/sql/hudi/TestCreateTable.scala | 2 +-
.../spark/sql/hudi/TestDeleteTable.scala | 2 +-
.../apache/spark/sql/hudi/TestDropTable.scala | 2 +-
.../sql/hudi/TestHoodieOptionConfig.scala | 20 +++----------------
.../spark/sql/hudi/TestInsertTable.scala | 2 +-
.../sql/hudi/TestMergeIntoLogOnlyTable.scala | 2 +-
.../spark/sql/hudi/TestMergeIntoTable.scala | 2 +-
.../spark/sql/hudi/TestMergeIntoTable2.scala | 2 +-
.../hudi/TestPartialUpdateForMergeInto.scala | 2 +-
.../spark/sql/hudi/TestShowPartitions.scala | 2 +-
.../apache/spark/sql/hudi/TestSpark3DDL.scala | 2 +-
.../apache/spark/sql/hudi/TestSqlConf.scala | 2 +-
.../spark/sql/hudi/TestTimeTravelTable.scala | 2 +-
.../spark/sql/hudi/TestTruncateTable.scala | 2 +-
.../spark/sql/hudi/TestUpdateTable.scala | 2 +-
.../procedure/TestCallCommandParser.scala | 4 ++--
.../hudi/procedure/TestCallProcedure.scala | 4 ++--
.../procedure/TestClusteringProcedure.scala | 4 ++--
.../procedure/TestCompactionProcedure.scala | 4 ++--
.../procedure/TestSavepointsProcedure.scala | 4 ++--
27 files changed, 43 insertions(+), 49 deletions(-)
rename hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/{TestHoodieSqlBase.scala => HoodieSparkSqlTestBase.scala} (98%)
diff --git a/.github/workflows/bot.yml b/.github/workflows/bot.yml
index 29702846b3d2d..b76a465d7128c 100644
--- a/.github/workflows/bot.yml
+++ b/.github/workflows/bot.yml
@@ -59,3 +59,11 @@ jobs:
if: ${{ !endsWith(env.SPARK_PROFILE, '3.2') }} # skip test spark 3.2 before hadoop upgrade to 3.x
run:
mvn test -Punit-tests -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -D"$FLINK_PROFILE" -DfailIfNoTests=false -pl hudi-examples/hudi-examples-flink,hudi-examples/hudi-examples-java,hudi-examples/hudi-examples-spark
+ - name: Spark SQL Test
+ env:
+ SCALA_PROFILE: ${{ matrix.scalaProfile }}
+ SPARK_PROFILE: ${{ matrix.sparkProfile }}
+ FLINK_PROFILE: ${{ matrix.flinkProfile }}
+ if: ${{ !endsWith(env.SPARK_PROFILE, '2.4') }} # skip test spark 2.4 as it's covered by Azure CI
+ run:
+ mvn test -Punit-tests -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -D"$FLINK_PROFILE" '-Dtest=org.apache.spark.sql.hudi.Test*' -pl hudi-spark-datasource/hudi-spark
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSqlStatement.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSqlStatement.scala
index c451b51ef77c6..f8a9cf5fb060f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSqlStatement.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSqlStatement.scala
@@ -18,9 +18,9 @@
package org.apache.hudi.functional
import org.apache.hudi.common.util.FileIOUtils
-import org.apache.spark.sql.hudi.TestHoodieSqlBase
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
-class TestSqlStatement extends TestHoodieSqlBase {
+class TestSqlStatement extends HoodieSparkSqlTestBase {
val STATE_INIT = 0
val STATE_SKIP_COMMENT = 1
val STATE_FINISH_COMMENT = 2
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/SpaceCurveOptimizeBenchmark.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/SpaceCurveOptimizeBenchmark.scala
index d84fad4f2493c..273303fdae63d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/SpaceCurveOptimizeBenchmark.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/SpaceCurveOptimizeBenchmark.scala
@@ -23,7 +23,7 @@ import org.apache.hudi.ColumnStatsIndexHelper.buildColumnStatsTableFor
import org.apache.hudi.config.HoodieClusteringConfig.LayoutOptimizationStrategy
import org.apache.hudi.sort.SpaceCurveSortingHelper
import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.hudi.TestHoodieSqlBase
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
import org.apache.spark.sql.types.{IntegerType, StructField}
import org.junit.jupiter.api.{Disabled, Tag, Test}
@@ -31,7 +31,7 @@ import scala.collection.JavaConversions._
import scala.util.Random
@Tag("functional")
-object SpaceCurveOptimizeBenchmark extends TestHoodieSqlBase {
+object SpaceCurveOptimizeBenchmark extends HoodieSparkSqlTestBase {
def evalSkippingPercent(tableName: String, co1: String, co2: String, value1: Int, value2: Int): Unit= {
val sourceTableDF = spark.sql(s"select * from ${tableName}")
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
similarity index 98%
rename from hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala
rename to hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
index d1f373db99e51..68fc6d7c41d89 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
@@ -31,7 +31,7 @@ import org.scalatest.{BeforeAndAfterAll, FunSuite, Tag}
import java.io.File
import java.util.TimeZone
-class TestHoodieSqlBase extends FunSuite with BeforeAndAfterAll {
+class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
org.apache.log4j.Logger.getRootLogger.setLevel(Level.WARN)
private lazy val sparkWareHouse = {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala
index 0f2cb547c2fe9..6d29ea3f4a13e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala
@@ -22,7 +22,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.types.{LongType, StructField, StructType}
-class TestAlterTable extends TestHoodieSqlBase {
+class TestAlterTable extends HoodieSparkSqlTestBase {
test("Test Alter Table") {
withTempDir { tmp =>
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
index ecbbadeeb9a28..677f8632a7143 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
@@ -23,7 +23,7 @@ import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.keygen.{ComplexKeyGenerator, SimpleKeyGenerator}
import org.apache.spark.sql.SaveMode
-class TestAlterTableDropPartition extends TestHoodieSqlBase {
+class TestAlterTableDropPartition extends HoodieSparkSqlTestBase {
test("Drop non-partitioned table") {
val tableName = generateTableName
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala
index 20238a6e4318d..0ef89fc5b9fe3 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.hudi
-class TestCompactionTable extends TestHoodieSqlBase {
+class TestCompactionTable extends HoodieSparkSqlTestBase {
test("Test compaction table") {
withTempDir {tmp =>
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
index 6b8efb84e32f1..e7910fa115852 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.types._
import scala.collection.JavaConverters._
-class TestCreateTable extends TestHoodieSqlBase {
+class TestCreateTable extends HoodieSparkSqlTestBase {
test("Test Create Managed Hoodie Table") {
val databaseName = "hudi_database"
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala
index b2e888a5f3140..4c7c6269667ab 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala
@@ -22,7 +22,7 @@ import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.keygen.SimpleKeyGenerator
import org.apache.spark.sql.SaveMode
-class TestDeleteTable extends TestHoodieSqlBase {
+class TestDeleteTable extends HoodieSparkSqlTestBase {
test("Test Delete Table") {
withTempDir { tmp =>
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDropTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDropTable.scala
index c53eb9127c887..ed43d37d0388e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDropTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDropTable.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.hudi
-class TestDropTable extends TestHoodieSqlBase {
+class TestDropTable extends HoodieSparkSqlTestBase {
test("Test Drop Table") {
withTempDir { tmp =>
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
index 4c0c60385104b..14c2245d5be36 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
@@ -19,27 +19,13 @@ package org.apache.spark.sql.hudi
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, OverwriteWithLatestAvroPayload}
import org.apache.hudi.common.table.HoodieTableConfig
-import org.apache.hudi.testutils.HoodieClientTestBase
-
-import org.apache.spark.sql.SparkSession
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
import org.apache.spark.sql.types._
-
import org.junit.jupiter.api.Assertions.assertTrue
-import org.junit.jupiter.api.{BeforeEach, Test}
-
+import org.junit.jupiter.api.Test
import org.scalatest.Matchers.intercept
-class TestHoodieOptionConfig extends HoodieClientTestBase {
-
- var spark: SparkSession = _
-
- /**
- * Setup method running before each test.
- */
- @BeforeEach override def setUp() {
- initSparkContexts()
- spark = sqlContext.sparkSession
- }
+class TestHoodieOptionConfig extends SparkClientFunctionalTestHarness {
@Test
def testWithDefaultSqlOptions(): Unit = {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index 3141208db121e..ab75ef563f229 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.SaveMode
import java.io.File
-class TestInsertTable extends TestHoodieSqlBase {
+class TestInsertTable extends HoodieSparkSqlTestBase {
test("Test Insert Into") {
withTempDir { tmp =>
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoLogOnlyTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoLogOnlyTable.scala
index 5139825f9428f..232b6bbb511c5 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoLogOnlyTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoLogOnlyTable.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hudi
import org.apache.hudi.testutils.DataSourceTestUtils
-class TestMergeIntoLogOnlyTable extends TestHoodieSqlBase {
+class TestMergeIntoLogOnlyTable extends HoodieSparkSqlTestBase {
test("Test Query Log Only MOR Table") {
withTempDir { tmp =>
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
index 28dee88e1f61e..992a442f4fda3 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hudi
import org.apache.hudi.{DataSourceReadOptions, HoodieDataSourceHelpers}
import org.apache.hudi.common.fs.FSUtils
-class TestMergeIntoTable extends TestHoodieSqlBase {
+class TestMergeIntoTable extends HoodieSparkSqlTestBase {
test("Test MergeInto Basic") {
withTempDir { tmp =>
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
index 5041a543168bf..e162368dacc72 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
@@ -21,7 +21,7 @@ import org.apache.hudi.HoodieSparkUtils
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.spark.sql.Row
-class TestMergeIntoTable2 extends TestHoodieSqlBase {
+class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
test("Test MergeInto for MOR table 2") {
withTempDir { tmp =>
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala
index 2524d04ec81fb..1af7a162be185 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.hudi
-class TestPartialUpdateForMergeInto extends TestHoodieSqlBase {
+class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
test("Test Partial Update") {
withTempDir { tmp =>
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestShowPartitions.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestShowPartitions.scala
index 868bfc43d57f1..369f3b341adce 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestShowPartitions.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestShowPartitions.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hudi
import org.apache.spark.sql.Row
-class TestShowPartitions extends TestHoodieSqlBase {
+class TestShowPartitions extends HoodieSparkSqlTestBase {
test("Test Show Non Partitioned Table's Partitions") {
val tableName = generateTableName
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
index 54163635984bf..15fed579bba41 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
-class TestSpark3DDL extends TestHoodieSqlBase {
+class TestSpark3DDL extends HoodieSparkSqlTestBase {
def createTestResult(tableName: String): Array[Row] = {
spark.sql(s"select * from ${tableName} order by id")
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSqlConf.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSqlConf.scala
index 1a8ac0e645899..ac3c49efdd713 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSqlConf.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSqlConf.scala
@@ -28,7 +28,7 @@ import java.nio.file.{Files, Paths}
import org.scalatest.BeforeAndAfter
-class TestSqlConf extends TestHoodieSqlBase with BeforeAndAfter {
+class TestSqlConf extends HoodieSparkSqlTestBase with BeforeAndAfter {
def setEnv(key: String, value: String): String = {
val field = System.getenv().getClass.getDeclaredField("m")
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestTimeTravelTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestTimeTravelTable.scala
index 471ebd6107dcc..ce0f17c3f569c 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestTimeTravelTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestTimeTravelTable.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hudi
import org.apache.hudi.HoodieSparkUtils
import org.apache.hudi.common.table.HoodieTableMetaClient
-class TestTimeTravelTable extends TestHoodieSqlBase {
+class TestTimeTravelTable extends HoodieSparkSqlTestBase {
test("Test Insert and Update Record with time travel") {
if (HoodieSparkUtils.gteqSpark3_2) {
withTempDir { tmp =>
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestTruncateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestTruncateTable.scala
index a61d0f822cf45..5dd243079efb7 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestTruncateTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestTruncateTable.scala
@@ -23,7 +23,7 @@ import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.keygen.{ComplexKeyGenerator, SimpleKeyGenerator}
import org.apache.spark.sql.SaveMode
-class TestTruncateTable extends TestHoodieSqlBase {
+class TestTruncateTable extends HoodieSparkSqlTestBase {
test("Test Truncate non-partitioned Table") {
Seq("cow", "mor").foreach { tableType =>
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala
index 57c4a972960a9..8c709ab37a6e3 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.hudi
-class TestUpdateTable extends TestHoodieSqlBase {
+class TestUpdateTable extends HoodieSparkSqlTestBase {
test("Test Update Table") {
withTempDir { tmp =>
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCallCommandParser.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCallCommandParser.scala
index 87814763bf4d3..668fb544934dd 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCallCommandParser.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCallCommandParser.scala
@@ -21,13 +21,13 @@ import com.google.common.collect.ImmutableList
import org.apache.hudi.HoodieSparkUtils
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{CallCommand, NamedArgument, PositionalArgument}
-import org.apache.spark.sql.hudi.TestHoodieSqlBase
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
import org.apache.spark.sql.types.{DataType, DataTypes}
import java.math.BigDecimal
import scala.collection.JavaConverters
-class TestCallCommandParser extends TestHoodieSqlBase {
+class TestCallCommandParser extends HoodieSparkSqlTestBase {
private val parser = spark.sessionState.sqlParser
test("Test Call Produce with Positional Arguments") {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCallProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCallProcedure.scala
index bdf4cbe7ba0ff..f75569a1171f5 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCallProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCallProcedure.scala
@@ -17,9 +17,9 @@
package org.apache.spark.sql.hudi.procedure
-import org.apache.spark.sql.hudi.TestHoodieSqlBase
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
-class TestCallProcedure extends TestHoodieSqlBase {
+class TestCallProcedure extends HoodieSparkSqlTestBase {
test("Test Call show_commits Procedure") {
withTempDir { tmp =>
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
index 6214117233467..f975651bd7527 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
@@ -24,11 +24,11 @@ import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeli
import org.apache.hudi.common.util.{Option => HOption}
import org.apache.hudi.{HoodieCLIUtils, HoodieDataSourceHelpers}
-import org.apache.spark.sql.hudi.TestHoodieSqlBase
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
import scala.collection.JavaConverters.asScalaIteratorConverter
-class TestClusteringProcedure extends TestHoodieSqlBase {
+class TestClusteringProcedure extends HoodieSparkSqlTestBase {
test("Test Call run_clustering Procedure By Table") {
withTempDir { tmp =>
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala
index f6e6772d161b6..0f6f96f91196f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala
@@ -19,9 +19,9 @@
package org.apache.spark.sql.hudi.procedure
-import org.apache.spark.sql.hudi.TestHoodieSqlBase
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
-class TestCompactionProcedure extends TestHoodieSqlBase {
+class TestCompactionProcedure extends HoodieSparkSqlTestBase {
test("Test Call run_compaction Procedure by Table") {
withTempDir { tmp =>
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestSavepointsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestSavepointsProcedure.scala
index 7d60ca018d32a..cfc5319c75641 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestSavepointsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestSavepointsProcedure.scala
@@ -17,9 +17,9 @@
package org.apache.spark.sql.hudi.procedure
-import org.apache.spark.sql.hudi.TestHoodieSqlBase
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
-class TestSavepointsProcedure extends TestHoodieSqlBase {
+class TestSavepointsProcedure extends HoodieSparkSqlTestBase {
test("Test Call create_savepoints Procedure") {
withTempDir { tmp =>
From 52fe1c9faeb83fe51b520e18d0c37b67ad3fcfe4 Mon Sep 17 00:00:00 2001
From: Sivabalan Narayanan
Date: Fri, 6 May 2022 09:27:29 -0400
Subject: [PATCH 05/52] [HUDI-3675] Adding post write termination strategy to
deltastreamer continuous mode (#5073)
- Added a postWriteTerminationStrategy to deltastreamer continuous mode. It can be enabled by setting the appropriate termination strategy via DeltastreamerConfig.postWriteTerminationStrategyClass. If not set, continuous mode is expected to run forever.
- Added one concrete implementation, NoNewDataTerminationStrategy, which shuts down the deltastreamer if there is no new data to consume from the source for N consecutive rounds.
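The strategy is enabled through the new --post-write-termination-strategy-class flag, and the bundled NoNewDataTerminationStrategy reads its round threshold from max.rounds.without.new.data.to.shutdown (default 3). A custom strategy only needs to implement the single shouldShutdown method; a minimal sketch follows, where the class name and the max-runtime property key are hypothetical and not part of this patch.

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.deltastreamer.PostWriteTerminationStrategy;

import org.apache.spark.api.java.JavaRDD;

/** Illustrative only: shuts the deltastreamer down once a wall-clock budget is exhausted. */
public class MaxRuntimeTerminationStrategy implements PostWriteTerminationStrategy {

  private final long deadlineMs;

  // Strategies are instantiated reflectively with the deltastreamer's TypedProperties.
  public MaxRuntimeTerminationStrategy(TypedProperties props) {
    // Hypothetical property key, not defined by this patch.
    int maxRuntimeMinutes = props.getInteger("hoodie.deltastreamer.max.runtime.minutes", 60);
    this.deadlineMs = System.currentTimeMillis() + maxRuntimeMinutes * 60_000L;
  }

  @Override
  public boolean shouldShutdown(Option<JavaRDD<WriteStatus>> writeStatuses) {
    return System.currentTimeMillis() >= deadlineMs;
  }
}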
---
.../deltastreamer/HoodieDeltaStreamer.java | 16 ++++++
.../NoNewDataTerminationStrategy.java | 56 +++++++++++++++++++
.../PostWriteTerminationStrategy.java | 39 +++++++++++++
.../TerminationStrategyUtils.java | 45 +++++++++++++++
.../functional/TestHoodieDeltaStreamer.java | 47 +++++++++++++++-
.../utilities/sources/TestDataSource.java | 9 ++-
6 files changed, 209 insertions(+), 3 deletions(-)
create mode 100644 hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/NoNewDataTerminationStrategy.java
create mode 100644 hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/PostWriteTerminationStrategy.java
create mode 100644 hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/TerminationStrategyUtils.java
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 824c7375fa07a..7a688b50c7097 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -43,6 +43,7 @@
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieClusteringConfig;
@@ -403,6 +404,9 @@ public static class Config implements Serializable {
+ "https://spark.apache.org/docs/latest/job-scheduling.html")
public Integer clusterSchedulingMinShare = 0;
+ @Parameter(names = {"--post-write-termination-strategy-class"}, description = "Post writer termination strategy class to gracefully shutdown deltastreamer in continuous mode")
+ public String postWriteTerminationStrategyClass = "";
+
public boolean isAsyncCompactionEnabled() {
return continuousMode && !forceDisableCompaction
&& HoodieTableType.MERGE_ON_READ.equals(HoodieTableType.valueOf(tableType));
@@ -603,6 +607,8 @@ public static class DeltaSyncService extends HoodieAsyncService {
*/
private transient DeltaSync deltaSync;
+ private final Option postWriteTerminationStrategy;
+
public DeltaSyncService(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf,
Option properties) throws IOException {
this.cfg = cfg;
@@ -610,6 +616,8 @@ public DeltaSyncService(Config cfg, JavaSparkContext jssc, FileSystem fs, Config
this.sparkSession = SparkSession.builder().config(jssc.getConf()).getOrCreate();
this.asyncCompactService = Option.empty();
this.asyncClusteringService = Option.empty();
+ this.postWriteTerminationStrategy = StringUtils.isNullOrEmpty(cfg.postWriteTerminationStrategyClass) ? Option.empty() :
+ TerminationStrategyUtils.createPostWriteTerminationStrategy(properties.get(), cfg.postWriteTerminationStrategyClass);
if (fs.exists(new Path(cfg.targetBasePath))) {
HoodieTableMetaClient meta =
@@ -695,6 +703,14 @@ protected Pair startService() {
}
}
}
+ // check if deltastreamer needs to be shut down
+ if (postWriteTerminationStrategy.isPresent()) {
+ if (postWriteTerminationStrategy.get().shouldShutdown(scheduledCompactionInstantAndRDD.isPresent() ? Option.of(scheduledCompactionInstantAndRDD.get().getRight()) :
+ Option.empty())) {
+ error = true;
+ shutdown(false);
+ }
+ }
long toSleepMs = cfg.minSyncIntervalSeconds * 1000 - (System.currentTimeMillis() - start);
if (toSleepMs > 0) {
LOG.info("Last sync ran less than min sync interval: " + cfg.minSyncIntervalSeconds + " s, sleep: "
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/NoNewDataTerminationStrategy.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/NoNewDataTerminationStrategy.java
new file mode 100644
index 0000000000000..2701ce4bc3085
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/NoNewDataTerminationStrategy.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaRDD;
+
+/**
+ * Post-write termination strategy for deltastreamer in continuous mode. Shuts the deltastreamer down once there has been no new data for a configured number of consecutive rounds.
+ */
+public class NoNewDataTerminationStrategy implements PostWriteTerminationStrategy {
+
+ private static final Logger LOG = LogManager.getLogger(NoNewDataTerminationStrategy.class);
+
+ public static final String MAX_ROUNDS_WITHOUT_NEW_DATA_TO_SHUTDOWN = "max.rounds.without.new.data.to.shutdown";
+ public static final int DEFAULT_MAX_ROUNDS_WITHOUT_NEW_DATA_TO_SHUTDOWN = 3;
+
+ private final int numTimesNoNewDataToShutdown;
+ private int numTimesNoNewData = 0;
+
+ public NoNewDataTerminationStrategy(TypedProperties properties) {
+ numTimesNoNewDataToShutdown = properties.getInteger(MAX_ROUNDS_WITHOUT_NEW_DATA_TO_SHUTDOWN, DEFAULT_MAX_ROUNDS_WITHOUT_NEW_DATA_TO_SHUTDOWN);
+ }
+
+ @Override
+ public boolean shouldShutdown(Option<JavaRDD<WriteStatus>> writeStatuses) {
+ numTimesNoNewData = writeStatuses.isPresent() ? 0 : numTimesNoNewData + 1;
+ if (numTimesNoNewData >= numTimesNoNewDataToShutdown) {
+ LOG.info("Shutting down on continuous mode as there is no new data for " + numTimesNoNewData);
+ return true;
+ }
+ return false;
+ }
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/PostWriteTerminationStrategy.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/PostWriteTerminationStrategy.java
new file mode 100644
index 0000000000000..61f55428f166a
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/PostWriteTerminationStrategy.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.spark.api.java.JavaRDD;
+
+/**
+ * Post write termination strategy for deltastreamer in continuous mode.
+ */
+public interface PostWriteTerminationStrategy {
+
+ /**
+ * Returns whether deltastreamer needs to be shut down.
+ *
+ * @param writeStatuses optional write statuses from the latest sync round; empty when no new data was written.
+ * @return true if deltastreamer has to be shut down, false otherwise.
+ */
+ boolean shouldShutdown(Option<JavaRDD<WriteStatus>> writeStatuses);
+
+}
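
To make the contract above concrete, here is a hedged sketch of a custom strategy; everything except the interface and the imported Hudi/Spark types is hypothetical, including the class and property names.

    import org.apache.hudi.client.WriteStatus;
    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.hudi.common.util.Option;
    import org.apache.spark.api.java.JavaRDD;

    // Hypothetical strategy: stop after a fixed number of sync rounds, whether or not data arrived.
    public class MaxRoundsTerminationStrategy implements PostWriteTerminationStrategy {
      private final int maxRounds;
      private int roundsSoFar = 0;

      // Strategies are instantiated reflectively with the pipeline's TypedProperties.
      public MaxRoundsTerminationStrategy(TypedProperties properties) {
        this.maxRounds = properties.getInteger("max.rounds.to.run", 10); // property name is made up
      }

      @Override
      public boolean shouldShutdown(Option<JavaRDD<WriteStatus>> writeStatuses) {
        return ++roundsSoFar >= maxRounds;
      }
    }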
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/TerminationStrategyUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/TerminationStrategyUtils.java
new file mode 100644
index 0000000000000..1b046a0db0da2
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/TerminationStrategyUtils.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieException;
+
+public class TerminationStrategyUtils {
+
+ /**
+ * Creates a {@link PostWriteTerminationStrategy} via reflection if a strategy class name is configured
+ * through {@link HoodieDeltaStreamer.Config#postWriteTerminationStrategyClass}; returns an empty Option otherwise.
+ */
+ public static Option<PostWriteTerminationStrategy> createPostWriteTerminationStrategy(TypedProperties properties, String postWriteTerminationStrategyClass)
+     throws HoodieException {
+ try {
+ return StringUtils.isNullOrEmpty(postWriteTerminationStrategyClass)
+ ? Option.empty() :
+ Option.of((PostWriteTerminationStrategy) ReflectionUtils.loadClass(postWriteTerminationStrategyClass, properties));
+ } catch (Throwable e) {
+ throw new HoodieException("Could not create PostWritTerminationStrategy class " + postWriteTerminationStrategyClass, e);
+ }
+ }
+}
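
A minimal usage sketch of the helper above, assuming a TypedProperties instance is already at hand; the variable names are illustrative only.

    TypedProperties props = new TypedProperties();
    Option<PostWriteTerminationStrategy> strategy =
        TerminationStrategyUtils.createPostWriteTerminationStrategy(
            props, NoNewDataTerminationStrategy.class.getName());
    // A null or empty class name yields Option.empty(), i.e. the pipeline never self-terminates.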
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 0576f6aaee88b..3eaec56cc2764 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -60,6 +60,7 @@
import org.apache.hudi.utilities.HoodieIndexer;
import org.apache.hudi.utilities.deltastreamer.DeltaSync;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
+import org.apache.hudi.utilities.deltastreamer.NoNewDataTerminationStrategy;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.schema.SparkAvroPostProcessor;
@@ -738,18 +739,30 @@ public void testUpsertsCOWContinuousMode() throws Exception {
testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow");
}
+ @Test
+ public void testUpsertsCOWContinuousModeShutdownGracefully() throws Exception {
+ testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow", true);
+ }
+
@Test
public void testUpsertsMORContinuousMode() throws Exception {
testUpsertsContinuousMode(HoodieTableType.MERGE_ON_READ, "continuous_mor");
}
private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir) throws Exception {
+ testUpsertsContinuousMode(tableType, tempDir, false);
+ }
+
+ private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir, boolean testShutdownGracefully) throws Exception {
String tableBasePath = dfsBasePath + "/" + tempDir;
// Keep it higher than batch-size to test continuous mode
int totalRecords = 3000;
// Initial bulk insert
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
cfg.continuousMode = true;
+ if (testShutdownGracefully) {
+ cfg.postWriteTerminationStrategyClass = NoNewDataTerminationStrategy.class.getName();
+ }
cfg.tableType = tableType.name();
cfg.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfg.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
@@ -763,6 +776,9 @@ private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir
}
TestHelpers.assertRecordCount(totalRecords, tableBasePath, sqlContext);
TestHelpers.assertDistanceCount(totalRecords, tableBasePath, sqlContext);
+ if (testShutdownGracefully) {
+ TestDataSource.returnEmptyBatch = true;
+ }
return true;
});
}
@@ -781,8 +797,35 @@ static void deltaStreamerTestRunner(HoodieDeltaStreamer ds, HoodieDeltaStreamer.
}
});
TestHelpers.waitTillCondition(condition, dsFuture, 360);
- ds.shutdownGracefully();
- dsFuture.get();
+ if (cfg != null && !cfg.postWriteTerminationStrategyClass.isEmpty()) {
+ awaitDeltaStreamerShutdown(ds);
+ } else {
+ ds.shutdownGracefully();
+ dsFuture.get();
+ }
+ }
+
+ static void awaitDeltaStreamerShutdown(HoodieDeltaStreamer ds) throws InterruptedException {
+ // await until deltastreamer shuts down on its own
+ boolean shutDownRequested = false;
+ int timeSoFar = 0;
+ while (!shutDownRequested) {
+ shutDownRequested = ds.getDeltaSyncService().isShutdownRequested();
+ Thread.sleep(500);
+ timeSoFar += 500;
+ if (timeSoFar > (2 * 60 * 1000)) {
+ Assertions.fail("Deltastreamer should have shutdown by now");
+ }
+ }
+ boolean shutdownComplete = false;
+ while (!shutdownComplete) {
+ shutdownComplete = ds.getDeltaSyncService().isShutdown();
+ Thread.sleep(500);
+ timeSoFar += 500;
+ if (timeSoFar > (2 * 60 * 1000)) {
+ Assertions.fail("Deltastreamer should have shutdown by now");
+ }
+ }
}
static void deltaStreamerTestRunner(HoodieDeltaStreamer ds, Function<Boolean, Boolean> condition) throws Exception {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java
index 1806d5c48b06d..a5a39dbe2d09e 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java
@@ -39,11 +39,14 @@
public class TestDataSource extends AbstractBaseTestSource {
private static final Logger LOG = LogManager.getLogger(TestDataSource.class);
+ public static boolean returnEmptyBatch = false;
+ private static int counter = 0;
public TestDataSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider) {
super(props, sparkContext, sparkSession, schemaProvider);
initDataGen();
+ returnEmptyBatch = false;
}
@Override
@@ -54,9 +57,13 @@ protected InputBatch> fetchNewData(Option lastChe
LOG.info("Source Limit is set to " + sourceLimit);
// No new data.
- if (sourceLimit <= 0) {
+ if (sourceLimit <= 0 || returnEmptyBatch) {
+ LOG.warn("Return no new data from Test Data source " + counter + ", source limit " + sourceLimit);
return new InputBatch<>(Option.empty(), lastCheckpointStr.orElse(null));
+ } else {
+ LOG.warn("Returning valid data from Test Data source " + counter + ", source limit " + sourceLimit);
}
+ counter++;
List<GenericRecord> records =
fetchNextBatch(props, (int) sourceLimit, instantTime, DEFAULT_PARTITION_NUM).collect(Collectors.toList());
From 9625d16937954a54420384b41f964e48cba8cc2f Mon Sep 17 00:00:00 2001
From: cxzl25
Date: Sat, 7 May 2022 15:39:14 +0800
Subject: [PATCH 06/52] [HUDI-3849] AvroDeserializer supports
AVRO_REBASE_MODE_IN_READ configuration (#5287)
---
.../spark/sql/avro/HoodieSpark3_2AvroDeserializer.scala | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/avro/HoodieSpark3_2AvroDeserializer.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/avro/HoodieSpark3_2AvroDeserializer.scala
index 0275e2f635d3b..d839c73032cd4 100644
--- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/avro/HoodieSpark3_2AvroDeserializer.scala
+++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/avro/HoodieSpark3_2AvroDeserializer.scala
@@ -18,13 +18,14 @@
package org.apache.spark.sql.avro
import org.apache.avro.Schema
-import org.apache.hudi.HoodieSparkUtils
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DataType
class HoodieSpark3_2AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType)
extends HoodieAvroDeserializer {
- private val avroDeserializer = new AvroDeserializer(rootAvroType, rootCatalystType, "EXCEPTION")
+ private val avroDeserializer = new AvroDeserializer(rootAvroType, rootCatalystType,
+ SQLConf.get.getConf(SQLConf.AVRO_REBASE_MODE_IN_READ))
def deserialize(data: Any): Option[Any] = avroDeserializer.deserialize(data)
}
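
With this change the Spark 3.2 deserializer picks up the session-level Avro datetime rebase mode instead of hard-coding "EXCEPTION". The hedged example below shows how an application could steer that behavior; the literal conf key is assumed to be the one backing SQLConf.AVRO_REBASE_MODE_IN_READ and should be verified against the Spark version in use.

    import org.apache.spark.sql.SparkSession;

    // Assumption: "spark.sql.avro.datetimeRebaseModeInRead" is the key behind
    // SQLConf.AVRO_REBASE_MODE_IN_READ; valid values are EXCEPTION, LEGACY and CORRECTED.
    SparkSession spark = SparkSession.builder().appName("rebase-mode-example").getOrCreate();
    spark.conf().set("spark.sql.avro.datetimeRebaseModeInRead", "CORRECTED");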
From 80f99893a06b984da7332d7db05d6ac309810da0 Mon Sep 17 00:00:00 2001
From: BruceLin
Date: Sat, 7 May 2022 20:03:18 +0800
Subject: [PATCH 07/52] [MINOR] Fixing class not found when using flink and
enable metadata table (#5527)
---
packaging/hudi-flink-bundle/pom.xml | 1 +
1 file changed, 1 insertion(+)
diff --git a/packaging/hudi-flink-bundle/pom.xml b/packaging/hudi-flink-bundle/pom.xml
index a322daaabe9a1..903671d754c76 100644
--- a/packaging/hudi-flink-bundle/pom.xml
+++ b/packaging/hudi-flink-bundle/pom.xml
@@ -111,6 +111,7 @@
                  <include>com.github.davidmoten:guava-mini</include>
                  <include>com.github.davidmoten:hilbert-curve</include>
+                 <include>com.github.ben-manes.caffeine:caffeine</include>
                  <include>com.twitter:bijection-avro_${scala.binary.version}</include>
                  <include>com.twitter:bijection-core_${scala.binary.version}</include>
                  <include>io.dropwizard.metrics:metrics-core</include>
From 569a76a9a5389efb74ec88b81c616f8ead58df5c Mon Sep 17 00:00:00 2001
From: Sivabalan Narayanan
Date: Sat, 7 May 2022 15:37:20 -0400
Subject: [PATCH 08/52] [MINOR] fixing flaky tests in deltastreamer tests
(#5521)
---
.../hudi/utilities/functional/HoodieDeltaStreamerTestBase.java | 2 ++
1 file changed, 2 insertions(+)
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java
index a9de85ce5ac9e..4ac6f73d880fb 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java
@@ -30,6 +30,7 @@
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import org.apache.hudi.utilities.sources.TestParquetDFSSourceEmptyBatch;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.avro.Schema;
@@ -191,6 +192,7 @@ protected static void writeCommonPropsToFile(FileSystem dfs, String dfsBasePath)
@BeforeEach
public void setup() throws Exception {
super.setup();
+ TestParquetDFSSourceEmptyBatch.returnEmptyBatch = false;
}
@AfterAll
From 75eaa0bffe86306c5df7a52c42ee41b7c7dc24c1 Mon Sep 17 00:00:00 2001
From: guanziyue <30882822+guanziyue@users.noreply.github.com>
Date: Mon, 9 May 2022 10:27:37 +0800
Subject: [PATCH 09/52] [HUDI-4055]refactor ratelimiter to avoid stack overflow
(#5530)
---
.../apache/hudi/common/util/RateLimiter.java | 17 ++++++++++-------
1 file changed, 10 insertions(+), 7 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/RateLimiter.java b/hudi-common/src/main/java/org/apache/hudi/common/util/RateLimiter.java
index e156ccffdbb97..4915e454af215 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/RateLimiter.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/RateLimiter.java
@@ -53,19 +53,22 @@ private RateLimiter(int permits, TimeUnit timePeriod) {
}
public boolean tryAcquire(int numPermits) {
- if (numPermits > maxPermits) {
- acquire(maxPermits);
- return tryAcquire(numPermits - maxPermits);
- } else {
- return acquire(numPermits);
+ int remainingPermits = numPermits;
+ while (remainingPermits > 0) {
+ if (remainingPermits > maxPermits) {
+ acquire(maxPermits);
+ remainingPermits -= maxPermits;
+ } else {
+ return acquire(remainingPermits);
+ }
}
+ return true;
}
public boolean acquire(int numOps) {
try {
- if (!semaphore.tryAcquire(numOps)) {
+ while (!semaphore.tryAcquire(numOps)) {
Thread.sleep(WAIT_BEFORE_NEXT_ACQUIRE_PERMIT_IN_MS);
- return acquire(numOps);
}
LOG.debug(String.format("acquire permits: %s, maxPremits: %s", numOps, maxPermits));
} catch (InterruptedException e) {
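
The essence of this refactor is replacing self-recursive retry calls, whose depth grows with wait time and request size, with plain loops of constant stack depth. The standalone sketch below illustrates that shape; the class and method names are made up and do not mirror Hudi's RateLimiter API.

    import java.util.concurrent.Semaphore;

    final class RetrySketch {
      // Before: every failed attempt adds a stack frame, so long waits can overflow the stack.
      static boolean acquireRecursive(Semaphore semaphore, int permits) throws InterruptedException {
        if (!semaphore.tryAcquire(permits)) {
          Thread.sleep(10);
          return acquireRecursive(semaphore, permits);
        }
        return true;
      }

      // After: identical behavior with constant stack depth.
      static boolean acquireLooping(Semaphore semaphore, int permits) throws InterruptedException {
        while (!semaphore.tryAcquire(permits)) {
          Thread.sleep(10);
        }
        return true;
      }
    }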
From 4c708402758545c5097579d1a92e8d710ef1f61d Mon Sep 17 00:00:00 2001
From: ForwardXu
Date: Mon, 9 May 2022 15:17:24 +0800
Subject: [PATCH 10/52] [MINOR] Fixing close for HoodieCatalog's test (#5531)
* [MINOR] Fixing close for HoodieCatalog's test
---
.../org/apache/hudi/table/catalog/TestHoodieCatalog.java | 8 ++++++++
1 file changed, 8 insertions(+)
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
index 3930e763fbaaa..8e23ef9d63bcb 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
@@ -42,6 +42,7 @@
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -138,6 +139,13 @@ void beforeEach() {
catalog.open();
}
+ @AfterEach
+ void afterEach() {
+ if (catalog != null) {
+ catalog.close();
+ }
+ }
+
@Test
public void testListDatabases() {
List<String> actual = catalog.listDatabases();
From 6b47ef6ed223e10a05b91acc3331cff2fa069d87 Mon Sep 17 00:00:00 2001
From: xicm <36392121+xicm@users.noreply.github.com>
Date: Mon, 9 May 2022 16:35:50 +0800
Subject: [PATCH 11/52] [HUDI-4053] Flaky
 ITTestHoodieDataSource.testStreamWriteBatchReadOpti… (#5526)
* [HUDI-4053] Flaky ITTestHoodieDataSource.testStreamWriteBatchReadOptimized
Co-authored-by: xicm
---
.../apache/hudi/table/ITTestHoodieDataSource.java | 10 +++++++++-
.../test/java/org/apache/hudi/utils/TestData.java | 12 ++++++++++++
2 files changed, 21 insertions(+), 1 deletion(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 088ddb260dd5f..0c423df6b7bdb 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -240,7 +240,15 @@ void testStreamWriteBatchReadOptimized() {
List<Row> rows = CollectionUtil.iterableToList(
() -> streamTableEnv.sqlQuery("select * from t1").execute().collect());
- assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
+
+ // the test is flaky based on whether the first compaction is pending when
+ // scheduling the 2nd compaction.
+ // see details in CompactionPlanOperator#scheduleCompaction.
+ if (rows.size() < TestData.DATA_SET_SOURCE_INSERT.size()) {
+ assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT_FIRST_COMMIT);
+ } else {
+ assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
+ }
}
@Test
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index c1e924056cfa2..61f1657c2c6ed 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -164,6 +164,18 @@ public class TestData {
TimestampData.fromEpochMillis(8000), StringData.fromString("par4"))
);
+ // data set of test_source.data first commit.
+ public static List<RowData> DATA_SET_SOURCE_INSERT_FIRST_COMMIT = Arrays.asList(
+ insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+ TimestampData.fromEpochMillis(1000), StringData.fromString("par1")),
+ insertRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 33,
+ TimestampData.fromEpochMillis(2000), StringData.fromString("par1")),
+ insertRow(StringData.fromString("id3"), StringData.fromString("Julian"), 53,
+ TimestampData.fromEpochMillis(3000), StringData.fromString("par2")),
+ insertRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 31,
+ TimestampData.fromEpochMillis(4000), StringData.fromString("par2"))
+ );
+
// data set of test_source.data latest commit.
public static List<RowData> DATA_SET_SOURCE_INSERT_LATEST_COMMIT = Arrays.asList(
insertRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18,
From 6285a239a35c5808ba2eea00193a51564c716ab6 Mon Sep 17 00:00:00 2001
From: Sivabalan Narayanan
Date: Mon, 9 May 2022 12:40:22 -0400
Subject: [PATCH 12/52] [HUDI-3995] Making perf optimizations for bulk insert
row writer path (#5462)
- Avoid using udf for key generator for SimpleKeyGen and NonPartitionedKeyGen.
- Fixed NonPartitioned Key generator to directly fetch record key from row rather than involving GenericRecord.
- Other minor fixes around using static values instead of looking up hashmap.
---
.../io/storage/row/HoodieRowCreateHandle.java | 14 +--
.../hudi/keygen/BuiltinKeyGenerator.java | 103 +++++-------------
.../hudi/keygen/ComplexKeyGenerator.java | 8 +-
.../hudi/keygen/GlobalDeleteKeyGenerator.java | 4 +-
.../keygen/NonpartitionedKeyGenerator.java | 6 +
.../hudi/keygen/RowKeyGeneratorHelper.java | 57 +++++-----
.../hudi/keygen/SimpleKeyGenerator.java | 8 +-
.../keygen/TimestampBasedKeyGenerator.java | 16 +--
.../hudi/keygen/TestRowGeneratorHelper.scala | 24 ++--
.../org/apache/hudi/avro/HoodieAvroUtils.java | 4 +-
.../hudi/common/model/HoodieRecord.java | 6 +-
.../common/table/HoodieTableMetaClient.java | 14 ++-
.../hudi/HoodieDatasetBulkInsertHelper.java | 71 ++++++++----
.../BulkInsertDataInternalWriterHelper.java | 5 +-
.../apache/hudi/HoodieSparkSqlWriter.scala | 3 +-
.../TestHoodieDatasetBulkInsertHelper.java | 53 +++++++--
.../hudi/keygen/TestComplexKeyGenerator.java | 2 +-
.../TestGlobalDeleteRecordGenerator.java | 4 +-
.../TestNonpartitionedKeyGenerator.java | 2 +-
.../hudi/keygen/TestSimpleKeyGenerator.java | 4 +-
20 files changed, 219 insertions(+), 189 deletions(-)
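
One of the optimizations called out in this commit message is reading the record key and partition path straight from the row's meta columns by fixed position, rather than converting the row to an Avro GenericRecord first. The snippet below isolates that access pattern using the positional constants this patch relies on; the wrapper class itself is hypothetical.

    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.spark.sql.catalyst.InternalRow;

    // Hypothetical helper isolating the positional meta-column reads used in HoodieRowCreateHandle.
    final class MetaFieldAccessSketch {
      static String recordKeyOf(InternalRow row) {
        return String.valueOf(row.getUTF8String(HoodieRecord.RECORD_KEY_META_FIELD_POS));
      }

      static String partitionPathOf(InternalRow row) {
        return String.valueOf(row.getUTF8String(HoodieRecord.PARTITION_PATH_META_FIELD_POS));
      }
    }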
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
index ce3cd6f09768d..4db7eb26e64ba 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
@@ -68,8 +68,8 @@ public class HoodieRowCreateHandle implements Serializable {
private final HoodieTimer currTimer;
public HoodieRowCreateHandle(HoodieTable table, HoodieWriteConfig writeConfig, String partitionPath, String fileId,
- String instantTime, int taskPartitionId, long taskId, long taskEpochId,
- StructType structType) {
+ String instantTime, int taskPartitionId, long taskId, long taskEpochId,
+ StructType structType) {
this.partitionPath = partitionPath;
this.table = table;
this.writeConfig = writeConfig;
@@ -107,16 +107,15 @@ public HoodieRowCreateHandle(HoodieTable table, HoodieWriteConfig writeConfig, S
/**
* Writes an {@link InternalRow} to the underlying HoodieInternalRowFileWriter. Before writing, value for meta columns are computed as required
* and wrapped in {@link HoodieInternalRow}. {@link HoodieInternalRow} is what gets written to HoodieInternalRowFileWriter.
+ *
* @param record instance of {@link InternalRow} that needs to be written to the fileWriter.
* @throws IOException
*/
public void write(InternalRow record) throws IOException {
try {
- String partitionPath = record.getUTF8String(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(
- HoodieRecord.PARTITION_PATH_METADATA_FIELD)).toString();
- String seqId = HoodieRecord.generateSequenceId(instantTime, taskPartitionId, SEQGEN.getAndIncrement());
- String recordKey = record.getUTF8String(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(
- HoodieRecord.RECORD_KEY_METADATA_FIELD)).toString();
+ final String partitionPath = String.valueOf(record.getUTF8String(HoodieRecord.PARTITION_PATH_META_FIELD_POS));
+ final String seqId = HoodieRecord.generateSequenceId(instantTime, taskPartitionId, SEQGEN.getAndIncrement());
+ final String recordKey = String.valueOf(record.getUTF8String(HoodieRecord.RECORD_KEY_META_FIELD_POS));
HoodieInternalRow internalRow = new HoodieInternalRow(instantTime, seqId, recordKey, partitionPath, path.getName(),
record);
try {
@@ -141,6 +140,7 @@ public boolean canWrite() {
/**
* Closes the {@link HoodieRowCreateHandle} and returns an instance of {@link HoodieInternalWriteStatus} containing the stats and
* status of the writes to this handle.
+ *
* @return the {@link HoodieInternalWriteStatus} containing the stats and status of the writes to this handle.
* @throws IOException
*/
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
index fe03f60ee816c..0642a85c5f6cd 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
@@ -18,25 +18,25 @@
package org.apache.hudi.keygen;
-import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.ApiMaturityLevel;
import org.apache.hudi.AvroConversionUtils;
-import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.PublicAPIMethod;
-import org.apache.hudi.client.utils.SparkRowSerDe;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieKeyException;
+
+import org.apache.avro.generic.GenericRecord;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructType;
-import scala.Function1;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import scala.Function1;
/**
* Base class for the built-in key generators. Contains methods structured for
@@ -46,13 +46,12 @@ public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements Sp
private static final String STRUCT_NAME = "hoodieRowTopLevelField";
private static final String NAMESPACE = "hoodieRow";
- private transient Function1<Row, GenericRecord> converterFn = null;
- private SparkRowSerDe sparkRowSerDe;
+ private Function1<Row, GenericRecord> converterFn = null;
+ private final AtomicBoolean validatePartitionFields = new AtomicBoolean(false);
protected StructType structType;
- protected Map<String, List<Integer>> recordKeyPositions = new HashMap<>();
- protected Map<String, List<Integer>> partitionPathPositions = new HashMap<>();
- protected Map<String, List<DataType>> partitionPathDataTypes = null;
+ protected Map<String, Pair<List<Integer>, DataType>> recordKeySchemaInfo = new HashMap<>();
+ protected Map<String, Pair<List<Integer>, DataType>> partitionPathSchemaInfo = new HashMap<>();
protected BuiltinKeyGenerator(TypedProperties config) {
super(config);
@@ -60,6 +59,7 @@ protected BuiltinKeyGenerator(TypedProperties config) {
/**
* Fetch record key from {@link Row}.
+ *
* @param row instance of {@link Row} from which record key is requested.
* @return the record key of interest from {@link Row}.
*/
@@ -74,6 +74,7 @@ public String getRecordKey(Row row) {
/**
* Fetch partition path from {@link Row}.
+ *
* @param row instance of {@link Row} from which partition path is requested
* @return the partition path of interest from {@link Row}.
*/
@@ -97,87 +98,41 @@ public String getPartitionPath(Row row) {
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public String getPartitionPath(InternalRow internalRow, StructType structType) {
try {
- initDeserializer(structType);
- Row row = sparkRowSerDe.deserializeRow(internalRow);
- return getPartitionPath(row);
+ buildFieldSchemaInfoIfNeeded(structType);
+ return RowKeyGeneratorHelper.getPartitionPathFromInternalRow(internalRow, getPartitionPathFields(),
+ hiveStylePartitioning, partitionPathSchemaInfo);
} catch (Exception e) {
throw new HoodieIOException("Conversion of InternalRow to Row failed with exception " + e);
}
}
- private void initDeserializer(StructType structType) {
- if (sparkRowSerDe == null) {
- sparkRowSerDe = HoodieSparkUtils.getDeserializer(structType);
- }
- }
-
- void buildFieldPositionMapIfNeeded(StructType structType) {
+ void buildFieldSchemaInfoIfNeeded(StructType structType) {
if (this.structType == null) {
- // parse simple fields
- getRecordKeyFields().stream()
- .filter(f -> !(f.contains(".")))
- .forEach(f -> {
- if (structType.getFieldIndex(f).isDefined()) {
- recordKeyPositions.put(f, Collections.singletonList((Integer) (structType.getFieldIndex(f).get())));
- } else {
- throw new HoodieKeyException("recordKey value not found for field: \"" + f + "\"");
- }
- });
- // parse nested fields
- getRecordKeyFields().stream()
- .filter(f -> f.contains("."))
- .forEach(f -> recordKeyPositions.put(f, RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, true)));
- // parse simple fields
+ getRecordKeyFields()
+ .stream().filter(f -> !f.isEmpty())
+ .forEach(f -> recordKeySchemaInfo.put(f, RowKeyGeneratorHelper.getFieldSchemaInfo(structType, f, true)));
if (getPartitionPathFields() != null) {
- getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> !(f.contains(".")))
- .forEach(f -> {
- if (structType.getFieldIndex(f).isDefined()) {
- partitionPathPositions.put(f,
- Collections.singletonList((Integer) (structType.getFieldIndex(f).get())));
- } else {
- partitionPathPositions.put(f, Collections.singletonList(-1));
- }
- });
- // parse nested fields
- getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> f.contains("."))
- .forEach(f -> partitionPathPositions.put(f,
- RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, false)));
+ getPartitionPathFields().stream().filter(f -> !f.isEmpty())
+ .forEach(f -> partitionPathSchemaInfo.put(f, RowKeyGeneratorHelper.getFieldSchemaInfo(structType, f, false)));
}
this.structType = structType;
}
}
protected String getPartitionPathInternal(InternalRow row, StructType structType) {
- buildFieldDataTypesMapIfNeeded(structType);
+ buildFieldSchemaInfoIfNeeded(structType);
validatePartitionFieldsForInternalRow();
return RowKeyGeneratorHelper.getPartitionPathFromInternalRow(row, getPartitionPathFields(),
- hiveStylePartitioning, partitionPathPositions, partitionPathDataTypes);
+ hiveStylePartitioning, partitionPathSchemaInfo);
}
protected void validatePartitionFieldsForInternalRow() {
- partitionPathPositions.entrySet().forEach(entry -> {
- if (entry.getValue().size() > 1) {
- throw new IllegalArgumentException("Nested column for partitioning is not supported with disabling meta columns");
- }
- });
- }
-
- void buildFieldDataTypesMapIfNeeded(StructType structType) {
- buildFieldPositionMapIfNeeded(structType);
- if (this.partitionPathDataTypes == null) {
- this.partitionPathDataTypes = new HashMap<>();
- if (getPartitionPathFields() != null) {
- // populating simple fields are good enough
- getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> !(f.contains(".")))
- .forEach(f -> {
- if (structType.getFieldIndex(f).isDefined()) {
- partitionPathDataTypes.put(f,
- Collections.singletonList((structType.fields()[structType.fieldIndex(f)].dataType())));
- } else {
- partitionPathDataTypes.put(f, Collections.singletonList(null));
- }
- });
- }
+ if (!validatePartitionFields.getAndSet(true)) {
+ partitionPathSchemaInfo.values().forEach(entry -> {
+ if (entry.getKey().size() > 1) {
+ throw new IllegalArgumentException("Nested column for partitioning is not supported with disabling meta columns");
+ }
+ });
}
}
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java
index 2e2167f9379f0..9ba3fb8760882 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java
@@ -60,15 +60,15 @@ public String getPartitionPath(GenericRecord record) {
@Override
public String getRecordKey(Row row) {
- buildFieldPositionMapIfNeeded(row.schema());
- return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), recordKeyPositions, true);
+ buildFieldSchemaInfoIfNeeded(row.schema());
+ return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), recordKeySchemaInfo, true);
}
@Override
public String getPartitionPath(Row row) {
- buildFieldPositionMapIfNeeded(row.schema());
+ buildFieldSchemaInfoIfNeeded(row.schema());
return RowKeyGeneratorHelper.getPartitionPathFromRow(row, getPartitionPathFields(),
- hiveStylePartitioning, partitionPathPositions);
+ hiveStylePartitioning, partitionPathSchemaInfo);
}
@Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
index 391ea2c87c917..77eec748c7cb1 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
@@ -60,8 +60,8 @@ public List getPartitionPathFields() {
@Override
public String getRecordKey(Row row) {
- buildFieldPositionMapIfNeeded(row.schema());
- return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), recordKeyPositions, true);
+ buildFieldSchemaInfoIfNeeded(row.schema());
+ return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), recordKeySchemaInfo, true);
}
@Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
index 032c750f03240..dc8b253b0f1be 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
@@ -61,6 +61,12 @@ public List getPartitionPathFields() {
return nonpartitionedAvroKeyGenerator.getPartitionPathFields();
}
+ @Override
+ public String getRecordKey(Row row) {
+ buildFieldSchemaInfoIfNeeded(row.schema());
+ return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), recordKeySchemaInfo, false);
+ }
+
@Override
public String getPartitionPath(Row row) {
return nonpartitionedAvroKeyGenerator.getEmptyPartition();
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java
index 6a28fbe9501a9..c0e10e6f9b775 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java
@@ -18,6 +18,7 @@
package org.apache.hudi.keygen;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieKeyException;
import org.apache.spark.sql.Row;
@@ -52,17 +53,18 @@ public class RowKeyGeneratorHelper {
/**
* Generates record key for the corresponding {@link Row}.
- * @param row instance of {@link Row} of interest
- * @param recordKeyFields record key fields as a list
+ *
+ * @param row instance of {@link Row} of interest
+ * @param recordKeyFields record key fields as a list
* @param recordKeyPositions record key positions for the corresponding record keys in {@code recordKeyFields}
- * @param prefixFieldName {@code true} if field name need to be prefixed in the returned result. {@code false} otherwise.
+ * @param prefixFieldName {@code true} if field name need to be prefixed in the returned result. {@code false} otherwise.
* @return the record key thus generated
*/
- public static String getRecordKeyFromRow(Row row, List<String> recordKeyFields, Map<String, List<Integer>> recordKeyPositions, boolean prefixFieldName) {
+ public static String getRecordKeyFromRow(Row row, List<String> recordKeyFields, Map<String, Pair<List<Integer>, DataType>> recordKeyPositions, boolean prefixFieldName) {
AtomicBoolean keyIsNullOrEmpty = new AtomicBoolean(true);
String toReturn = recordKeyFields.stream().map(field -> {
String val = null;
- List<Integer> fieldPositions = recordKeyPositions.get(field);
+ List<Integer> fieldPositions = recordKeyPositions.get(field).getKey();
if (fieldPositions.size() == 1) { // simple field
Integer fieldPos = fieldPositions.get(0);
if (row.isNullAt(fieldPos)) {
@@ -76,7 +78,7 @@ public static String getRecordKeyFromRow(Row row, List recordKeyFields,
}
}
} else { // nested fields
- val = getNestedFieldVal(row, recordKeyPositions.get(field)).toString();
+ val = getNestedFieldVal(row, recordKeyPositions.get(field).getKey()).toString();
if (!val.contains(NULL_RECORDKEY_PLACEHOLDER) && !val.contains(EMPTY_RECORDKEY_PLACEHOLDER)) {
keyIsNullOrEmpty.set(false);
}
@@ -91,17 +93,18 @@ public static String getRecordKeyFromRow(Row row, List recordKeyFields,
/**
* Generates partition path for the corresponding {@link Row}.
- * @param row instance of {@link Row} of interest
- * @param partitionPathFields partition path fields as a list
- * @param hiveStylePartitioning {@code true} if hive style partitioning is set. {@code false} otherwise
+ *
+ * @param row instance of {@link Row} of interest
+ * @param partitionPathFields partition path fields as a list
+ * @param hiveStylePartitioning {@code true} if hive style partitioning is set. {@code false} otherwise
* @param partitionPathPositions partition path positions for the corresponding fields in {@code partitionPathFields}
* @return the generated partition path for the row
*/
- public static String getPartitionPathFromRow(Row row, List<String> partitionPathFields, boolean hiveStylePartitioning, Map<String, List<Integer>> partitionPathPositions) {
+ public static String getPartitionPathFromRow(Row row, List<String> partitionPathFields, boolean hiveStylePartitioning, Map<String, Pair<List<Integer>, DataType>> partitionPathPositions) {
return IntStream.range(0, partitionPathFields.size()).mapToObj(idx -> {
String field = partitionPathFields.get(idx);
String val = null;
- List<Integer> fieldPositions = partitionPathPositions.get(field);
+ List<Integer> fieldPositions = partitionPathPositions.get(field).getKey();
if (fieldPositions.size() == 1) { // simple
Integer fieldPos = fieldPositions.get(0);
// for partition path, if field is not found, index will be set to -1
@@ -118,7 +121,7 @@ public static String getPartitionPathFromRow(Row row, List partitionPath
val = field + "=" + val;
}
} else { // nested
- Object data = getNestedFieldVal(row, partitionPathPositions.get(field));
+ Object data = getNestedFieldVal(row, partitionPathPositions.get(field).getKey());
data = convertToTimestampIfInstant(data);
if (data.toString().contains(NULL_RECORDKEY_PLACEHOLDER) || data.toString().contains(EMPTY_RECORDKEY_PLACEHOLDER)) {
val = hiveStylePartitioning ? field + "=" + HUDI_DEFAULT_PARTITION_PATH : HUDI_DEFAULT_PARTITION_PATH;
@@ -130,20 +133,20 @@ public static String getPartitionPathFromRow(Row row, List partitionPath
}).collect(Collectors.joining(DEFAULT_PARTITION_PATH_SEPARATOR));
}
- public static String getPartitionPathFromInternalRow(InternalRow row, List<String> partitionPathFields, boolean hiveStylePartitioning,
- Map<String, List<Integer>> partitionPathPositions,
- Map<String, List<DataType>> partitionPathDataTypes) {
+ public static String getPartitionPathFromInternalRow(InternalRow internalRow, List<String> partitionPathFields, boolean hiveStylePartitioning,
+ Map<String, Pair<List<Integer>, DataType>> partitionPathPositions) {
return IntStream.range(0, partitionPathFields.size()).mapToObj(idx -> {
String field = partitionPathFields.get(idx);
String val = null;
- List<Integer> fieldPositions = partitionPathPositions.get(field);
+ List<Integer> fieldPositions = partitionPathPositions.get(field).getKey();
+ DataType dataType = partitionPathPositions.get(field).getValue();
if (fieldPositions.size() == 1) { // simple
Integer fieldPos = fieldPositions.get(0);
// for partition path, if field is not found, index will be set to -1
- if (fieldPos == -1 || row.isNullAt(fieldPos)) {
+ if (fieldPos == -1 || internalRow.isNullAt(fieldPos)) {
val = HUDI_DEFAULT_PARTITION_PATH;
} else {
- Object value = row.get(fieldPos, partitionPathDataTypes.get(field).get(0));
+ Object value = internalRow.get(fieldPos, dataType);
if (value == null || value.toString().isEmpty()) {
val = HUDI_DEFAULT_PARTITION_PATH;
} else {
@@ -180,22 +183,22 @@ public static Object getFieldValFromInternalRow(InternalRow internalRow,
/**
* Fetch the field value located at the positions requested for.
- *
+ *
* The fetching logic recursively goes into the nested field based on the position list to get the field value.
* For example, given the row [4357686,key1,2020-03-21,pi,[val1,10]] with the following schema, which has the fourth
* field as a nested field, and positions list as [4,0],
- *
+ *
* the logic fetches the value from field nested_col.prop1.
* If any level of the nested field is null, {@link KeyGenUtils#NULL_RECORDKEY_PLACEHOLDER} is returned.
* If the field value is an empty String, {@link KeyGenUtils#EMPTY_RECORDKEY_PLACEHOLDER} is returned.
*
- * @param row instance of {@link Row} of interest
+ * @param row instance of {@link Row} of interest
* @param positions tree style positions where the leaf node need to be fetched and returned
* @return the field value as per the positions requested for.
*/
@@ -234,13 +237,14 @@ public static Object getNestedFieldVal(Row row, List positions) {
* @param structType schema of interest
* @param field field of interest for which the positions are requested for
* @param isRecordKey {@code true} if the field requested for is a record key. {@code false} in case of a partition path.
- * @return the positions of the field as per the struct type.
+ * @return the positions of the field as per the struct type and the leaf field's datatype.
*/
- public static List<Integer> getNestedFieldIndices(StructType structType, String field, boolean isRecordKey) {
+ public static Pair<List<Integer>, DataType> getFieldSchemaInfo(StructType structType, String field, boolean isRecordKey) {
String[] slices = field.split("\\.");
List<Integer> positions = new ArrayList<>();
int index = 0;
int totalCount = slices.length;
+ DataType leafFieldDataType = null;
while (index < totalCount) {
String slice = slices[index];
Option