diff --git a/azure-pipelines.yml b/azure-pipelines.yml
index 278a3f09f99f6..93c5c8db0fe8b 100644
--- a/azure-pipelines.yml
+++ b/azure-pipelines.yml
@@ -155,7 +155,7 @@ stages:
displayName: Top 100 long-running testcases
- job: UT_FT_3
displayName: UT spark-datasource
- timeoutInMinutes: '150'
+ timeoutInMinutes: '180'
steps:
- task: Maven@4
displayName: maven install
@@ -179,7 +179,7 @@ stages:
displayName: Top 100 long-running testcases
- job: UT_FT_4
displayName: UT FT other modules
- timeoutInMinutes: '150'
+ timeoutInMinutes: '180'
steps:
- task: Maven@4
displayName: maven install
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
index 7b967ad064900..9d3b92c907ad0 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
@@ -28,6 +28,8 @@
import org.apache.hudi.cli.utils.SparkUtil;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
@@ -118,7 +120,7 @@ public String showArchivedCommits(
// read the avro blocks
while (reader.hasNext()) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
- blk.getRecordIterator().forEachRemaining(readRecords::add);
+ blk.getRecordIterator(HoodieRecordType.AVRO).forEachRemaining(r -> readRecords.add((IndexedRecord) r.getData()));
}
List readCommits = readRecords.stream().map(r -> (GenericRecord) r)
.filter(r -> r.get("actionType").toString().equals(HoodieTimeline.COMMIT_ACTION)
@@ -192,8 +194,8 @@ public String showCommits(
// read the avro blocks
while (reader.hasNext()) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
- try (ClosableIterator<IndexedRecord> recordItr = blk.getRecordIterator()) {
- recordItr.forEachRemaining(readRecords::add);
+ try (ClosableIterator<HoodieRecord<IndexedRecord>> recordItr = blk.getRecordIterator(HoodieRecordType.AVRO)) {
+ recordItr.forEachRemaining(r -> readRecords.add(r.getData()));
}
}
List readCommits = readRecords.stream().map(r -> (GenericRecord) r)
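For context, a minimal sketch (not part of the patch) of the read pattern this file moves to: archived data blocks now hand back `HoodieRecord` wrappers for a given `HoodieRecordType`, and callers unwrap the Avro payload via `getData()`. The reader setup is abbreviated, and `fs` / `logFile` are assumed placeholders.

```java
// Sketch only: iterate an archived log file with the record-type-aware iterator.
// `fs` and `logFile` are assumed to be an initialized FileSystem and HoodieLogFile.
List<IndexedRecord> readRecords = new ArrayList<>();
try (HoodieLogFormat.Reader reader =
         HoodieLogFormat.newReader(fs, logFile, HoodieArchivedMetaEntry.getClassSchema())) {
  while (reader.hasNext()) {
    HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
    // Each element is a HoodieRecord wrapper; getData() exposes the underlying Avro record.
    blk.getRecordIterator(HoodieRecordType.AVRO)
        .forEachRemaining(r -> readRecords.add((IndexedRecord) r.getData()));
  }
}
```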
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java
index 333eb11b0c077..7dd889e58e374 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java
@@ -32,6 +32,8 @@
import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
@@ -121,9 +123,9 @@ private int copyArchivedInstants(List<FileStatus> statuses, Set<String> actionSe
// read the avro blocks
while (reader.hasNext() && copyCount < limit) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
- try (ClosableIterator<IndexedRecord> recordItr = blk.getRecordIterator()) {
+ try (ClosableIterator<HoodieRecord<IndexedRecord>> recordItr = blk.getRecordIterator(HoodieRecordType.AVRO)) {
while (recordItr.hasNext()) {
- IndexedRecord ir = recordItr.next();
+ IndexedRecord ir = recordItr.next().getData();
// Archived instants are saved as Avro encoded HoodieArchivedMetaEntry records. We need to get the
// metadata record from the entry and convert it to json.
HoodieArchivedMetaEntry archiveEntryRecord = (HoodieArchivedMetaEntry) SpecificData.get()
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
index 22f70480a3f4d..9a0e485fc9fa0 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
@@ -30,9 +30,11 @@
import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieLogFormat;
@@ -44,6 +46,7 @@
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
import org.apache.hudi.common.util.ClosableIterator;
+import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieMemoryConfig;
@@ -61,6 +64,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@@ -121,7 +125,7 @@ public String showLogFileCommits(
instantTime = "dummy_instant_time_" + dummyInstantTimeCount;
}
if (n instanceof HoodieDataBlock) {
- try (ClosableIterator<IndexedRecord> recordItr = ((HoodieDataBlock) n).getRecordIterator()) {
+ try (ClosableIterator<HoodieRecord<IndexedRecord>> recordItr = ((HoodieDataBlock) n).getRecordIterator(HoodieRecordType.AVRO)) {
recordItr.forEachRemaining(r -> recordCount.incrementAndGet());
}
}
@@ -217,12 +221,13 @@ public String showLogFileRecords(
.withSpillableMapBasePath(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.defaultValue())
.withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
+ .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
.withUseScanV2(Boolean.parseBoolean(HoodieCompactionConfig.USE_LOG_RECORD_READER_SCAN_V2.defaultValue()))
.build();
- for (HoodieRecord<? extends HoodieRecordPayload> hoodieRecord : scanner) {
- Option<IndexedRecord> record = hoodieRecord.getData().getInsertValue(readerSchema);
+ for (HoodieRecord hoodieRecord : scanner) {
+ Option<HoodieAvroIndexedRecord> record = hoodieRecord.toIndexedRecord(readerSchema, new Properties());
if (allRecords.size() < limit) {
- allRecords.add(record.get());
+ allRecords.add(record.get().getData());
}
}
} else {
@@ -236,10 +241,10 @@ public String showLogFileRecords(
HoodieLogBlock n = reader.next();
if (n instanceof HoodieDataBlock) {
HoodieDataBlock blk = (HoodieDataBlock) n;
- try (ClosableIterator<IndexedRecord> recordItr = blk.getRecordIterator()) {
+ try (ClosableIterator<HoodieRecord<IndexedRecord>> recordItr = blk.getRecordIterator(HoodieRecordType.AVRO)) {
recordItr.forEachRemaining(record -> {
if (allRecords.size() < limit) {
- allRecords.add(record);
+ allRecords.add(record.getData());
}
});
}
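Sketch only (not part of the patch) of the scanner usage this command switches to: the merged log record scanner now takes an explicit Avro record merger, and records are materialized through `toIndexedRecord(...)` instead of `getData().getInsertValue(...)`. The builder options other than the merger, and the `fs` / `basePath` / `logPaths` / `readerSchema` / `latestInstant` variables, are assumed.

```java
HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
    .withFileSystem(fs)
    .withBasePath(basePath)
    .withLogFilePaths(logPaths)
    .withReaderSchema(readerSchema)
    .withLatestInstantTime(latestInstant)
    .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
    .build();

for (HoodieRecord hoodieRecord : scanner) {
  // Avro-backed records can still be turned into IndexedRecords for display.
  Option<HoodieAvroIndexedRecord> indexed = hoodieRecord.toIndexedRecord(readerSchema, new Properties());
  indexed.ifPresent(r -> System.out.println(r.getData()));
}
```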
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java
index bcccb66b3716c..456610b3911de 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java
@@ -21,11 +21,11 @@
import org.apache.hudi.cli.HoodieCliSparkConfig;
import org.apache.hudi.cli.commands.SparkEnvCommand;
import org.apache.hudi.cli.commands.SparkMain;
-import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
+import org.apache.spark.HoodieSparkKryoProvider$;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.launcher.SparkLauncher;
@@ -116,7 +116,7 @@ public static JavaSparkContext initJavaSparkContext(String name, Option
}
public static JavaSparkContext initJavaSparkContext(SparkConf sparkConf) {
- SparkRDDWriteClient.registerClasses(sparkConf);
+ HoodieSparkKryoProvider$.MODULE$.register(sparkConf);
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
jsc.hadoopConfiguration().setBoolean(HoodieCliSparkConfig.CLI_PARQUET_ENABLE_SUMMARY_METADATA, false);
FSUtils.prepareHadoopConf(jsc.hadoopConfiguration());
diff --git a/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala b/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala
index 5c0e4a662cd4a..afbab33072c7f 100644
--- a/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala
+++ b/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala
@@ -18,20 +18,19 @@
package org.apache.hudi.cli
import org.apache.avro.Schema
-import org.apache.avro.generic.IndexedRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.avro.HoodieAvroWriteSupport
import org.apache.hudi.client.SparkTaskContextSupplier
import org.apache.hudi.common.bloom.{BloomFilter, BloomFilterFactory}
+import org.apache.hudi.common.config.HoodieStorageConfig
import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
import org.apache.hudi.common.util.BaseFileUtils
-import org.apache.hudi.config.{HoodieIndexConfig, HoodieStorageConfig}
+import org.apache.hudi.config.HoodieIndexConfig
import org.apache.hudi.io.storage.{HoodieAvroParquetWriter, HoodieParquetConfig}
import org.apache.parquet.avro.AvroSchemaConverter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.spark.sql.{DataFrame, SQLContext}
-
import scala.collection.JavaConversions._
import scala.collection.mutable._
@@ -58,7 +57,7 @@ object SparkHelpers {
// Add current classLoad for config, if not will throw classNotFound of 'HoodieWrapperFileSystem'.
parquetConfig.getHadoopConf().setClassLoader(Thread.currentThread.getContextClassLoader)
- val writer = new HoodieAvroParquetWriter[IndexedRecord](destinationFile, parquetConfig, instantTime, new SparkTaskContextSupplier(), true)
+ val writer = new HoodieAvroParquetWriter(destinationFile, parquetConfig, instantTime, new SparkTaskContextSupplier(), true)
for (rec <- sourceRecords) {
val key: String = rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString
if (!keysToSkip.contains(key)) {
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
index 25d54b648715f..21e6218dbe27b 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
@@ -28,9 +28,11 @@
import org.apache.hudi.cli.testutils.ShellEvaluationResultUtil;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
@@ -38,6 +40,7 @@
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.testutils.SchemaTestUtil;
+import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieMemoryConfig;
@@ -111,7 +114,7 @@ public void init() throws IOException, InterruptedException, URISyntaxException
.withFileId("test-log-fileid1").overBaseCommit("100").withFs(fs).build()) {
// write data to file
- List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
+ List<HoodieRecord> records = SchemaTestUtil.generateTestRecords(0, 100).stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList());
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, INSTANT_TIME);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
@@ -191,7 +194,7 @@ public void testShowLogFileRecordsWithMerge() throws IOException, InterruptedExc
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
.withFileId("test-log-fileid1").overBaseCommit(INSTANT_TIME).withFs(fs).withSizeThreshold(500).build();
- List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+ List<HoodieRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100).stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList());
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, INSTANT_TIME);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
@@ -228,15 +231,16 @@ public void testShowLogFileRecordsWithMerge() throws IOException, InterruptedExc
.withSpillableMapBasePath(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.defaultValue())
.withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
+ .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
.withUseScanV2(Boolean.parseBoolean(HoodieCompactionConfig.USE_LOG_RECORD_READER_SCAN_V2.defaultValue()))
.build();
- Iterator<HoodieRecord<? extends HoodieRecordPayload>> records = scanner.iterator();
+ Iterator<HoodieRecord> records = scanner.iterator();
int num = 0;
int maxSize = 10;
List<IndexedRecord> indexRecords = new ArrayList<>();
while (records.hasNext() && num < maxSize) {
- Option<IndexedRecord> hoodieRecord = records.next().getData().getInsertValue(schema);
+ Option<IndexedRecord> hoodieRecord = ((HoodieAvroRecord)records.next()).getData().getInsertValue(schema);
indexRecords.add(hoodieRecord.get());
num++;
}
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/functional/CLIFunctionalTestHarness.java b/hudi-cli/src/test/java/org/apache/hudi/cli/functional/CLIFunctionalTestHarness.java
index 04f77df549606..b41d71cbcaeec 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/functional/CLIFunctionalTestHarness.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/functional/CLIFunctionalTestHarness.java
@@ -20,7 +20,6 @@
package org.apache.hudi.cli.functional;
import org.apache.hudi.client.SparkRDDReadClient;
-import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.testutils.HoodieClientTestUtils;
@@ -28,6 +27,7 @@
import org.apache.hudi.timeline.service.TimelineService;
import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.HoodieSparkKryoProvider$;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
@@ -100,7 +100,7 @@ public synchronized void runBeforeEach() {
initialized = spark != null;
if (!initialized) {
SparkConf sparkConf = conf();
- SparkRDDWriteClient.registerClasses(sparkConf);
+ HoodieSparkKryoProvider$.MODULE$.register(sparkConf);
SparkRDDReadClient.addHoodieSupport(sparkConf);
spark = SparkSession.builder().config(sparkConf).getOrCreate();
sqlContext = spark.sqlContext();
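A minimal sketch (not part of the patch) of the registration change mirrored in the CLI and test harness above: Kryo classes are now registered through the Scala provider object rather than the write client. The app name and master values are arbitrary placeholders.

```java
SparkConf sparkConf = new SparkConf().setAppName("hudi-cli").setMaster("local[2]");
// Replaces the former SparkRDDWriteClient.registerClasses(sparkConf) call
HoodieSparkKryoProvider$.MODULE$.register(sparkConf);
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
```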
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseClusterer.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseClusterer.java
index 648ce805b0825..a3f552e640ffc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseClusterer.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseClusterer.java
@@ -19,7 +19,6 @@
package org.apache.hudi.client;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import java.io.IOException;
@@ -28,7 +27,7 @@
/**
* Client will run one round of clustering.
*/
-public abstract class BaseClusterer<T extends HoodieRecordPayload, I, K, O> implements Serializable {
+public abstract class BaseClusterer<T, I, K, O> implements Serializable {
private static final long serialVersionUID = 1L;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseCompactor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseCompactor.java
index 88737dbcf1d7e..ba4d3f77fd9e5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseCompactor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseCompactor.java
@@ -18,7 +18,6 @@
package org.apache.hudi.client;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import java.io.IOException;
@@ -27,7 +26,7 @@
/**
* Run one round of compaction.
*/
-public abstract class BaseCompactor<T extends HoodieRecordPayload, I, K, O> implements Serializable {
+public abstract class BaseCompactor<T, I, K, O> implements Serializable {
private static final long serialVersionUID = 1L;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 1fcaff04559ae..48382e0870a32 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -44,7 +44,6 @@
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.TableServiceType;
import org.apache.hudi.common.model.WriteOperationType;
@@ -123,12 +122,12 @@
* Abstract Write Client providing functionality for performing commit, index updates and rollback
* Reused for regular write operations like upsert/insert/bulk-insert.. as well as bootstrap
*
- * @param <T> Sub type of HoodieRecordPayload
+ * @param <T> Type of data
* @param <I> Type of inputs
* @param <K> Type of keys
* @param <O> Type of outputs
*/
-public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K, O> extends BaseHoodieClient
+public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
implements RunsTableService {
protected static final String LOOKUP_STR = "lookup";
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
index a61a5c9008293..6d632417a42bf 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
@@ -30,8 +30,11 @@
import org.apache.hudi.common.model.HoodieArchivedLogFile;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
@@ -341,7 +344,7 @@ public void mergeArchiveFiles(List<FileStatus> compactCandidate) throws IOExcept
// Read the avro blocks
while (reader.hasNext()) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
- blk.getRecordIterator().forEachRemaining(records::add);
+ blk.getRecordIterator(HoodieRecordType.AVRO).forEachRemaining(r -> records.add((IndexedRecord) r.getData()));
if (records.size() >= this.config.getCommitArchivalBatchSize()) {
writeToFile(wrapperSchema, records);
}
@@ -668,7 +671,8 @@ private void writeToFile(Schema wrapperSchema, List<IndexedRecord> records) thro
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, wrapperSchema.toString());
final String keyField = table.getMetaClient().getTableConfig().getRecordKeyFieldProp();
- HoodieAvroDataBlock block = new HoodieAvroDataBlock(records, header, keyField);
+ List<HoodieRecord> indexRecords = records.stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList());
+ HoodieAvroDataBlock block = new HoodieAvroDataBlock(indexRecords, header, keyField);
writer.appendBlock(block);
records.clear();
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/bootstrap/FullRecordBootstrapDataProvider.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/bootstrap/FullRecordBootstrapDataProvider.java
index 1cf1702717295..636e0e4b2603a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/bootstrap/FullRecordBootstrapDataProvider.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/bootstrap/FullRecordBootstrapDataProvider.java
@@ -23,6 +23,7 @@
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -49,8 +50,9 @@ public FullRecordBootstrapDataProvider(TypedProperties props, HoodieEngineContex
* @param tableName Hudi Table Name
* @param sourceBasePath Source Base Path
* @param partitionPaths Partition Paths
+ * @param config config
* @return input records
*/
public abstract I generateInputRecords(String tableName,
- String sourceBasePath, List<Pair<String, List<HoodieFileStatus>>> partitionPaths);
+ String sourceBasePath, List<Pair<String, List<HoodieFileStatus>>> partitionPaths, HoodieWriteConfig config);
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MergingIterator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MergingIterator.java
index 47dde723e00dd..abfbc987c16eb 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MergingIterator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MergingIterator.java
@@ -18,20 +18,19 @@
package org.apache.hudi.client.utils;
-import java.util.Iterator;
-import java.util.function.Function;
-import org.apache.avro.generic.GenericRecord;
-
+import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.common.util.collection.Pair;
-public class MergingIterator<T extends GenericRecord> implements Iterator<T> {
+import java.util.Iterator;
+import java.util.function.BiFunction;
+
+public class MergingIterator<T extends HoodieRecord> implements Iterator<HoodieRecord> {
- private final Iterator<T> leftIterator;
- private final Iterator<T> rightIterator;
- private final Function<Pair<T, T>, T> mergeFunction;
+ private final Iterator<T> leftIterator;
+ private final Iterator<T> rightIterator;
+ private final BiFunction<T, T, HoodieRecord> mergeFunction;
- public MergingIterator(Iterator<T> leftIterator, Iterator<T> rightIterator, Function<Pair<T, T>, T> mergeFunction) {
+ public MergingIterator(Iterator<T> leftIterator, Iterator<T> rightIterator, BiFunction<T, T, HoodieRecord> mergeFunction) {
this.leftIterator = leftIterator;
this.rightIterator = rightIterator;
this.mergeFunction = mergeFunction;
@@ -46,7 +45,7 @@ public boolean hasNext() {
}
@Override
- public T next() {
- return mergeFunction.apply(Pair.of(leftIterator.next(), rightIterator.next()));
+ public HoodieRecord next() {
+ return mergeFunction.apply(leftIterator.next(), rightIterator.next());
}
}
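Sketch only (not part of the patch), assuming the generic bounds reconstructed above: with the new constructor the caller passes a BiFunction over the two sides instead of a Function over a Pair. The iterators and the trivial merge lambda below are placeholders for whatever the bootstrap read path supplies.

```java
BiFunction<HoodieRecord, HoodieRecord, HoodieRecord> mergeFn =
    (skeleton, data) -> data; // placeholder: real callers stitch skeleton + data columns together
MergingIterator<HoodieRecord> mergingIterator =
    new MergingIterator<>(skeletonFileIterator, dataFileIterator, mergeFn);
while (mergingIterator.hasNext()) {
  HoodieRecord merged = mergingIterator.next();
}
```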
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java
index a042255cdcb1a..35ca3d6d5adec 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java
@@ -20,62 +20,33 @@
package org.apache.hudi.common.table.log;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.SpillableMapUtils;
import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodiePayloadConfig;
-import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
import java.io.IOException;
import java.util.Iterator;
-import java.util.stream.StreamSupport;
+import java.util.Properties;
/**
* Reads records from base file and merges any updates from log files and provides iterable over all records in the file slice.
*/
-public class HoodieFileSliceReader<T extends HoodieRecordPayload> implements Iterator<HoodieRecord<T>> {
+public class HoodieFileSliceReader<T> implements Iterator<HoodieRecord<T>> {
+
private final Iterator<HoodieRecord<T>> recordsIterator;
public static HoodieFileSliceReader getFileSliceReader(
- Option<HoodieFileReader> baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, String payloadClass,
- String preCombineField, Option<Pair<String, String>> simpleKeyGenFieldsOpt) throws IOException {
+ Option<HoodieFileReader> baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, Properties props, Option<Pair<String, String>> simpleKeyGenFieldsOpt) throws IOException {
if (baseFileReader.isPresent()) {
- Iterator baseIterator = baseFileReader.get().getRecordIterator(schema);
+ Iterator<HoodieRecord> baseIterator = baseFileReader.get().getRecordIterator(schema);
while (baseIterator.hasNext()) {
- GenericRecord record = (GenericRecord) baseIterator.next();
- HoodieRecord<? extends HoodieRecordPayload> hoodieRecord = transform(
- record, scanner, payloadClass, preCombineField, simpleKeyGenFieldsOpt);
- scanner.processNextRecord(hoodieRecord);
+ scanner.processNextRecord(baseIterator.next().wrapIntoHoodieRecordPayloadWithParams(schema, props,
+ simpleKeyGenFieldsOpt, scanner.isWithOperationField(), scanner.getPartitionName(), false));
}
- return new HoodieFileSliceReader(scanner.iterator());
- } else {
- Iterable<HoodieRecord<? extends HoodieRecordPayload>> iterable = () -> scanner.iterator();
- HoodiePayloadConfig payloadConfig = HoodiePayloadConfig.newBuilder().withPayloadOrderingField(preCombineField).build();
- return new HoodieFileSliceReader(StreamSupport.stream(iterable.spliterator(), false)
- .map(e -> {
- try {
- GenericRecord record = (GenericRecord) e.getData().getInsertValue(schema, payloadConfig.getProps()).get();
- return transform(record, scanner, payloadClass, preCombineField, simpleKeyGenFieldsOpt);
- } catch (IOException io) {
- throw new HoodieIOException("Error while creating reader for file slice with no base file.", io);
- }
- }).iterator());
}
- }
-
- private static HoodieRecord<? extends HoodieRecordPayload> transform(
- GenericRecord record, HoodieMergedLogRecordScanner scanner, String payloadClass,
- String preCombineField, Option<Pair<String, String>> simpleKeyGenFieldsOpt) {
- return simpleKeyGenFieldsOpt.isPresent()
- ? SpillableMapUtils.convertToHoodieRecordPayload(record,
- payloadClass, preCombineField, simpleKeyGenFieldsOpt.get(), scanner.isWithOperationField(), Option.empty())
- : SpillableMapUtils.convertToHoodieRecordPayload(record,
- payloadClass, preCombineField, scanner.isWithOperationField(), scanner.getPartitionName());
+ return new HoodieFileSliceReader(scanner.iterator());
}
private HoodieFileSliceReader(Iterator<HoodieRecord<T>> recordsItr) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 03eb44b001112..6bbcec370eb7b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -29,13 +29,16 @@
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieMetastoreConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FileSystemRetryConfig;
+import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.WriteConcurrencyMode;
@@ -45,6 +48,7 @@
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
@@ -130,6 +134,18 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting. "
+ "This will render any value set for PRECOMBINE_FIELD_OPT_VAL in-effective");
+ public static final ConfigProperty<String> MERGER_IMPLS = ConfigProperty
+ .key("hoodie.datasource.write.merger.impls")
+ .defaultValue(HoodieAvroRecordMerger.class.getName())
+ .withDocumentation("List of HoodieMerger implementations constituting Hudi's merging strategy -- based on the engine used. "
+ + "These merger impls will be filtered by hoodie.datasource.write.merger.strategy. "
+ + "Hudi will pick the most efficient implementation to perform merging/combining of the records (during update, reading MOR table, etc.)");
+
+ public static final ConfigProperty<String> MERGER_STRATEGY = ConfigProperty
+ .key("hoodie.datasource.write.merger.strategy")
+ .defaultValue(HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID)
+ .withDocumentation("Id of merger strategy. Hudi will pick HoodieRecordMerger implementations in hoodie.datasource.write.merger.impls which have the same merger strategy id");
+
public static final ConfigProperty<String> KEYGENERATOR_CLASS_NAME = ConfigProperty
.key("hoodie.datasource.write.keygenerator.class")
.noDefaultValue()
@@ -542,6 +558,7 @@ public class HoodieWriteConfig extends HoodieConfig {
private HoodieMetadataConfig metadataConfig;
private HoodieMetastoreConfig metastoreConfig;
private HoodieCommonConfig commonConfig;
+ private HoodieStorageConfig storageConfig;
private EngineType engineType;
/**
@@ -934,6 +951,7 @@ protected HoodieWriteConfig(EngineType engineType, Properties props) {
this.metadataConfig = HoodieMetadataConfig.newBuilder().fromProperties(props).build();
this.metastoreConfig = HoodieMetastoreConfig.newBuilder().fromProperties(props).build();
this.commonConfig = HoodieCommonConfig.newBuilder().fromProperties(props).build();
+ this.storageConfig = HoodieStorageConfig.newBuilder().fromProperties(props).build();
}
public static HoodieWriteConfig.Builder newBuilder() {
@@ -947,6 +965,15 @@ public String getBasePath() {
return getString(BASE_PATH);
}
+ public HoodieRecordMerger getRecordMerger() {
+ List<String> mergers = getSplitStringsOrDefault(MERGER_IMPLS).stream()
+ .map(String::trim)
+ .distinct()
+ .collect(Collectors.toList());
+ String mergerStrategy = getString(MERGER_STRATEGY);
+ return HoodieRecordUtils.createRecordMerger(getString(BASE_PATH), engineType, mergers, mergerStrategy);
+ }
+
public String getSchema() {
return getString(AVRO_SCHEMA_STRING);
}
@@ -955,6 +982,10 @@ public void setSchema(String schemaStr) {
setValue(AVRO_SCHEMA_STRING, schemaStr);
}
+ public void setMergerClass(String mergerStrategy) {
+ setValue(MERGER_STRATEGY, mergerStrategy);
+ }
+
/**
* Returns schema used for writing records
*
@@ -2033,6 +2064,10 @@ public HoodieCommonConfig getCommonConfig() {
return commonConfig;
}
+ public HoodieStorageConfig getStorageConfig() {
+ return storageConfig;
+ }
+
/**
* Commit call back configs.
*/
@@ -2340,6 +2375,16 @@ public Builder withWritePayLoad(String payload) {
return this;
}
+ public Builder withMergerImpls(String mergerImpls) {
+ writeConfig.setValue(MERGER_IMPLS, mergerImpls);
+ return this;
+ }
+
+ public Builder withMergerStrategy(String mergerStrategy) {
+ writeConfig.setValue(MERGER_STRATEGY, mergerStrategy);
+ return this;
+ }
+
public Builder withKeyGenerator(String keyGeneratorClass) {
writeConfig.setValue(KEYGENERATOR_CLASS_NAME, keyGeneratorClass);
return this;
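Sketch only (not part of the patch): how the new merger knobs are expected to be wired through the builder; `basePath` and `schemaStr` are placeholders. `getRecordMerger()` then resolves the first listed implementation whose strategy id matches `hoodie.datasource.write.merger.strategy`.

```java
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath(basePath)
    .withSchema(schemaStr)
    .withMergerImpls(HoodieAvroRecordMerger.class.getName())
    .withMergerStrategy(HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID)
    .build();

// Picks the first merger in hoodie.datasource.write.merger.impls with a matching strategy id.
HoodieRecordMerger merger = writeConfig.getRecordMerger();
```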
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java
index fffb5e0e775c1..55db97e87a492 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java
@@ -19,9 +19,9 @@
package org.apache.hudi.execution;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.queue.HoodieConsumer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.execution.HoodieLazyInsertIterable.HoodieInsertValueGenResult;
@@ -39,7 +39,7 @@
/**
* Consumes stream of hoodie records from in-memory queue and writes to one or more create-handles.
*/
-public class CopyOnWriteInsertHandler<T extends HoodieRecordPayload>
+public class CopyOnWriteInsertHandler<T>
implements HoodieConsumer<HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> {
private final HoodieWriteConfig config;
@@ -70,9 +70,9 @@ public CopyOnWriteInsertHandler(HoodieWriteConfig config, String instantTime,
}
@Override
- public void consume(HoodieInsertValueGenResult<HoodieRecord> payload) {
- final HoodieRecord insertPayload = payload.record;
- String partitionPath = insertPayload.getPartitionPath();
+ public void consume(HoodieInsertValueGenResult<HoodieRecord> genResult) {
+ final HoodieRecord record = genResult.getResult();
+ String partitionPath = record.getPartitionPath();
HoodieWriteHandle<?,?,?,?> handle = handles.get(partitionPath);
if (handle == null) {
// If the records are sorted, this means that we encounter a new partition path
@@ -83,19 +83,19 @@ public void consume(HoodieInsertValueGenResult payload) {
}
// Lazily initialize the handle, for the first time
handle = writeHandleFactory.create(config, instantTime, hoodieTable,
- insertPayload.getPartitionPath(), idPrefix, taskContextSupplier);
+ record.getPartitionPath(), idPrefix, taskContextSupplier);
handles.put(partitionPath, handle);
}
- if (!handle.canWrite(payload.record)) {
+ if (!handle.canWrite(genResult.getResult())) {
// Handle is full. Close the handle and add the WriteStatus
statuses.addAll(handle.close());
// Open new handle
handle = writeHandleFactory.create(config, instantTime, hoodieTable,
- insertPayload.getPartitionPath(), idPrefix, taskContextSupplier);
+ record.getPartitionPath(), idPrefix, taskContextSupplier);
handles.put(partitionPath, handle);
}
- handle.write(insertPayload, payload.insertValue, payload.exception);
+ handle.write(record, genResult.schema, new TypedProperties(genResult.props));
}
@Override
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java
index 1754836c91c4a..20f75f63c5234 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java
@@ -18,21 +18,17 @@
package org.apache.hudi.execution;
+import org.apache.avro.Schema;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.LazyIterableIterator;
+import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.CollectionUtils;
-import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.CreateHandleFactory;
import org.apache.hudi.io.WriteHandleFactory;
import org.apache.hudi.table.HoodieTable;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.IndexedRecord;
-
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
@@ -41,7 +37,7 @@
/**
* Lazy Iterable, that writes a stream of HoodieRecords sorted by the partitionPath, into new files.
*/
-public abstract class HoodieLazyInsertIterable<T extends HoodieRecordPayload>
+public abstract class HoodieLazyInsertIterable<T>
extends LazyIterableIterator<HoodieRecord<T>, List<WriteStatus>> {
protected final HoodieWriteConfig hoodieConfig;
@@ -78,34 +74,40 @@ public HoodieLazyInsertIterable(Iterator<HoodieRecord<T>> recordItr, boolean are
}
// Used for caching HoodieRecord along with insertValue. We need this to offload computation work to buffering thread.
- public static class HoodieInsertValueGenResult<T extends HoodieRecord> {
- public T record;
- public Option<IndexedRecord> insertValue;
- // It caches the exception seen while fetching insert value.
- public Option<Exception> exception = Option.empty();
+ public static class HoodieInsertValueGenResult<R extends HoodieRecord> {
+ private final R record;
+ public final Schema schema;
+ public final Properties props;
- public HoodieInsertValueGenResult(T record, Schema schema, Properties properties) {
+ public HoodieInsertValueGenResult(R record, Schema schema, Properties properties) {
this.record = record;
- try {
- this.insertValue = ((HoodieRecordPayload) record.getData()).getInsertValue(schema, properties);
- } catch (Exception e) {
- this.exception = Option.of(e);
- }
+ this.schema = schema;
+ this.props = properties;
}
+
+ public R getResult() {
+ return record;
+ }
+ }
+
+ static <R> Function<HoodieRecord<R>, HoodieInsertValueGenResult<HoodieRecord>> getCloningTransformer(Schema schema,
+ HoodieWriteConfig config) {
+ return getCloningTransformerInternal(schema, config.getProps());
}
- /**
- * Transformer function to help transform a HoodieRecord. This transformer is used by BufferedIterator to offload some
- * expensive operations of transformation to the reader thread.
- */
- static <T extends HoodieRecordPayload> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getTransformFunction(
- Schema schema, HoodieWriteConfig config) {
- return hoodieRecord -> new HoodieInsertValueGenResult(hoodieRecord, schema, config.getProps());
+ static <R> Function<HoodieRecord<R>, HoodieInsertValueGenResult<HoodieRecord>> getCloningTransformer(Schema schema) {
+ return getCloningTransformerInternal(schema, new TypedProperties());
}
- static <T extends HoodieRecordPayload> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getTransformFunction(
- Schema schema) {
- return hoodieRecord -> new HoodieInsertValueGenResult(hoodieRecord, schema, CollectionUtils.emptyProps());
+ private static <R> Function<HoodieRecord<R>, HoodieInsertValueGenResult<HoodieRecord>> getCloningTransformerInternal(Schema schema,
+ TypedProperties props) {
+ return record -> {
+ // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
+ // payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
+ // it since these records will be subsequently buffered (w/in the in-memory queue)
+ HoodieRecord clonedRecord = record.copy();
+ return new HoodieInsertValueGenResult(clonedRecord, schema, props);
+ };
}
@Override
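Sketch only (not part of the patch), using the raw generics reconstructed above: the buffering-side transformer now just clones the record and carries the schema and props along, deferring payload materialization to the consumer. `incomingRecord`, `writeSchema`, and `writeConfig` are placeholders.

```java
// Buffer-side transformation now only clones; schema/props ride along for later use.
HoodieInsertValueGenResult<HoodieRecord> buffered =
    getCloningTransformer(writeSchema, writeConfig).apply(incomingRecord);
HoodieRecord safeCopy = buffered.getResult();   // detached copy, safe to enqueue
Schema schemaForWrite = buffered.schema;        // materialization deferred to the writer side
```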
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index 6bbea356e51d4..8a8a03e1b17ac 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -23,6 +23,7 @@
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.HoodieTimer;
@@ -150,7 +151,8 @@ public static List<String> filterKeysFromFile(Path filePath, List<String> candid
Configuration configuration) throws HoodieIndexException {
ValidationUtils.checkArgument(FSUtils.isBaseFile(filePath));
List<String> foundRecordKeys = new ArrayList<>();
- try (HoodieFileReader fileReader = HoodieFileReaderFactory.getFileReader(configuration, filePath)) {
+ try (HoodieFileReader fileReader = HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+ .getFileReader(configuration, filePath)) {
// Load all rowKeys from the file, to double-confirm
if (!candidateRecordKeys.isEmpty()) {
HoodieTimer timer = HoodieTimer.start();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/inmemory/HoodieInMemoryHashIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/inmemory/HoodieInMemoryHashIndex.java
index 42dcc1b97d760..8b7551361729f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/inmemory/HoodieInMemoryHashIndex.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/inmemory/HoodieInMemoryHashIndex.java
@@ -26,6 +26,7 @@
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
@@ -122,4 +123,11 @@ public boolean canIndexLogFiles() {
public boolean isImplicitWithStorage() {
return false;
}
+
+ @VisibleForTesting
+ public static void clear() {
+ if (recordLocationMap != null) {
+ recordLocationMap.clear();
+ }
+ }
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/AppendHandleFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/AppendHandleFactory.java
index b4c83c141b2bc..fdd232b55a0aa 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/AppendHandleFactory.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/AppendHandleFactory.java
@@ -19,11 +19,10 @@
package org.apache.hudi.io;
import org.apache.hudi.common.engine.TaskContextSupplier;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
-public class AppendHandleFactory<T extends HoodieRecordPayload, I, K, O> extends WriteHandleFactory<T, I, K, O> {
+public class AppendHandleFactory<T, I, K, O> extends WriteHandleFactory<T, I, K, O> {
@Override
public HoodieAppendHandle<T, I, K, O> create(final HoodieWriteConfig hoodieConfig, final String commitTime,
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/CreateHandleFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/CreateHandleFactory.java
index 09131b421f402..8dc19816fd133 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/CreateHandleFactory.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/CreateHandleFactory.java
@@ -19,13 +19,12 @@
package org.apache.hudi.io;
import org.apache.hudi.common.engine.TaskContextSupplier;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import java.io.Serializable;
-public class CreateHandleFactory<T extends HoodieRecordPayload, I, K, O> extends WriteHandleFactory<T, I, K, O> implements Serializable {
+public class CreateHandleFactory<T, I, K, O> extends WriteHandleFactory<T, I, K, O> implements Serializable {
private boolean preserveMetadata = false;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 3a1931554330f..02053127d9c83 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -19,11 +19,10 @@
package org.apache.hudi.io;
import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
-import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BaseFile;
@@ -36,10 +35,11 @@
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodiePayloadProps;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.model.HoodieRecordLocation;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.model.MetadataValues;
import org.apache.hudi.common.table.log.AppendResult;
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
@@ -67,6 +67,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -79,7 +80,7 @@
/**
* IO Operation to append data onto an existing file.
*/
-public class HoodieAppendHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieWriteHandle<T, I, K, O> {
+public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O> {
private static final Logger LOG = LogManager.getLogger(HoodieAppendHandle.class);
// This acts as the sequenceID for records written
@@ -88,7 +89,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload, I, K, O> extends
protected final String fileId;
// Buffer for holding records in memory before they are flushed to disk
- private final List<IndexedRecord> recordList = new ArrayList<>();
+ private final List<HoodieRecord> recordList = new ArrayList<>();
// Buffer for holding records (to be deleted) in memory before they are flushed to disk
private final List<DeleteRecord> recordsToDelete = new ArrayList<>();
// Incoming records to be written to logs.
@@ -145,7 +146,7 @@ public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTa
super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier);
this.fileId = fileId;
this.recordItr = recordItr;
- sizeEstimator = new DefaultSizeEstimator();
+ this.sizeEstimator = new DefaultSizeEstimator();
this.statuses = new ArrayList<>();
this.recordProperties.putAll(config.getProps());
}
@@ -225,8 +226,9 @@ protected boolean isUpdateRecord(HoodieRecord hoodieRecord) {
return hoodieRecord.getCurrentLocation() != null;
}
- private Option<IndexedRecord> getIndexedRecord(HoodieRecord<T> hoodieRecord) {
- Option
*/
@NotThreadSafe
-public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieWriteHandle<T, I, K, O> {
+public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O> {
private static final Logger LOG = LogManager.getLogger(HoodieMergeHandle.class);
protected Map<String, HoodieRecord<T>> keyToNewRecords;
protected Set<String> writtenRecordKeys;
- protected HoodieFileWriter<IndexedRecord> fileWriter;
+ protected HoodieFileWriter fileWriter;
private boolean preserveMetadata = false;
protected Path newFilePath;
@@ -158,15 +158,6 @@ public static HoodieBaseFile getLatestBaseFile(HoodieTable<?, ?, ?, ?> hoodieTab
return baseFileOp.get();
}
- @Override
- public Schema getWriterSchemaWithMetaFields() {
- return writeSchemaWithMetaFields;
- }
-
- public Schema getWriterSchema() {
- return writeSchema;
- }
-
/**
* Extract old file path, initialize StorageWriter and WriteStatus.
*/
@@ -202,8 +193,8 @@ private void init(String fileId, String partitionPath, HoodieBaseFile baseFileTo
createMarkerFile(partitionPath, newFilePath.getName());
// Create the writer for writing the new version file
- fileWriter = HoodieFileWriterFactory.getFileWriter(instantTime, newFilePath, hoodieTable,
- config, writeSchemaWithMetaFields, taskContextSupplier);
+ fileWriter = HoodieFileWriterFactory.getFileWriter(instantTime, newFilePath, hoodieTable.getHadoopConf(),
+ config, writeSchemaWithMetaFields, taskContextSupplier, recordMerger.getRecordType());
} catch (IOException io) {
LOG.error("Error in update task at commit " + instantTime, io);
writeStatus.setGlobalError(io);
@@ -269,66 +260,65 @@ protected void init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) {
+ ((ExternalSpillableMap) keyToNewRecords).getSizeOfFileOnDiskInBytes());
}
- protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, GenericRecord oldRecord, Option<IndexedRecord> indexedRecord) {
+ protected boolean writeUpdateRecord(HoodieRecord<T> newRecord, HoodieRecord oldRecord, Option<HoodieRecord> combineRecordOpt, Schema writerSchema) throws IOException {
boolean isDelete = false;
- if (indexedRecord.isPresent()) {
+ if (combineRecordOpt.isPresent()) {
updatedRecordsWritten++;
- GenericRecord record = (GenericRecord) indexedRecord.get();
- if (oldRecord != record) {
+ if (oldRecord.getData() != combineRecordOpt.get().getData()) {
// the incoming record is chosen
- isDelete = HoodieOperation.isDelete(hoodieRecord.getOperation());
+ isDelete = HoodieOperation.isDelete(newRecord.getOperation());
} else {
// the incoming record is dropped
return false;
}
}
- return writeRecord(hoodieRecord, indexedRecord, isDelete);
+ return writeRecord(newRecord, combineRecordOpt, writerSchema, config.getPayloadConfig().getProps(), isDelete);
}
- protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws IOException {
+ protected void writeInsertRecord(HoodieRecord<T> newRecord) throws IOException {
Schema schema = useWriterSchemaForCompaction ? writeSchemaWithMetaFields : writeSchema;
- Option<IndexedRecord> insertRecord = hoodieRecord.getData().getInsertValue(schema, config.getProps());
// just skip the ignored record
- if (insertRecord.isPresent() && insertRecord.get().equals(IGNORE_RECORD)) {
+ if (newRecord.shouldIgnore(schema, config.getProps())) {
return;
}
- writeInsertRecord(hoodieRecord, insertRecord);
+ writeInsertRecord(newRecord, schema, config.getProps());
}
- protected void writeInsertRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> insertRecord) {
- if (writeRecord(hoodieRecord, insertRecord, HoodieOperation.isDelete(hoodieRecord.getOperation()))) {
+ protected void writeInsertRecord(HoodieRecord<T> newRecord, Schema schema, Properties prop)
+ throws IOException {
+ if (writeRecord(newRecord, Option.of(newRecord), schema, prop, HoodieOperation.isDelete(newRecord.getOperation()))) {
insertRecordsWritten++;
}
}
- protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord) {
- return writeRecord(hoodieRecord, indexedRecord, false);
+ protected boolean writeRecord(HoodieRecord<T> newRecord, Option<HoodieRecord> combineRecord, Schema schema, Properties prop) throws IOException {
+ return writeRecord(newRecord, combineRecord, schema, prop, false);
}
- private boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord, boolean isDelete) {
- Option<Map<String, String>> recordMetadata = hoodieRecord.getData().getMetadata();
- if (!partitionPath.equals(hoodieRecord.getPartitionPath())) {
+ private boolean writeRecord(HoodieRecord<T> newRecord, Option<HoodieRecord> combineRecord, Schema schema, Properties prop, boolean isDelete) throws IOException {
+ Option<Map<String, String>> recordMetadata = newRecord.getMetadata();
+ if (!partitionPath.equals(newRecord.getPartitionPath())) {
HoodieUpsertException failureEx = new HoodieUpsertException("mismatched partition path, record partition: "
- + hoodieRecord.getPartitionPath() + " but trying to insert into partition: " + partitionPath);
- writeStatus.markFailure(hoodieRecord, failureEx, recordMetadata);
+ + newRecord.getPartitionPath() + " but trying to insert into partition: " + partitionPath);
+ writeStatus.markFailure(newRecord, failureEx, recordMetadata);
return false;
}
try {
- if (indexedRecord.isPresent() && !isDelete) {
- writeToFile(hoodieRecord.getKey(), (GenericRecord) indexedRecord.get(), preserveMetadata && useWriterSchemaForCompaction);
+ if (combineRecord.isPresent() && !combineRecord.get().isDelete(schema, config.getProps()) && !isDelete) {
+ writeToFile(newRecord.getKey(), combineRecord.get(), schema, prop, preserveMetadata && useWriterSchemaForCompaction);
recordsWritten++;
} else {
recordsDeleted++;
}
- writeStatus.markSuccess(hoodieRecord, recordMetadata);
+ writeStatus.markSuccess(newRecord, recordMetadata);
// deflate record payload after recording success. This will help users access payload as a
// part of marking
// record successful.
- hoodieRecord.deflate();
+ newRecord.deflate();
return true;
} catch (Exception e) {
- LOG.error("Error writing record " + hoodieRecord, e);
- writeStatus.markFailure(hoodieRecord, e, recordMetadata);
+ LOG.error("Error writing record " + newRecord, e);
+ writeStatus.markFailure(newRecord, e, recordMetadata);
}
return false;
}
@@ -336,23 +326,24 @@ private boolean writeRecord(HoodieRecord hoodieRecord, Option
/**
* Go through an old record. Here if we detect a newer version shows up, we write the new one to the file.
*/
- public void write(GenericRecord oldRecord) {
- String key = KeyGenUtils.getRecordKeyFromGenericRecord(oldRecord, keyGeneratorOpt);
+ public void write(HoodieRecord oldRecord) {
+ Schema oldSchema = config.populateMetaFields() ? writeSchemaWithMetaFields : writeSchema;
+ Schema newSchema = useWriterSchemaForCompaction ? writeSchemaWithMetaFields : writeSchema;
boolean copyOldRecord = true;
+ String key = oldRecord.getRecordKey(oldSchema, keyGeneratorOpt);
+ TypedProperties props = config.getPayloadConfig().getProps();
if (keyToNewRecords.containsKey(key)) {
// If we have duplicate records that we are updating, then the hoodie record will be deflated after
// writing the first record. So make a copy of the record to be merged
- HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key).newInstance();
+ HoodieRecord<T> newRecord = keyToNewRecords.get(key).newInstance();
try {
- Option<IndexedRecord> combinedAvroRecord =
- hoodieRecord.getData().combineAndGetUpdateValue(oldRecord,
- useWriterSchemaForCompaction ? writeSchemaWithMetaFields : writeSchema,
- config.getPayloadConfig().getProps());
-
- if (combinedAvroRecord.isPresent() && combinedAvroRecord.get().equals(IGNORE_RECORD)) {
+ Option<Pair<HoodieRecord, Schema>> mergeResult = recordMerger.merge(oldRecord, oldSchema, newRecord, newSchema, props);
+ Schema combineRecordSchema = mergeResult.map(Pair::getRight).orElse(null);
+ Option<HoodieRecord> combinedRecord = mergeResult.map(Pair::getLeft);
+ if (combinedRecord.isPresent() && combinedRecord.get().shouldIgnore(combineRecordSchema, props)) {
// If it is an IGNORE_RECORD, just copy the old record, and do not update the new record.
copyOldRecord = true;
- } else if (writeUpdateRecord(hoodieRecord, oldRecord, combinedAvroRecord)) {
+ } else if (writeUpdateRecord(newRecord, oldRecord, combinedRecord, combineRecordSchema)) {
/*
* ONLY WHEN 1) we have an update for this key AND 2) We are able to successfully
* write the combined new value
@@ -371,7 +362,7 @@ public void write(GenericRecord oldRecord) {
if (copyOldRecord) {
try {
// NOTE: We're enforcing preservation of the record metadata to keep existing semantic
- writeToFile(new HoodieKey(key, partitionPath), oldRecord, true);
+ writeToFile(new HoodieKey(key, partitionPath), oldRecord, oldSchema, props, true);
} catch (IOException | RuntimeException e) {
String errMsg = String.format("Failed to merge old record into new file for key %s from old file %s to new file %s with writerSchema %s",
key, getOldFilePath(), newFilePath, writeSchemaWithMetaFields.toString(true));
@@ -382,13 +373,21 @@ public void write(GenericRecord oldRecord) {
}
}
- protected void writeToFile(HoodieKey key, GenericRecord avroRecord, boolean shouldPreserveRecordMetadata) throws IOException {
+ protected void writeToFile(HoodieKey key, HoodieRecord record, Schema schema, Properties prop, boolean shouldPreserveRecordMetadata) throws IOException {
+ HoodieRecord rewriteRecord;
+ if (schemaOnReadEnabled) {
+ rewriteRecord = record.rewriteRecordWithNewSchema(schema, prop, writeSchemaWithMetaFields);
+ } else {
+ rewriteRecord = record.rewriteRecord(schema, prop, writeSchemaWithMetaFields);
+ }
+ // NOTE: `FILENAME_METADATA_FIELD` has to be rewritten to correctly point to the
+ // file holding this record even in cases when overall metadata is preserved
+ MetadataValues metadataValues = new MetadataValues().setFileName(newFilePath.getName());
+ rewriteRecord = rewriteRecord.updateMetadataValues(writeSchemaWithMetaFields, prop, metadataValues);
if (shouldPreserveRecordMetadata) {
- // NOTE: `FILENAME_METADATA_FIELD` has to be rewritten to correctly point to the
- // file holding this record even in cases when overall metadata is preserved
- fileWriter.writeAvro(key.getRecordKey(), rewriteRecordWithMetadata(avroRecord, newFilePath.getName()));
+ fileWriter.write(key.getRecordKey(), rewriteRecord, writeSchemaWithMetaFields);
} else {
- fileWriter.writeAvroWithMetadata(key, rewriteRecord(avroRecord));
+ fileWriter.writeWithMetadata(key, rewriteRecord, writeSchemaWithMetaFields);
}
}
@@ -452,7 +451,7 @@ public void performMergeDataValidationCheck(WriteStatus writeStatus) {
}
long oldNumWrites = 0;
- try (HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(hoodieTable.getHadoopConf(), oldFilePath)) {
+ try (HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(this.config.getRecordMerger().getRecordType()).getFileReader(hoodieTable.getHadoopConf(), oldFilePath)) {
oldNumWrites = reader.getTotalRecords();
} catch (IOException e) {
throw new HoodieUpsertException("Failed to check for merge data validation", e);
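The rewritten merge path above no longer calls HoodieRecordPayload#combineAndGetUpdateValue directly: the old and new versions of a key are combined through the engine-agnostic HoodieRecordMerger, and the result decides whether the key is copied through, rewritten, or dropped. A minimal sketch of that control flow, using only the calls visible in the hunks above (the sketch class and method names are illustrative):

```java
import org.apache.avro.Schema;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;

import java.io.IOException;

class MergeFlowSketch {
  static void mergeOne(HoodieRecordMerger merger, HoodieRecord oldRecord, Schema oldSchema,
                       HoodieRecord newRecord, Schema newSchema, TypedProperties props) throws IOException {
    // Combine the on-disk record with the incoming one; the merger also reports the schema of the result.
    Option<Pair<HoodieRecord, Schema>> mergeResult =
        merger.merge(oldRecord, oldSchema, newRecord, newSchema, props);
    Schema combinedSchema = mergeResult.map(Pair::getRight).orElse(null);
    Option<HoodieRecord> combined = mergeResult.map(Pair::getLeft);

    if (combined.isPresent() && combined.get().shouldIgnore(combinedSchema, props)) {
      // IGNORE_RECORD semantics: copy the old record through unchanged
    } else if (combined.isPresent() && !combined.get().isDelete(combinedSchema, props)) {
      // write the combined value into the new base file
    } else {
      // empty result or delete marker: the key is dropped from the new file
    }
  }
}
```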
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java
index 436eff5dac54d..b110c2c081782 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java
@@ -21,7 +21,6 @@
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -38,7 +37,7 @@ public class HoodieMergeHandleFactory {
/**
* Creates a merge handle for normal write path.
*/
- public static <T extends HoodieRecordPayload, I, K, O> HoodieMergeHandle<T, I, K, O> create(
+ public static <T, I, K, O> HoodieMergeHandle<T, I, K, O> create(
WriteOperationType operationType,
HoodieWriteConfig writeConfig,
String instantTime,
@@ -70,7 +69,7 @@ public static HoodieMergeHandle HoodieMergeHandle create(
+ public static HoodieMergeHandle create(
HoodieWriteConfig writeConfig,
String instantTime,
HoodieTable table,
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
index a19b87532635a..cd61f428ae4c0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
@@ -18,22 +18,25 @@
package org.apache.hudi.io;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-
+import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -41,7 +44,10 @@
/**
* A merge handle that supports logging change logs.
*/
-public class HoodieMergeHandleWithChangeLog<T extends HoodieRecordPayload, I, K, O> extends HoodieMergeHandle<T, I, K, O> {
+public class HoodieMergeHandleWithChangeLog<T, I, K, O> extends HoodieMergeHandle<T, I, K, O> {
+
+ private static final Logger LOG = LogManager.getLogger(HoodieMergeHandleWithChangeLog.class);
+
protected final HoodieCDCLogger cdcLogger;
public HoodieMergeHandleWithChangeLog(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable,
@@ -77,19 +83,27 @@ public HoodieMergeHandleWithChangeLog(HoodieWriteConfig config, String instantTi
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
}
- protected boolean writeUpdateRecord(HoodieRecord hoodieRecord, GenericRecord oldRecord, Option indexedRecord) {
- final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord, indexedRecord);
+ protected boolean writeUpdateRecord(HoodieRecord newRecord, HoodieRecord oldRecord, Option combinedRecordOpt, Schema writerSchema)
+ throws IOException {
+ // TODO [HUDI-5019] Remove these unnecessary newInstance invocations
+ Option savedCombineRecordOp = combinedRecordOpt.map(HoodieRecord::newInstance);
+ final boolean result = super.writeUpdateRecord(newRecord, oldRecord, combinedRecordOpt, writerSchema);
if (result) {
- boolean isDelete = HoodieOperation.isDelete(hoodieRecord.getOperation());
- cdcLogger.put(hoodieRecord, oldRecord, isDelete ? Option.empty() : indexedRecord);
+ boolean isDelete = HoodieOperation.isDelete(newRecord.getOperation());
+ Option avroRecordOpt = savedCombineRecordOp.flatMap(r ->
+ toAvroRecord(r, writerSchema, config.getPayloadConfig().getProps()));
+ cdcLogger.put(newRecord, (GenericRecord) oldRecord.getData(), isDelete ? Option.empty() : avroRecordOpt);
}
return result;
}
- protected void writeInsertRecord(HoodieRecord hoodieRecord, Option insertRecord) {
- super.writeInsertRecord(hoodieRecord, insertRecord);
- if (!HoodieOperation.isDelete(hoodieRecord.getOperation())) {
- cdcLogger.put(hoodieRecord, null, insertRecord);
+ protected void writeInsertRecord(HoodieRecord newRecord) throws IOException {
+ Schema schema = useWriterSchemaForCompaction ? writeSchemaWithMetaFields : writeSchema;
+ // TODO Remove these unnecessary newInstance invocations
+ HoodieRecord savedRecord = newRecord.newInstance();
+ super.writeInsertRecord(newRecord);
+ if (!HoodieOperation.isDelete(newRecord.getOperation())) {
+ cdcLogger.put(newRecord, null, savedRecord.toIndexedRecord(schema, config.getPayloadConfig().getProps()).map(HoodieAvroIndexedRecord::getData));
}
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieRangeInfoHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieRangeInfoHandle.java
index 4440733508142..282a59466406e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieRangeInfoHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieRangeInfoHandle.java
@@ -19,7 +19,6 @@
package org.apache.hudi.io;
import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.storage.HoodieFileReader;
@@ -30,7 +29,7 @@
/**
* Extract range information for a given file slice.
*/
-public class HoodieRangeInfoHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieReadHandle<T, I, K, O> {
+public class HoodieRangeInfoHandle<T, I, K, O> extends HoodieReadHandle<T, I, K, O> {
public HoodieRangeInfoHandle(HoodieWriteConfig config, HoodieTable hoodieTable,
Pair partitionPathFilePair) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java
index 223241bc373d0..ebe6361fdf55f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java
@@ -21,7 +21,6 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -34,7 +33,7 @@
/**
* Base class for read operations done logically on the file group.
*/
-public abstract class HoodieReadHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieIOHandle<T, I, K, O> {
+public abstract class HoodieReadHandle<T, I, K, O> extends HoodieIOHandle<T, I, K, O> {
protected final Pair partitionPathFileIDPair;
@@ -63,12 +62,12 @@ protected HoodieBaseFile getLatestDataFile() {
}
protected HoodieFileReader createNewFileReader() throws IOException {
- return HoodieFileReaderFactory.getFileReader(hoodieTable.getHadoopConf(),
+ return HoodieFileReaderFactory.getReaderFactory(this.config.getRecordMerger().getRecordType()).getFileReader(hoodieTable.getHadoopConf(),
new Path(getLatestDataFile().getPath()));
}
protected HoodieFileReader createNewFileReader(HoodieBaseFile hoodieBaseFile) throws IOException {
- return HoodieFileReaderFactory.getFileReader(hoodieTable.getHadoopConf(),
+ return HoodieFileReaderFactory.getReaderFactory(this.config.getRecordMerger().getRecordType()).getFileReader(hoodieTable.getHadoopConf(),
new Path(hoodieBaseFile.getPath()));
}
}
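Both reader call sites above follow the same pattern: the static HoodieFileReaderFactory.getFileReader entry point is replaced by a factory keyed on the record type of the configured merger, so the handle obtains an Avro or engine-native reader as appropriate. A hedged sketch of that lookup (the wrapper method is illustrative; import paths are taken from the hunks above):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;

import java.io.IOException;

class ReaderFactorySketch {
  // Opens a base-file reader for the given record type, mirroring createNewFileReader above.
  static long countRecords(Configuration conf, Path baseFilePath, HoodieRecordType recordType) throws IOException {
    try (HoodieFileReader reader =
             HoodieFileReaderFactory.getReaderFactory(recordType).getFileReader(conf, baseFilePath)) {
      return reader.getTotalRecords();
    }
  }
}
```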
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
index 45f262ba886b4..18fe6a344db98 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
@@ -18,20 +18,17 @@
package org.apache.hudi.io;
+import org.apache.avro.Schema;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.keygen.BaseKeyGenerator;
-import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.table.HoodieTable;
-import org.apache.avro.generic.GenericRecord;
-
import javax.annotation.concurrent.NotThreadSafe;
import java.io.IOException;
@@ -48,7 +45,7 @@
* keys in newRecordKeys (sorted in-memory).
*/
@NotThreadSafe
-public class HoodieSortedMergeHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieMergeHandle<T, I, K, O> {
+public class HoodieSortedMergeHandle<T, I, K, O> extends HoodieMergeHandle<T, I, K, O> {
private final Queue newRecordKeysSorted = new PriorityQueue<>();
@@ -75,8 +72,10 @@ public HoodieSortedMergeHandle(HoodieWriteConfig config, String instantTime, Hoo
* Go through an old record. Here if we detect a newer version shows up, we write the new one to the file.
*/
@Override
- public void write(GenericRecord oldRecord) {
- String key = KeyGenUtils.getRecordKeyFromGenericRecord(oldRecord, keyGeneratorOpt);
+ public void write(HoodieRecord oldRecord) {
+ Schema oldSchema = config.populateMetaFields() ? writeSchemaWithMetaFields : writeSchema;
+ Schema newSchema = useWriterSchemaForCompaction ? writeSchemaWithMetaFields : writeSchema;
+ String key = oldRecord.getRecordKey(oldSchema, keyGeneratorOpt);
// To maintain overall sorted order across updates and inserts, write any new inserts whose keys are less than
// the oldRecord's key.
@@ -93,11 +92,7 @@ public void write(GenericRecord oldRecord) {
throw new HoodieUpsertException("Insert/Update not in sorted order");
}
try {
- if (useWriterSchemaForCompaction) {
- writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(writeSchemaWithMetaFields, config.getProps()));
- } else {
- writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(writeSchema, config.getProps()));
- }
+ writeRecord(hoodieRecord, Option.of(hoodieRecord), newSchema, config.getProps());
insertRecordsWritten++;
writtenRecordKeys.add(keyToPreWrite);
} catch (IOException e) {
@@ -117,9 +112,9 @@ public List close() {
HoodieRecord hoodieRecord = keyToNewRecords.get(key);
if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) {
if (useWriterSchemaForCompaction) {
- writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(writeSchemaWithMetaFields, config.getProps()));
+ writeRecord(hoodieRecord, Option.of(hoodieRecord), writeSchemaWithMetaFields, config.getProps());
} else {
- writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(writeSchema, config.getProps()));
+ writeRecord(hoodieRecord, Option.of(hoodieRecord), writeSchema, config.getProps());
}
insertRecordsWritten++;
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandleWithChangeLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandleWithChangeLog.java
index 8d317b709a4f2..819cfd0754f6e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandleWithChangeLog.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandleWithChangeLog.java
@@ -19,23 +19,25 @@
package org.apache.hudi.io;
import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.table.HoodieTable;
-import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.Schema;
+import java.io.IOException;
+import java.util.Properties;
import java.util.Iterator;
import java.util.Map;
/**
* A sorted merge handle that supports logging change logs.
*/
-public class HoodieSortedMergeHandleWithChangeLog<T extends HoodieRecordPayload, I, K, O> extends HoodieMergeHandleWithChangeLog<T, I, K, O> {
+public class HoodieSortedMergeHandleWithChangeLog<T, I, K, O> extends HoodieMergeHandleWithChangeLog<T, I, K, O> {
public HoodieSortedMergeHandleWithChangeLog(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable,
Iterator> recordItr, String partitionPath, String fileId,
TaskContextSupplier taskContextSupplier, Option keyGeneratorOpt) {
@@ -51,9 +53,10 @@ public HoodieSortedMergeHandleWithChangeLog(HoodieWriteConfig config, String ins
super(config, instantTime, hoodieTable, keyToNewRecords, partitionPath, fileId, dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
}
- protected boolean writeRecord(HoodieRecord hoodieRecord, Option insertRecord) {
- final boolean result = super.writeRecord(hoodieRecord, insertRecord);
- this.cdcLogger.put(hoodieRecord, null, insertRecord);
+ protected boolean writeRecord(HoodieRecord newRecord, Option insertRecord, Schema schema, Properties props)
+ throws IOException {
+ final boolean result = super.writeRecord(newRecord, insertRecord, schema, props);
+ this.cdcLogger.put(newRecord, null, insertRecord.map(rec -> ((HoodieAvroIndexedRecord) rec).getData()));
return result;
}
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieUnboundedCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieUnboundedCreateHandle.java
index ebbc7a5c28ea1..71a19816879ae 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieUnboundedCreateHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieUnboundedCreateHandle.java
@@ -20,7 +20,6 @@
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
@@ -36,7 +35,7 @@
* Please use this with caution. This can end up creating very large files if not used correctly.
*/
@NotThreadSafe
-public class HoodieUnboundedCreateHandle extends HoodieCreateHandle {
+public class HoodieUnboundedCreateHandle extends HoodieCreateHandle {
private static final Logger LOG = LogManager.getLogger(HoodieUnboundedCreateHandle.class);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index 89af038e5168b..151f2e6f99fc6 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -18,14 +18,20 @@
package org.apache.hudi.io;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.util.HoodieTimer;
@@ -36,18 +42,11 @@
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.marker.WriteMarkersFactory;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
@@ -55,34 +54,16 @@
/**
* Base class for all write operations logically performed at the file group level.
*/
-public abstract class HoodieWriteHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieIOHandle<T, I, K, O> {
+public abstract class HoodieWriteHandle<T, I, K, O> extends HoodieIOHandle<T, I, K, O> {
private static final Logger LOG = LogManager.getLogger(HoodieWriteHandle.class);
- /**
- * A special record returned by {@link HoodieRecordPayload}, which means
- * {@link HoodieWriteHandle} should just skip this record.
- * This record is only used for {@link HoodieRecordPayload} currently, so it should not
- * shuffle though network, we can compare the record locally by the equal method.
- * The HoodieRecordPayload#combineAndGetUpdateValue and HoodieRecordPayload#getInsertValue
- * have 3 kind of return:
- * 1、Option.empty
- * This means we should delete this record.
- * 2、IGNORE_RECORD
- * This means we should not process this record,just skip.
- * 3、Other non-empty record
- * This means we should process this record.
- *
- * We can see the usage of IGNORE_RECORD in
- * org.apache.spark.sql.hudi.command.payload.ExpressionPayload
- */
- public static IgnoreRecord IGNORE_RECORD = new IgnoreRecord();
-
/**
* Schema used to write records into data files
*/
protected final Schema writeSchema;
protected final Schema writeSchemaWithMetaFields;
+ protected final HoodieRecordMerger recordMerger;
protected HoodieTimer timer;
protected WriteStatus writeStatus;
@@ -113,6 +94,7 @@ protected HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String
this.taskContextSupplier = taskContextSupplier;
this.writeToken = makeWriteToken();
schemaOnReadEnabled = !isNullOrEmpty(hoodieTable.getConfig().getInternalSchema());
+ this.recordMerger = config.getRecordMerger();
}
/**
@@ -159,6 +141,10 @@ public Schema getWriterSchemaWithMetaFields() {
return writeSchemaWithMetaFields;
}
+ public Schema getWriterSchema() {
+ return writeSchema;
+ }
+
/**
* Determines whether we can accept the incoming records, into the current file. Depending on
*
@@ -176,35 +162,15 @@ boolean layoutControlsNumFiles() {
/**
* Perform the actual writing of the given record into the backing file.
*/
- public void write(HoodieRecord record, Option insertValue) {
+ protected void doWrite(HoodieRecord record, Schema schema, TypedProperties props) {
// NO_OP
}
/**
* Perform the actual writing of the given record into the backing file.
*/
- public void write(HoodieRecord record, Option avroRecord, Option exception) {
- Option recordMetadata = ((HoodieRecordPayload) record.getData()).getMetadata();
- if (exception.isPresent() && exception.get() instanceof Throwable) {
- // Not throwing exception from here, since we don't want to fail the entire job for a single record
- writeStatus.markFailure(record, exception.get(), recordMetadata);
- LOG.error("Error writing record " + record, exception.get());
- } else {
- write(record, avroRecord);
- }
- }
-
- /**
- * Rewrite the GenericRecord with the Schema containing the Hoodie Metadata fields.
- */
- protected GenericRecord rewriteRecord(GenericRecord record) {
- return schemaOnReadEnabled ? HoodieAvroUtils.rewriteRecordWithNewSchema(record, writeSchemaWithMetaFields, new HashMap<>())
- : HoodieAvroUtils.rewriteRecord(record, writeSchemaWithMetaFields);
- }
-
- protected GenericRecord rewriteRecordWithMetadata(GenericRecord record, String fileName) {
- return schemaOnReadEnabled ? HoodieAvroUtils.rewriteEvolutionRecordWithMetadata(record, writeSchemaWithMetaFields, fileName)
- : HoodieAvroUtils.rewriteRecordWithMetadata(record, writeSchemaWithMetaFields, fileName);
+ public void write(HoodieRecord record, Schema schema, TypedProperties props) {
+ doWrite(record, schema, props);
}
public abstract List close();
@@ -275,31 +241,12 @@ protected HoodieLogFormat.Writer createLogWriter(String baseCommitTime, String f
}
}
- private static class IgnoreRecord implements GenericRecord {
-
- @Override
- public void put(int i, Object v) {
-
- }
-
- @Override
- public Object get(int i) {
- return null;
- }
-
- @Override
- public Schema getSchema() {
- return null;
- }
-
- @Override
- public void put(String key, Object v) {
-
- }
-
- @Override
- public Object get(String key) {
- return null;
+ protected static Option toAvroRecord(HoodieRecord record, Schema writerSchema, TypedProperties props) {
+ try {
+ return record.toIndexedRecord(writerSchema, props).map(HoodieAvroIndexedRecord::getData);
+ } catch (IOException e) {
+ LOG.error("Fail to get indexRecord from " + record, e);
+ return Option.empty();
}
}
}
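With the IGNORE_RECORD sentinel and the Avro rewrite helpers removed, the handle's per-record API narrows to write(HoodieRecord, Schema, TypedProperties) delegating to doWrite, and the new toAvroRecord helper becomes the bridge back to Avro for consumers (such as the CDC logger) that still need an IndexedRecord. A minimal sketch of that bridge, using the same calls as the helper above (class name illustrative):

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;

import java.io.IOException;

class AvroBridgeSketch {
  // Converts a format-agnostic HoodieRecord back to its Avro representation, or empty on failure.
  static Option<IndexedRecord> toAvro(HoodieRecord record, Schema writerSchema, TypedProperties props) {
    try {
      return record.toIndexedRecord(writerSchema, props).map(HoodieAvroIndexedRecord::getData);
    } catch (IOException e) {
      return Option.empty();
    }
  }
}
```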
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/SingleFileHandleCreateFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/SingleFileHandleCreateFactory.java
index a3f7c04ef23a8..fa5ce2c68bd88 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/SingleFileHandleCreateFactory.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/SingleFileHandleCreateFactory.java
@@ -19,7 +19,6 @@
package org.apache.hudi.io;
import org.apache.hudi.common.engine.TaskContextSupplier;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;
@@ -32,7 +31,7 @@
*
* Please use this with caution. This can end up creating very large files if not used correctly.
*/
-public class SingleFileHandleCreateFactory extends CreateHandleFactory implements Serializable {
+public class SingleFileHandleCreateFactory extends CreateHandleFactory implements Serializable {
private final AtomicBoolean isHandleCreated = new AtomicBoolean(false);
private final String fileId;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/WriteHandleFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/WriteHandleFactory.java
index c267b5969d801..46a0b1c614d1f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/WriteHandleFactory.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/WriteHandleFactory.java
@@ -20,13 +20,12 @@
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import java.io.Serializable;
-public abstract class WriteHandleFactory implements Serializable {
+public abstract class WriteHandleFactory implements Serializable {
private int numFilesWritten = 0;
public abstract HoodieWriteHandle create(HoodieWriteConfig config, String commitTime, HoodieTable hoodieTable,
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
deleted file mode 100644
index 9ee8571ebd066..0000000000000
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.io.storage;
-
-import org.apache.hudi.avro.HoodieAvroWriteSupport;
-import org.apache.hudi.common.bloom.BloomFilter;
-import org.apache.hudi.common.bloom.BloomFilterFactory;
-import org.apache.hudi.common.engine.TaskContextSupplier;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieTable;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.parquet.avro.AvroSchemaConverter;
-
-import java.io.IOException;
-
-import static org.apache.hudi.common.model.HoodieFileFormat.HFILE;
-import static org.apache.hudi.common.model.HoodieFileFormat.ORC;
-import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
-import static org.apache.hudi.io.storage.HoodieHFileConfig.CACHE_DATA_IN_L1;
-import static org.apache.hudi.io.storage.HoodieHFileConfig.DROP_BEHIND_CACHE_COMPACTION;
-import static org.apache.hudi.io.storage.HoodieHFileConfig.HFILE_COMPARATOR;
-import static org.apache.hudi.io.storage.HoodieHFileConfig.PREFETCH_ON_OPEN;
-
-public class HoodieFileWriterFactory {
-
- public static HoodieFileWriter getFileWriter(
- String instantTime, Path path, HoodieTable hoodieTable, HoodieWriteConfig config, Schema schema,
- TaskContextSupplier taskContextSupplier) throws IOException {
- final String extension = FSUtils.getFileExtension(path.getName());
- if (PARQUET.getFileExtension().equals(extension)) {
- return newParquetFileWriter(instantTime, path, config, schema, hoodieTable, taskContextSupplier, config.populateMetaFields());
- }
- if (HFILE.getFileExtension().equals(extension)) {
- return newHFileFileWriter(
- instantTime, path, config, schema, hoodieTable.getHadoopConf(), taskContextSupplier);
- }
- if (ORC.getFileExtension().equals(extension)) {
- return newOrcFileWriter(
- instantTime, path, config, schema, hoodieTable.getHadoopConf(), taskContextSupplier);
- }
- throw new UnsupportedOperationException(extension + " format not supported yet.");
- }
-
- private static HoodieFileWriter newParquetFileWriter(
- String instantTime, Path path, HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable,
- TaskContextSupplier taskContextSupplier, boolean populateMetaFields) throws IOException {
- return newParquetFileWriter(instantTime, path, config, schema, hoodieTable.getHadoopConf(),
- taskContextSupplier, populateMetaFields, populateMetaFields);
- }
-
- private static HoodieFileWriter newParquetFileWriter(
- String instantTime, Path path, HoodieWriteConfig config, Schema schema, Configuration conf,
- TaskContextSupplier taskContextSupplier, boolean populateMetaFields, boolean enableBloomFilter) throws IOException {
- Option filter = enableBloomFilter ? Option.of(createBloomFilter(config)) : Option.empty();
- HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter(conf).convert(schema), schema, filter);
-
- HoodieParquetConfig parquetConfig = new HoodieParquetConfig<>(writeSupport, config.getParquetCompressionCodec(),
- config.getParquetBlockSize(), config.getParquetPageSize(), config.getParquetMaxFileSize(),
- conf, config.getParquetCompressionRatio(), config.parquetDictionaryEnabled());
-
- return new HoodieAvroParquetWriter<>(path, parquetConfig, instantTime, taskContextSupplier, populateMetaFields);
- }
-
- static HoodieFileWriter newHFileFileWriter(
- String instantTime, Path path, HoodieWriteConfig config, Schema schema, Configuration conf,
- TaskContextSupplier taskContextSupplier) throws IOException {
-
- BloomFilter filter = createBloomFilter(config);
- HoodieHFileConfig hfileConfig = new HoodieHFileConfig(conf,
- config.getHFileCompressionAlgorithm(), config.getHFileBlockSize(), config.getHFileMaxFileSize(),
- HoodieHFileReader.KEY_FIELD_NAME, PREFETCH_ON_OPEN, CACHE_DATA_IN_L1, DROP_BEHIND_CACHE_COMPACTION,
- filter, HFILE_COMPARATOR);
-
- return new HoodieHFileWriter<>(instantTime, path, hfileConfig, schema, taskContextSupplier, config.populateMetaFields());
- }
-
- private static HoodieFileWriter newOrcFileWriter(
- String instantTime, Path path, HoodieWriteConfig config, Schema schema, Configuration conf,
- TaskContextSupplier taskContextSupplier) throws IOException {
- BloomFilter filter = createBloomFilter(config);
- HoodieOrcConfig orcConfig = new HoodieOrcConfig(conf, config.getOrcCompressionCodec(),
- config.getOrcStripeSize(), config.getOrcBlockSize(), config.getOrcMaxFileSize(), filter);
- return new HoodieOrcWriter<>(instantTime, path, orcConfig, schema, taskContextSupplier);
- }
-
- private static BloomFilter createBloomFilter(HoodieWriteConfig config) {
- return BloomFilterFactory.createBloomFilter(config.getBloomFilterNumEntries(), config.getBloomFilterFPP(),
- config.getDynamicBloomFilterMaxNumEntries(),
- config.getBloomFilterType());
- }
-}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 866fb432ab4ed..2408646e3900f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -994,7 +994,9 @@ protected HoodieData prepRecords(Map rddSinglePartitionRecords = records.map(r -> {
FileSlice slice = finalFileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(),
fileGroupCount));
+ r.unseal();
r.setCurrentLocation(new HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
+ r.seal();
return r;
});
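HoodieRecord guards its fields against mutation, so assigning the resolved file-slice location in the metadata writer now has to happen between unseal() and seal(), as the hunk above shows. A small sketch of that pattern in isolation (method and parameter names are illustrative):

```java
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;

class TagLocationSketch {
  // Mirrors the unseal/setCurrentLocation/seal sequence from the hunk above.
  static HoodieRecord tag(HoodieRecord record, String baseInstantTime, String fileId) {
    record.unseal();
    record.setCurrentLocation(new HoodieRecordLocation(baseInstantTime, fileId));
    record.seal();
    return record;
  }
}
```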
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/JmxReporterServer.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/JmxReporterServer.java
index cddac8e2ac653..8bad9fefa5c15 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/JmxReporterServer.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/JmxReporterServer.java
@@ -37,7 +37,6 @@
import java.rmi.server.UnicastRemoteObject;
import java.util.Objects;
-
/**
* A reporter which publishes metric values to a JMX server.
*/
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieCompactionHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieCompactionHandler.java
index 7c0ad4a4ad647..4d6a216cf2312 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieCompactionHandler.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieCompactionHandler.java
@@ -22,7 +22,6 @@
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.exception.HoodieNotSupportedException;
@@ -36,15 +35,15 @@
*
* @param <T> HoodieRecordPayload type.
*/
-public interface HoodieCompactionHandler {
+public interface HoodieCompactionHandler {
Iterator> handleUpdate(String instantTime, String partitionPath, String fileId,
Map> keyToNewRecords, HoodieBaseFile oldDataFile) throws IOException;
Iterator> handleInsert(String instantTime, String partitionPath, String fileId,
- Map> recordMap);
+ Map> recordMap);
default Iterator> handleInsertsForLogCompaction(String instantTime, String partitionPath, String fileId,
- Map> recordMap,
+ Map> recordMap,
Map header) {
throw new HoodieNotSupportedException("Operation is not yet supported");
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index c1a29220857e1..3a31c80d4289e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -43,7 +43,6 @@
import org.apache.hudi.common.fs.OptimisticConsistencyGuard;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableConfig;
@@ -115,7 +114,7 @@
* @param <K> Type of keys
* @param <O> Type of outputs
*/
-public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implements Serializable {
+public abstract class HoodieTable<T, I, K, O> implements Serializable {
private static final Logger LOG = LogManager.getLogger(HoodieTable.class);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
index d2b2cef2f604b..b672d4d6bbb2e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
@@ -26,12 +26,11 @@
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
-public abstract class BaseActionExecutor implements Serializable {
+public abstract class BaseActionExecutor implements Serializable {
protected final transient HoodieEngineContext context;
protected final transient Configuration hadoopConf;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapRecordConsumer.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapRecordConsumer.java
index fbdb941365b76..f9b85679fbbe1 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapRecordConsumer.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapRecordConsumer.java
@@ -18,14 +18,11 @@
package org.apache.hudi.table.action.bootstrap;
+import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.queue.HoodieConsumer;
-import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.HoodieBootstrapHandle;
-import java.io.IOException;
-
/**
* Consumer that dequeues records from queue and sends to Merge Handle for writing.
*/
@@ -39,12 +36,7 @@ public BootstrapRecordConsumer(HoodieBootstrapHandle bootstrapHandle) {
@Override
public void consume(HoodieRecord record) {
- try {
- bootstrapHandle.write(record, ((HoodieRecordPayload) record.getData())
- .getInsertValue(bootstrapHandle.getWriterSchemaWithMetaFields()));
- } catch (IOException e) {
- throw new HoodieIOException(e.getMessage(), e);
- }
+ bootstrapHandle.write(record, bootstrapHandle.getWriterSchema(), new TypedProperties());
}
@Override
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
index c0a4f7731689d..9137eb436bb8a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
@@ -25,7 +25,6 @@
import org.apache.hudi.common.HoodieCleanStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.CleanFileInfo;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.CleanerUtils;
@@ -55,7 +54,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
-public class CleanActionExecutor extends BaseActionExecutor {
+public class CleanActionExecutor extends BaseActionExecutor {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LogManager.getLogger(CleanActionExecutor.class);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
index 7f3b437178fd4..1f6a5a1d790b3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
@@ -25,7 +25,6 @@
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.CleanFileInfo;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
@@ -46,7 +45,7 @@
import java.util.Map;
import java.util.stream.Collectors;
-public class CleanPlanActionExecutor extends BaseActionExecutor> {
+public class CleanPlanActionExecutor extends BaseActionExecutor> {
private static final Logger LOG = LogManager.getLogger(CleanPlanner.class);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 396b47ae0a30c..5de92af3258e1 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -30,7 +30,6 @@
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieLogFile;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
@@ -74,7 +73,7 @@
*
* 2) It bounds the growth of the files in the file system
*/
-public class CleanPlanner implements Serializable {
+public class CleanPlanner implements Serializable {
private static final Logger LOG = LogManager.getLogger(CleanPlanner.class);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
index e0e02bae8e14c..683be09efeec4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
@@ -21,7 +21,6 @@
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -41,7 +40,7 @@
import java.util.Collections;
import java.util.Map;
-public class ClusteringPlanActionExecutor extends BaseActionExecutor> {
+public class ClusteringPlanActionExecutor extends BaseActionExecutor> {
private static final Logger LOG = LogManager.getLogger(ClusteringPlanActionExecutor.class);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java
index 163947fa34481..4b7240d432a43 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java
@@ -20,7 +20,7 @@
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -34,17 +34,19 @@
/**
* Pluggable implementation for writing data into new file groups based on ClusteringPlan.
*/
-public abstract class ClusteringExecutionStrategy implements Serializable {
+public abstract class ClusteringExecutionStrategy implements Serializable {
private static final Logger LOG = LogManager.getLogger(ClusteringExecutionStrategy.class);
private final HoodieTable hoodieTable;
private final transient HoodieEngineContext engineContext;
- private final HoodieWriteConfig writeConfig;
+ protected final HoodieWriteConfig writeConfig;
+ protected final HoodieRecordType recordType;
public ClusteringExecutionStrategy(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) {
this.writeConfig = writeConfig;
this.hoodieTable = table;
this.engineContext = engineContext;
+ this.recordType = table.getConfig().getRecordMerger().getRecordType();
}
/**
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
index 2df1824c5f72f..e55e900c3cedb 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
@@ -25,7 +25,6 @@
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieFileGroupId;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
@@ -50,7 +49,7 @@
/**
* Pluggable implementation for scheduling clustering and creating ClusteringPlan.
*/
-public abstract class ClusteringPlanStrategy implements Serializable {
+public abstract class ClusteringPlanStrategy implements Serializable {
private static final Logger LOG = LogManager.getLogger(ClusteringPlanStrategy.class);
public static final int CLUSTERING_PLAN_VERSION_1 = 1;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
index e12d6d27aa278..915ccc2df0386 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
@@ -24,7 +24,6 @@
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
@@ -44,7 +43,7 @@
/**
* Scheduling strategy with restriction that clustering groups can only contain files from same partition.
*/
-public abstract class PartitionAwareClusteringPlanStrategy extends ClusteringPlanStrategy {
+public abstract class PartitionAwareClusteringPlanStrategy extends ClusteringPlanStrategy {
private static final Logger LOG = LogManager.getLogger(PartitionAwareClusteringPlanStrategy.class);
public PartitionAwareClusteringPlanStrategy(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/UpdateStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/UpdateStrategy.java
index c08c3f312dbc8..4463f7887bb47 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/UpdateStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/UpdateStrategy.java
@@ -20,7 +20,6 @@
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieFileGroupId;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.table.HoodieTable;
@@ -30,7 +29,7 @@
/**
* When file groups in clustering, write records to these file group need to check.
*/
-public abstract class UpdateStrategy implements Serializable {
+public abstract class UpdateStrategy implements Serializable {
protected final transient HoodieEngineContext engineContext;
protected final HoodieTable table;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java
index aa7196e3dbbed..b55993856790c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java
@@ -18,7 +18,6 @@
package org.apache.hudi.table.action.commit;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.WriteHandleFactory;
@@ -26,7 +25,7 @@
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
-public abstract class BaseBulkInsertHelper {
+public abstract class BaseBulkInsertHelper {
/**
* Mark instant as inflight, write input records, update index and return result.
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
index 5f406d8920165..1e92f80227482 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
@@ -31,7 +31,6 @@
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -70,7 +69,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
-public abstract class BaseCommitActionExecutor
+public abstract class BaseCommitActionExecutor
extends BaseActionExecutor {
private static final Logger LOG = LogManager.getLogger(BaseCommitActionExecutor.class);
@@ -157,7 +156,6 @@ protected String getCommitActionType() {
return table.getMetaClient().getCommitActionType();
}
-
/**
* Check if any validators are configured and run those validations. If any of the validations fail, throws HoodieValidationException.
*/
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseDeleteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseDeleteHelper.java
index b119587f47535..ceeb2aeb70dee 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseDeleteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseDeleteHelper.java
@@ -19,7 +19,6 @@
package org.apache.hudi.table.action.commit;
import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -29,7 +28,7 @@
*
* @param
*/
-public abstract class BaseDeleteHelper {
+public abstract class BaseDeleteHelper {
/**
* Deduplicate Hoodie records, using the given deduplication function.
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
index f6572aae4a617..17b8620da63f6 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
@@ -18,12 +18,11 @@
package org.apache.hudi.table.action.commit;
+import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.queue.HoodieConsumer;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.table.HoodieTable;
-import org.apache.avro.generic.GenericRecord;
-
import java.io.IOException;
/**
@@ -42,7 +41,7 @@ public abstract class BaseMergeHelper {
/**
* Consumer that dequeues records from queue and sends to Merge Handle.
*/
- protected static class UpdateHandler implements HoodieConsumer {
+ protected static class UpdateHandler implements HoodieConsumer {
private final HoodieMergeHandle upsertHandle;
@@ -51,7 +50,7 @@ protected UpdateHandler(HoodieMergeHandle upsertHandle) {
}
@Override
- public void consume(GenericRecord record) {
+ public void consume(HoodieRecord record) {
upsertHandle.write(record);
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
index c69d8746d1913..adef1c44591a8 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
@@ -18,8 +18,10 @@
package org.apache.hudi.table.action.commit;
+import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieAvroRecordMerger;
+import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.index.HoodieIndex;
@@ -29,8 +31,9 @@
import java.time.Duration;
import java.time.Instant;
+import java.util.Properties;
-public abstract class BaseWriteHelper {
+public abstract class BaseWriteHelper {
public HoodieWriteMetadata write(String instantTime,
I inputRecords,
@@ -80,11 +83,16 @@ public I combineOnCondition(
* @param parallelism parallelism or partitions to be used while reducing/deduplicating
* @return Collection of HoodieRecord already be deduplicated
*/
- public I deduplicateRecords(
- I records, HoodieTable table, int parallelism) {
- return deduplicateRecords(records, table.getIndex(), parallelism, table.getConfig().getSchema());
+ public I deduplicateRecords(I records, HoodieTable table, int parallelism) {
+ HoodieRecordMerger recordMerger = table.getConfig().getRecordMerger();
+ return deduplicateRecords(records, table.getIndex(), parallelism, table.getConfig().getSchema(), table.getConfig().getProps(), recordMerger);
}
- public abstract I deduplicateRecords(
- I records, HoodieIndex, ?> index, int parallelism, String schema);
+ public I deduplicateRecords(I records, HoodieIndex, ?> index, int parallelism, String schema, Properties props, HoodieRecordMerger merger) {
+ TypedProperties updatedProps = HoodieAvroRecordMerger.Config.withLegacyOperatingModePreCombining(props);
+ return doDeduplicateRecords(records, index, parallelism, schema, updatedProps, merger);
+ }
+
+ protected abstract I doDeduplicateRecords(
+ I records, HoodieIndex, ?> index, int parallelism, String schema, TypedProperties props, HoodieRecordMerger merger);
}
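The BaseWriteHelper change above is a template-method refactor: the public deduplicateRecords entry point now fixes up the merge properties once (via the legacy pre-combining mode) and then delegates to an abstract, engine-specific doDeduplicateRecords hook. Below is a minimal sketch of that shape under simplified assumptions; the property key, the LegacyConfig-style helper, and the BiFunction merger are illustrative stand-ins, not Hudi's actual API.

import java.util.List;
import java.util.Properties;
import java.util.function.BiFunction;

// A minimal sketch of the template-method shape: the public entry point adjusts the
// properties once, then delegates to an engine-specific hook.
abstract class WriteHelperSketch<R> {

  // Public API: callers never deal with the legacy-mode property injection themselves.
  public final List<R> deduplicateRecords(List<R> records, int parallelism, String schema,
                                          Properties props, BiFunction<R, R, R> merger) {
    Properties updated = withLegacyPreCombining(props);
    return doDeduplicateRecords(records, parallelism, schema, updated, merger);
  }

  // Engine-specific implementations (Spark/Flink/Java) override only this hook.
  protected abstract List<R> doDeduplicateRecords(List<R> records, int parallelism, String schema,
                                                  Properties props, BiFunction<R, R, R> merger);

  // Stand-in for a helper like HoodieAvroRecordMerger.Config.withLegacyOperatingModePreCombining.
  private static Properties withLegacyPreCombining(Properties props) {
    Properties copy = new Properties();
    copy.putAll(props);
    copy.setProperty("hoodie.merger.operating.mode", "PRE_COMBINING"); // hypothetical key
    return copy;
  }
}

Keeping the property rewrite in the final public method means each engine-specific subclass only implements the actual dedup logic, and the legacy-mode handling cannot be forgotten or duplicated.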
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieDeleteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieDeleteHelper.java
index fff52eb24d736..0d212555ab228 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieDeleteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieDeleteHelper.java
@@ -24,9 +24,10 @@
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieEmptyRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieUpsertException;
@@ -45,7 +46,7 @@
* @param
*/
@SuppressWarnings("checkstyle:LineLength")
-public class HoodieDeleteHelper extends
+public class HoodieDeleteHelper extends
BaseDeleteHelper>, HoodieData, HoodieData, R> {
private HoodieDeleteHelper() {
}
@@ -85,8 +86,15 @@ public HoodieWriteMetadata> execute(String instantTime,
dedupedKeys = keys.repartition(parallelism);
}
- HoodieData> dedupedRecords =
- dedupedKeys.map(key -> new HoodieAvroRecord(key, new EmptyHoodieRecordPayload()));
+ HoodieData dedupedRecords;
+ HoodieRecordType recordType = config.getRecordMerger().getRecordType();
+ if (recordType == HoodieRecordType.AVRO) {
+ // For backwards compatibility; remove this branch once HoodieRecordPayload is removed
+ dedupedRecords =
+ dedupedKeys.map(key -> new HoodieAvroRecord(key, new EmptyHoodieRecordPayload()));
+ } else {
+ dedupedRecords = dedupedKeys.map(key -> new HoodieEmptyRecord<>(key, recordType));
+ }
Instant beginTag = Instant.now();
// perform index loop up to get existing location of records
HoodieData> taggedRecords = table.getIndex().tagLocation(dedupedRecords, context, table);
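The delete path above now dispatches on the configured merger's record type: AVRO keys keep the legacy empty-payload record for backwards compatibility, while other record types get a lightweight payload-free empty record. A small sketch of that dispatch follows; all types and names in it are simplified placeholders, not Hudi's actual classes.

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class DeleteRecordSketch {

  enum RecordType { AVRO, SPARK }

  static class DeleteRecord {
    final String key;
    final String kind;
    DeleteRecord(String key, String kind) { this.key = key; this.kind = kind; }
    @Override public String toString() { return kind + "(" + key + ")"; }
  }

  static List<DeleteRecord> toDeleteRecords(List<String> keys, RecordType recordType) {
    if (recordType == RecordType.AVRO) {
      // Legacy path: wrap each key in an empty Avro payload.
      return keys.stream().map(k -> new DeleteRecord(k, "AvroEmptyPayload")).collect(Collectors.toList());
    }
    // Engine-native path: a payload-free empty record tagged with the record type.
    return keys.stream().map(k -> new DeleteRecord(k, "EmptyRecord[" + recordType + "]")).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    System.out.println(toDeleteRecords(Arrays.asList("k1", "k2"), RecordType.SPARK));
  }
}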
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
index 0b50f2a302fe0..2caa20c69c5c8 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
@@ -18,17 +18,17 @@
package org.apache.hudi.table.action.commit;
-import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.utils.MergingIterator;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.InternalSchemaCache;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.MappingIterator;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.queue.HoodieExecutor;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
@@ -46,9 +46,10 @@
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;
-import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.Iterator;
@@ -59,9 +60,10 @@
import java.util.stream.Collectors;
import static org.apache.hudi.avro.AvroSchemaUtils.isStrictProjectionOf;
-import static org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema;
-public class HoodieMergeHelper extends BaseMergeHelper {
+public class HoodieMergeHelper extends BaseMergeHelper {
+
+ private static final Logger LOG = LogManager.getLogger(HoodieMergeHelper.class);
private HoodieMergeHelper() {
}
@@ -81,15 +83,18 @@ public void runMerge(HoodieTable, ?, ?, ?> table,
HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();
Configuration hadoopConf = new Configuration(table.getHadoopConf());
- HoodieFileReader baseFileReader = HoodieFileReaderFactory.getFileReader(hadoopConf, mergeHandle.getOldFilePath());
- HoodieFileReader bootstrapFileReader = null;
+ HoodieRecord.HoodieRecordType recordType = table.getConfig().getRecordMerger().getRecordType();
+ HoodieFileReader baseFileReader = HoodieFileReaderFactory
+ .getReaderFactory(recordType)
+ .getFileReader(hadoopConf, mergeHandle.getOldFilePath());
+ HoodieFileReader bootstrapFileReader = null;
Schema writerSchema = mergeHandle.getWriterSchemaWithMetaFields();
Schema readerSchema = baseFileReader.getSchema();
// In case Advanced Schema Evolution is enabled we might need to rewrite currently
// persisted records to adhere to an evolved schema
- Option> schemaEvolutionTransformerOpt =
+ Option>, Schema>> schemaEvolutionTransformerOpt =
composeSchemaEvolutionTransformer(writerSchema, baseFile, writeConfig, table.getMetaClient());
// Check whether the writer schema is simply a projection of the file's one, ie
@@ -103,36 +108,46 @@ public void runMerge(HoodieTable, ?, ?, ?> table,
|| !isPureProjection
|| baseFile.getBootstrapBaseFile().isPresent();
- HoodieExecutor wrapper = null;
+ HoodieExecutor wrapper = null;
try {
- Iterator recordIterator;
+ Iterator recordIterator;
// In case writer's schema is simply a projection of the reader's one we can read
// the records in the projected schema directly
- ClosableIterator baseFileRecordIterator = baseFileReader.getRecordIterator(isPureProjection ? writerSchema : readerSchema);
-
+ ClosableIterator baseFileRecordIterator =
+ baseFileReader.getRecordIterator(isPureProjection ? writerSchema : readerSchema);
+ Schema recordSchema;
if (baseFile.getBootstrapBaseFile().isPresent()) {
Path bootstrapFilePath = new Path(baseFile.getBootstrapBaseFile().get().getPath());
Configuration bootstrapFileConfig = new Configuration(table.getHadoopConf());
- bootstrapFileReader = HoodieFileReaderFactory.getFileReader(bootstrapFileConfig, bootstrapFilePath);
- recordIterator = new MergingIterator<>(
- baseFileRecordIterator,
- bootstrapFileReader.getRecordIterator(),
- (inputRecordPair) -> HoodieAvroUtils.stitchRecords(inputRecordPair.getLeft(), inputRecordPair.getRight(), mergeHandle.getWriterSchemaWithMetaFields()));
-
+ bootstrapFileReader =
+ HoodieFileReaderFactory.getReaderFactory(recordType).getFileReader(bootstrapFileConfig, bootstrapFilePath);
+ recordIterator = new MergingIterator(baseFileRecordIterator, bootstrapFileReader.getRecordIterator(),
+ (left, right) -> left.joinWith(right, mergeHandle.getWriterSchemaWithMetaFields()));
+ recordSchema = mergeHandle.getWriterSchemaWithMetaFields();
} else if (schemaEvolutionTransformerOpt.isPresent()) {
recordIterator = new MappingIterator<>(baseFileRecordIterator,
- schemaEvolutionTransformerOpt.get());
+ schemaEvolutionTransformerOpt.get().getLeft().apply(isPureProjection ? writerSchema : readerSchema));
+ recordSchema = schemaEvolutionTransformerOpt.get().getRight();
} else {
recordIterator = baseFileRecordIterator;
+ recordSchema = isPureProjection ? writerSchema : readerSchema;
}
wrapper = QueueBasedExecutorFactory.create(writeConfig, recordIterator, new UpdateHandler(mergeHandle), record -> {
+ // NOTE: Records have to be cloned here to make sure that, if a record holds a low-level
+ // engine-specific payload pointing into a shared, mutable (underlying) buffer, we get a
+ // clean copy of it, since these records will be put into the queue of QueueBasedExecutorFactory.
if (shouldRewriteInWriterSchema) {
- return rewriteRecordWithNewSchema(record, writerSchema);
+ try {
+ return record.rewriteRecordWithNewSchema(recordSchema, writeConfig.getProps(), writerSchema).copy();
+ } catch (IOException e) {
+ LOG.error("Error rewrite record with new schema", e);
+ throw new HoodieException(e);
+ }
} else {
- return record;
+ return record.copy();
}
}, table.getPreExecuteRunnable());
@@ -154,7 +169,7 @@ public void runMerge(HoodieTable, ?, ?, ?> table,
}
}
- private Option> composeSchemaEvolutionTransformer(Schema writerSchema,
+ private Option>, Schema>> composeSchemaEvolutionTransformer(Schema writerSchema,
HoodieBaseFile baseFile,
HoodieWriteConfig writeConfig,
HoodieTableMetaClient metaClient) {
@@ -195,7 +210,18 @@ private Option> composeSchemaEvolutionTra
|| SchemaCompatibility.checkReaderWriterCompatibility(newWriterSchema, writeSchemaFromFile).getType() == org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
if (needToReWriteRecord) {
Map renameCols = InternalSchemaUtils.collectRenameCols(writeInternalSchema, querySchema);
- return Option.of(record -> rewriteRecordWithNewSchema(record, newWriterSchema, renameCols));
+ return Option.of(Pair.of(
+ (schema) -> (record) -> {
+ try {
+ return record.rewriteRecordWithNewSchema(
+ schema,
+ writeConfig.getProps(),
+ newWriterSchema, renameCols);
+ } catch (IOException e) {
+ LOG.error("Error rewrite record with new schema", e);
+ throw new HoodieException(e);
+ }
+ }, newWriterSchema));
} else {
return Option.empty();
}
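The composeSchemaEvolutionTransformer change above alters the return shape: instead of a plain record-rewriting function, the caller now gets a pair of (a) a factory that, given the schema records are currently read in, produces a per-record rewrite function, and (b) the evolved target schema the rewritten records will conform to. The sketch below illustrates that shape only; "Schema" is a plain String and records are Maps here, purely for illustration, and the tagging logic stands in for the actual schema rewrite.

import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class SchemaEvolutionSketch {

  // Returns (factory from read-schema to per-record rewrite function, evolved target schema).
  static Map.Entry<Function<String, Function<Map<String, Object>, Map<String, Object>>>, String>
      composeTransformer(final String evolvedSchema) {
    Function<String, Function<Map<String, Object>, Map<String, Object>>> factory =
        readSchema -> record -> {
          // A real implementation would rewrite the record from readSchema to evolvedSchema,
          // applying column renames/additions; here we only tag the record for illustration.
          record.put("_rewritten_from", readSchema);
          record.put("_rewritten_to", evolvedSchema);
          return record;
        };
    return new SimpleImmutableEntry<>(factory, evolvedSchema);
  }

  public static void main(String[] args) {
    Map.Entry<Function<String, Function<Map<String, Object>, Map<String, Object>>>, String> transformer =
        composeTransformer("schema-v2");
    Function<Map<String, Object>, Map<String, Object>> rewrite = transformer.getKey().apply("schema-v1");
    Map<String, Object> record = new HashMap<>();
    record.put("id", 1);
    System.out.println(rewrite.apply(record));
    System.out.println("target schema: " + transformer.getValue());
  }
}

Returning the target schema alongside the factory lets the merge path pick the correct schema for downstream writes (bootstrap join, evolved schema, or the plain reader/writer schema) without recomputing it.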
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
index b1d18573fcfc6..6557f83b24181 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
@@ -20,20 +20,21 @@
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.SerializableSchema;
+import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
-public class HoodieWriteHelper extends BaseWriteHelper>,
- HoodieData, HoodieData, R> {
+import java.io.IOException;
+public class HoodieWriteHelper extends BaseWriteHelper>,
+ HoodieData, HoodieData, R> {
private HoodieWriteHelper() {
}
@@ -52,8 +53,8 @@ protected HoodieData> tag(HoodieData> dedupedRec
}
@Override
- public HoodieData> deduplicateRecords(
- HoodieData> records, HoodieIndex, ?> index, int parallelism, String schemaStr) {
+ protected HoodieData> doDeduplicateRecords(
+ HoodieData> records, HoodieIndex, ?> index, int parallelism, String schemaStr, TypedProperties props, HoodieRecordMerger merger) {
boolean isIndexingGlobal = index.isGlobal();
final SerializableSchema schema = new SerializableSchema(schemaStr);
// Auto-tunes the parallelism for reduce transformation based on the number of data partitions
@@ -63,13 +64,19 @@ public HoodieData> deduplicateRecords(
HoodieKey hoodieKey = record.getKey();
// If index used is global, then records are expected to differ in their partitionPath
Object key = isIndexingGlobal ? hoodieKey.getRecordKey() : hoodieKey;
- return Pair.of(key, record);
+ // NOTE: PLEASE READ CAREFULLY BEFORE CHANGING
+ // Here we have to make a copy of the incoming record, since it might be holding
+ // an instance of [[InternalRow]] pointing into a shared, mutable buffer
+ return Pair.of(key, record.copy());
}).reduceByKey((rec1, rec2) -> {
- @SuppressWarnings("unchecked")
- T reducedData = (T) rec2.getData().preCombine(rec1.getData(), schema.get(), CollectionUtils.emptyProps());
- HoodieKey reducedKey = rec1.getData().equals(reducedData) ? rec1.getKey() : rec2.getKey();
-
- return new HoodieAvroRecord<>(reducedKey, reducedData);
+ HoodieRecord reducedRecord;
+ try {
+ reducedRecord = merger.merge(rec1, schema.get(), rec2, schema.get(), props).get().getLeft();
+ } catch (IOException e) {
+ throw new HoodieException(String.format("Error to merge two records, %s, %s", rec1, rec2), e);
+ }
+ HoodieKey reducedKey = rec1.getData().equals(reducedRecord.getData()) ? rec1.getKey() : rec2.getKey();
+ return reducedRecord.newInstance(reducedKey);
}, reduceParallelism).map(Pair::getRight);
}
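The deduplication change above replaces the hard-coded payload preCombine with the configured merger: records are keyed (record key alone for a global index, key plus partition otherwise), defensively copied, and then same-key pairs are collapsed through the merger, with the surviving record re-keyed to whichever input "won". A compact, self-contained sketch of that flow is below; the Rec class, the ordering-value field, and the BinaryOperator merger are simplified placeholders rather than Hudi's types.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class DedupSketch {

  static class Rec {
    final String key;
    final long orderingValue;
    Rec(String key, long orderingValue) { this.key = key; this.orderingValue = orderingValue; }
    Rec copy() { return new Rec(key, orderingValue); }  // defensive copy, mirrors record.copy()
    @Override public String toString() { return key + "@" + orderingValue; }
  }

  static List<Rec> deduplicate(List<Rec> records, BinaryOperator<Rec> merger) {
    Map<String, Rec> byKey = new HashMap<>();
    for (Rec r : records) {
      byKey.merge(r.key, r.copy(), merger);   // copy before combining, then collapse same-key pairs
    }
    return new ArrayList<>(byKey.values());
  }

  public static void main(String[] args) {
    // A merger that keeps the record with the larger ordering value, akin to preCombine semantics.
    BinaryOperator<Rec> keepLatest = (a, b) -> a.orderingValue >= b.orderingValue ? a : b;
    List<Rec> in = Arrays.asList(new Rec("k1", 1), new Rec("k1", 5), new Rec("k2", 3));
    System.out.println(deduplicate(in, keepLatest));
  }
}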
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java
index a4daacfedbeec..f4498a82d4fb8 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java
@@ -22,7 +22,6 @@
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -43,7 +42,7 @@
* @param Type of keys
* @param Type of outputs
*/
-public class CompactHelpers {
+public class CompactHelpers {
private static final CompactHelpers SINGLETON_INSTANCE = new CompactHelpers();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
index 51ed7639a4923..c6a20436c03ef 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
@@ -27,7 +27,6 @@
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -65,7 +64,7 @@
/**
* A HoodieCompactor runs compaction on a hoodie table.
*/
-public abstract class HoodieCompactor implements Serializable {
+public abstract class HoodieCompactor implements Serializable {
private static final Logger LOG = LogManager.getLogger(HoodieCompactor.class);
@@ -201,6 +200,7 @@ public List compact(HoodieCompactionHandler compactionHandler,
.withOperationField(config.allowOperationMetadataField())
.withPartition(operation.getPartitionPath())
.withUseScanV2(executionHelper.useScanV2(config))
+ .withRecordMerger(config.getRecordMerger())
.build();
Option oldDataFileOpt =
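The one-line addition above threads the table's configured record merger into the log-record scanner builder used by compaction. For readers unfamiliar with the pattern, here is a minimal builder sketch with such an optional knob; ScannerSketch, its fields, and the BinaryOperator merger type are illustrative assumptions only, not the actual scanner API.

import java.util.function.BinaryOperator;

public class ScannerSketch {
  private final boolean useScanV2;
  private final BinaryOperator<String> recordMerger;

  private ScannerSketch(Builder b) {
    this.useScanV2 = b.useScanV2;
    this.recordMerger = b.recordMerger;
  }

  static class Builder {
    private boolean useScanV2;
    private BinaryOperator<String> recordMerger = (older, newer) -> newer; // default: last write wins

    Builder withUseScanV2(boolean useScanV2) { this.useScanV2 = useScanV2; return this; }

    // The new knob: thread the configured record merger into the scanner.
    Builder withRecordMerger(BinaryOperator<String> merger) { this.recordMerger = merger; return this; }

    ScannerSketch build() { return new ScannerSketch(this); }
  }

  public static void main(String[] args) {
    ScannerSketch scanner = new ScannerSketch.Builder()
        .withUseScanV2(true)
        .withRecordMerger((older, newer) -> newer) // stand-in for config.getRecordMerger()
        .build();
    System.out.println("scanner built, scanV2=" + scanner.useScanV2);
  }
}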
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
index a0e5ec22f9f5f..c26e2a9ec516e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
@@ -25,7 +25,6 @@
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -46,7 +45,7 @@
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
@SuppressWarnings("checkstyle:LineLength")
-public class RunCompactionActionExecutor extends
+public class RunCompactionActionExecutor extends
BaseActionExecutor>, HoodieData, HoodieData, HoodieWriteMetadata>> {
private final HoodieCompactor compactor;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
index 7b4fa0fa4cf1e..328ed7d922186 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
@@ -21,7 +21,6 @@
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -52,7 +51,7 @@
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
-public class ScheduleCompactionActionExecutor extends BaseActionExecutor> {
+public class ScheduleCompactionActionExecutor extends BaseActionExecutor> {
private static final Logger LOG = LogManager.getLogger(ScheduleCompactionActionExecutor.class);
private WriteOperationType operationType;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
index 7357054b4e233..e2ed1a06ac9a0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
@@ -92,6 +92,7 @@ private boolean isFileSliceEligibleForLogCompaction(FileSlice fileSlice, String
.withLatestInstantTime(maxInstantTime)
.withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
.withUseScanV2(true)
+ .withRecordMerger(writeConfig.getRecordMerger())
.build();
scanner.scanInternal(Option.empty(), true);
int totalBlocks = scanner.getCurrentInstantLogBlocks().size();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
index 0d43df70c7401..2fcbfb2b2e5b6 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
@@ -28,7 +28,6 @@
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -82,7 +81,7 @@
* Reads the index plan and executes the plan.
* It also reconciles updates on data timeline while indexing was in progress.
*/
-public class RunIndexActionExecutor extends BaseActionExecutor> {
+public class RunIndexActionExecutor extends BaseActionExecutor> {
private static final Logger LOG = LogManager.getLogger(RunIndexActionExecutor.class);
private static final Integer INDEX_COMMIT_METADATA_VERSION_1 = 1;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java
index d562dec671d14..91587b8bd5606 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java
@@ -23,7 +23,6 @@
import org.apache.hudi.avro.model.HoodieIndexPlan;
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
@@ -60,7 +59,7 @@
* 3. Initialize file groups for the enabled partition types within a transaction.
*
*/
-public class ScheduleIndexActionExecutor extends BaseActionExecutor> {
+public class ScheduleIndexActionExecutor