diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
index fcb273f0a73bd..b253e26416055 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
@@ -24,7 +24,9 @@
import org.apache.hudi.cli.HoodiePrintHelper;
import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
@@ -81,7 +83,7 @@ public String showArchivedCommits(
// read the avro blocks
while (reader.hasNext()) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
- blk.getRecordIterator().forEachRemaining(readRecords::add);
+ blk.getRecordIterator(HoodieAvroIndexedRecord::new).forEachRemaining(r -> readRecords.add((IndexedRecord) r.getData()));
}
List readCommits = readRecords.stream().map(r -> (GenericRecord) r)
.filter(r -> r.get("actionType").toString().equals(HoodieTimeline.COMMIT_ACTION)
@@ -155,8 +157,8 @@ public String showCommits(
// read the avro blocks
while (reader.hasNext()) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
- try (ClosableIterator<IndexedRecord> recordItr = blk.getRecordIterator()) {
- recordItr.forEachRemaining(readRecords::add);
+ try (ClosableIterator<HoodieRecord> recordItr = blk.getRecordIterator(HoodieAvroIndexedRecord::new)) {
+ recordItr.forEachRemaining(r -> readRecords.add((IndexedRecord) r.getData()));
}
}
List readCommits = readRecords.stream().map(r -> (GenericRecord) r)
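
Side note on the API change in this file (illustrative sketch, not part of the patch): HoodieAvroDataBlock#getRecordIterator now takes a record constructor and yields HoodieRecord wrappers, so callers that previously collected raw Avro IndexedRecords unwrap them via getData(). A minimal hypothetical helper mirroring the hunks above:

// Collect the Avro payloads of an archived data block (hypothetical helper).
private static List<IndexedRecord> readArchivedRecords(HoodieAvroDataBlock blk) {
  List<IndexedRecord> readRecords = new ArrayList<>();
  // The iterator now yields HoodieRecord wrappers; unwrap each back to Avro.
  blk.getRecordIterator(HoodieAvroIndexedRecord::new)
      .forEachRemaining(r -> readRecords.add((IndexedRecord) r.getData()));
  return readRecords;
}
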
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java
index fa6e15b7af696..85734f923742b 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java
@@ -25,7 +25,9 @@
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
@@ -124,9 +126,9 @@ private int copyArchivedInstants(List statuses, Set actionSe
// read the avro blocks
while (reader.hasNext() && copyCount < limit) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
- try (ClosableIterator<IndexedRecord> recordItr = blk.getRecordIterator()) {
+ try (ClosableIterator<HoodieRecord> recordItr = blk.getRecordIterator(HoodieAvroIndexedRecord::new)) {
while (recordItr.hasNext()) {
- IndexedRecord ir = recordItr.next();
+ IndexedRecord ir = (IndexedRecord) recordItr.next().getData();
// Archived instants are saved as arvo encoded HoodieArchivedMetaEntry records. We need to get the
// metadata record from the entry and convert it to json.
HoodieArchivedMetaEntry archiveEntryRecord = (HoodieArchivedMetaEntry) SpecificData.get()
@@ -157,7 +159,7 @@ private int copyArchivedInstants(List statuses, Set actionSe
default:
throw new HoodieException("Unknown type of action " + action);
}
-
+
final String instantTime = archiveEntryRecord.get("commitTime").toString();
final String outPath = localFolder + Path.SEPARATOR + instantTime + "." + action;
writeToFile(outPath, HoodieAvroUtils.avroToJson(metadata, true));
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
index 49cc25b895730..17d413fb1f8f8 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
@@ -24,9 +24,9 @@
import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieLogFormat;
@@ -61,6 +61,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@@ -124,7 +125,7 @@ public String showLogFileCommits(
instantTime = "dummy_instant_time_" + dummyInstantTimeCount;
}
if (n instanceof HoodieDataBlock) {
- try (ClosableIterator<IndexedRecord> recordItr = ((HoodieDataBlock) n).getRecordIterator()) {
+ try (ClosableIterator<HoodieRecord> recordItr = ((HoodieDataBlock) n).getRecordIterator(HoodieAvroIndexedRecord::new)) {
recordItr.forEachRemaining(r -> recordCount.incrementAndGet());
}
}
@@ -221,8 +222,8 @@ public String showLogFileRecords(
.withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
.build();
- for (HoodieRecord<? extends HoodieRecordPayload> hoodieRecord : scanner) {
- Option<IndexedRecord> record = hoodieRecord.getData().getInsertValue(readerSchema);
+ for (HoodieRecord hoodieRecord : scanner) {
+ Option<IndexedRecord> record = hoodieRecord.toIndexedRecord(readerSchema, new Properties());
if (allRecords.size() < limit) {
allRecords.add(record.get());
}
@@ -238,10 +239,10 @@ public String showLogFileRecords(
HoodieLogBlock n = reader.next();
if (n instanceof HoodieDataBlock) {
HoodieDataBlock blk = (HoodieDataBlock) n;
- try (ClosableIterator<IndexedRecord> recordItr = blk.getRecordIterator()) {
+ try (ClosableIterator<HoodieRecord> recordItr = blk.getRecordIterator(HoodieAvroIndexedRecord::new)) {
recordItr.forEachRemaining(record -> {
if (allRecords.size() < limit) {
- allRecords.add(record);
+ allRecords.add((IndexedRecord) record.getData());
}
});
}
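
For readers following the payload-to-record migration in this command: materializing Avro now goes through the record itself instead of its payload. A hedged sketch of the substitution (names taken from the hunks above; not part of the patch):

// Old payload-level call (removed above):
//   Option<IndexedRecord> rec = hoodieRecord.getData().getInsertValue(readerSchema);
// New record-level call used by the CLI:
private static Option<IndexedRecord> toAvro(HoodieRecord hoodieRecord, Schema readerSchema) throws IOException {
  return hoodieRecord.toIndexedRecord(readerSchema, new Properties());
}
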
diff --git a/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala b/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala
index fbfc1d8ec902e..3462f6db55cc9 100644
--- a/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala
+++ b/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala
@@ -18,12 +18,10 @@
package org.apache.hudi.cli
import org.apache.avro.Schema
-import org.apache.avro.generic.IndexedRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.avro.HoodieAvroWriteSupport
import org.apache.hudi.client.SparkTaskContextSupplier
-import org.apache.hudi.common.HoodieJsonPayload
import org.apache.hudi.common.bloom.{BloomFilter, BloomFilterFactory}
import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
import org.apache.hudi.common.util.BaseFileUtils
@@ -50,7 +48,7 @@ object SparkHelpers {
// Add current classLoad for config, if not will throw classNotFound of 'HoodieWrapperFileSystem'.
parquetConfig.getHadoopConf().setClassLoader(Thread.currentThread.getContextClassLoader)
- val writer = new HoodieAvroParquetWriter[IndexedRecord](destinationFile, parquetConfig, instantTime, new SparkTaskContextSupplier(), true)
+ val writer = new HoodieAvroParquetWriter(destinationFile, parquetConfig, instantTime, new SparkTaskContextSupplier(), true)
for (rec <- sourceRecords) {
val key: String = rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString
if (!keysToSkip.contains(key)) {
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
index ee7fbda11b783..bc8f395e2c7ab 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
@@ -27,9 +27,10 @@
import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
@@ -104,7 +105,7 @@ public void init() throws IOException, InterruptedException, URISyntaxException
.withFileId("test-log-fileid1").overBaseCommit("100").withFs(fs).build()) {
// write data to file
- List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
+ List<HoodieRecord> records = SchemaTestUtil.generateTestRecords(0, 100).stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList());
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, INSTANT_TIME);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
@@ -184,7 +185,7 @@ public void testShowLogFileRecordsWithMerge() throws IOException, InterruptedExc
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
.withFileId("test-log-fileid1").overBaseCommit(INSTANT_TIME).withFs(fs).withSizeThreshold(500).build();
- List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+ List<HoodieRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100).stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList());
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, INSTANT_TIME);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
@@ -223,12 +224,12 @@ public void testShowLogFileRecordsWithMerge() throws IOException, InterruptedExc
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
.build();
- Iterator<HoodieRecord<? extends HoodieRecordPayload>> records = scanner.iterator();
+ Iterator<HoodieRecord> records = scanner.iterator();
int num = 0;
int maxSize = 10;
List<IndexedRecord> indexRecords = new ArrayList<>();
while (records.hasNext() && num < maxSize) {
- Option<IndexedRecord> hoodieRecord = records.next().getData().getInsertValue(schema);
+ Option<IndexedRecord> hoodieRecord = ((HoodieAvroRecord)records.next()).getData().getInsertValue(schema);
indexRecords.add(hoodieRecord.get());
num++;
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseClusterer.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseClusterer.java
index 648ce805b0825..a3f552e640ffc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseClusterer.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseClusterer.java
@@ -19,7 +19,6 @@
package org.apache.hudi.client;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import java.io.IOException;
@@ -28,7 +27,7 @@
/**
* Client will run one round of clustering.
*/
-public abstract class BaseClusterer<T extends HoodieRecordPayload, I, K, O> implements Serializable {
+public abstract class BaseClusterer<T, I, K, O> implements Serializable {
private static final long serialVersionUID = 1L;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseCompactor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseCompactor.java
index 88737dbcf1d7e..ba4d3f77fd9e5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseCompactor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseCompactor.java
@@ -18,7 +18,6 @@
package org.apache.hudi.client;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import java.io.IOException;
@@ -27,7 +26,7 @@
/**
* Run one round of compaction.
*/
-public abstract class BaseCompactor<T extends HoodieRecordPayload, I, K, O> implements Serializable {
+public abstract class BaseCompactor<T, I, K, O> implements Serializable {
private static final long serialVersionUID = 1L;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 5c485bed0581d..662c3108b7c43 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -43,7 +43,6 @@
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.TableServiceType;
import org.apache.hudi.common.model.WriteOperationType;
@@ -124,7 +123,7 @@
* @param <K> Type of keys
* @param <O> Type of outputs
*/
-public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K, O> extends BaseHoodieClient
+public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
implements RunsTableService {
protected static final String LOOKUP_STR = "lookup";
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
index c53554d8e04d2..91e2236f2f7fe 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
@@ -30,8 +30,10 @@
import org.apache.hudi.common.model.HoodieArchivedLogFile;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
@@ -339,7 +341,7 @@ public void mergeArchiveFiles(List compactCandidate) throws IOExcept
// Read the avro blocks
while (reader.hasNext()) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
- blk.getRecordIterator().forEachRemaining(records::add);
+ blk.getRecordIterator(HoodieAvroIndexedRecord::new).forEachRemaining(r -> records.add((IndexedRecord) r.getData()));
if (records.size() >= this.config.getCommitArchivalBatchSize()) {
writeToFile(wrapperSchema, records);
}
@@ -646,7 +648,8 @@ private void writeToFile(Schema wrapperSchema, List records) thro
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, wrapperSchema.toString());
final String keyField = table.getMetaClient().getTableConfig().getRecordKeyFieldProp();
- HoodieAvroDataBlock block = new HoodieAvroDataBlock(records, header, keyField);
+ List<HoodieRecord> indexRecords = records.stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList());
+ HoodieAvroDataBlock block = new HoodieAvroDataBlock(indexRecords, header, keyField);
writer.appendBlock(block);
records.clear();
}
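
The archiver hunks above show both directions of the wrapping: reads unwrap HoodieRecord back to Avro, while writes wrap plain IndexedRecords into HoodieAvroIndexedRecord before building a data block. A compact, illustrative sketch of the write side (header/keyField as in writeToFile above; not part of the patch):

// Build an archived-log data block from raw Avro records (hypothetical helper).
private static HoodieAvroDataBlock toDataBlock(List<IndexedRecord> records,
    Map<HoodieLogBlock.HeaderMetadataType, String> header, String keyField) {
  List<HoodieRecord> indexRecords = records.stream()
      .map(HoodieAvroIndexedRecord::new) // wrap each Avro record
      .collect(Collectors.toList());
  return new HoodieAvroDataBlock(indexRecords, header, keyField);
}
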
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MergingIterator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MergingIterator.java
index 47dde723e00dd..f84f1c00d93b3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MergingIterator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MergingIterator.java
@@ -18,20 +18,18 @@
package org.apache.hudi.client.utils;
-import java.util.Iterator;
-import java.util.function.Function;
-import org.apache.avro.generic.GenericRecord;
-
import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.common.util.collection.Pair;
-public class MergingIterator<T extends GenericRecord> implements Iterator<T> {
+import java.util.Iterator;
+import java.util.function.BiFunction;
+
+public class MergingIterator<T> implements Iterator<T> {
private final Iterator<T> leftIterator;
private final Iterator<T> rightIterator;
- private final Function<Pair<T, T>, T> mergeFunction;
+ private final BiFunction<T, T, T> mergeFunction;
- public MergingIterator(Iterator<T> leftIterator, Iterator<T> rightIterator, Function<Pair<T, T>, T> mergeFunction) {
+ public MergingIterator(Iterator<T> leftIterator, Iterator<T> rightIterator, BiFunction<T, T, T> mergeFunction) {
this.leftIterator = leftIterator;
this.rightIterator = rightIterator;
this.mergeFunction = mergeFunction;
@@ -47,6 +45,6 @@ public boolean hasNext() {
@Override
public T next() {
- return mergeFunction.apply(Pair.of(leftIterator.next(), rightIterator.next()));
+ return mergeFunction.apply(leftIterator.next(), rightIterator.next());
}
}
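
Usage sketch for the reworked iterator (assumes the single type parameter T reconstructed above; illustrative only, not part of the patch). The merge function now receives the two sides directly instead of a Pair:

Iterator<String> left = Arrays.asList("a", "b").iterator();
Iterator<String> right = Arrays.asList("1", "2").iterator();
// Both iterators are expected to advance in lock-step.
MergingIterator<String> merged = new MergingIterator<>(left, right, (l, r) -> l + r);
merged.forEachRemaining(System.out::println); // prints "a1" then "b2"
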
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java
index a042255cdcb1a..52e411108f3d2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java
@@ -20,13 +20,12 @@
package org.apache.hudi.common.table.log;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SpillableMapUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
@@ -38,28 +37,28 @@
/**
* Reads records from base file and merges any updates from log files and provides iterable over all records in the file slice.
*/
-public class HoodieFileSliceReader implements Iterator> {
+public class HoodieFileSliceReader implements Iterator> {
private final Iterator> recordsIterator;
public static HoodieFileSliceReader getFileSliceReader(
- Option<HoodieFileReader> baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, String payloadClass,
+ Option<HoodieAvroFileReader> baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, String payloadClass,
String preCombineField, Option> simpleKeyGenFieldsOpt) throws IOException {
if (baseFileReader.isPresent()) {
Iterator baseIterator = baseFileReader.get().getRecordIterator(schema);
while (baseIterator.hasNext()) {
GenericRecord record = (GenericRecord) baseIterator.next();
- HoodieRecord<? extends HoodieRecordPayload> hoodieRecord = transform(
+ HoodieRecord hoodieRecord = transform(
record, scanner, payloadClass, preCombineField, simpleKeyGenFieldsOpt);
scanner.processNextRecord(hoodieRecord);
}
return new HoodieFileSliceReader(scanner.iterator());
} else {
- Iterable<HoodieRecord<? extends HoodieRecordPayload>> iterable = () -> scanner.iterator();
+ Iterable<HoodieRecord> iterable = () -> scanner.iterator();
HoodiePayloadConfig payloadConfig = HoodiePayloadConfig.newBuilder().withPayloadOrderingField(preCombineField).build();
return new HoodieFileSliceReader(StreamSupport.stream(iterable.spliterator(), false)
.map(e -> {
try {
- GenericRecord record = (GenericRecord) e.getData().getInsertValue(schema, payloadConfig.getProps()).get();
+ GenericRecord record = (GenericRecord) e.toIndexedRecord(schema, payloadConfig.getProps()).get();
return transform(record, scanner, payloadClass, preCombineField, simpleKeyGenFieldsOpt);
} catch (IOException io) {
throw new HoodieIOException("Error while creating reader for file slice with no base file.", io);
@@ -68,9 +67,11 @@ public static HoodieFileSliceReader getFileSliceReader(
}
}
- private static HoodieRecord<? extends HoodieRecordPayload> transform(
- GenericRecord record, HoodieMergedLogRecordScanner scanner, String payloadClass,
- String preCombineField, Option<Pair<String, String>> simpleKeyGenFieldsOpt) {
+ private static HoodieRecord transform(GenericRecord record,
+ HoodieMergedLogRecordScanner scanner,
+ String payloadClass,
+ String preCombineField,
+ Option<Pair<String, String>> simpleKeyGenFieldsOpt) {
return simpleKeyGenFieldsOpt.isPresent()
? SpillableMapUtils.convertToHoodieRecordPayload(record,
payloadClass, preCombineField, simpleKeyGenFieldsOpt.get(), scanner.isWithOperationField(), Option.empty())
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java
index 5e1f832b7f239..3a8a1ee007acc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java
@@ -19,9 +19,9 @@
package org.apache.hudi.execution;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.execution.HoodieLazyInsertIterable.HoodieInsertValueGenResult;
@@ -37,7 +37,7 @@
/**
* Consumes stream of hoodie records from in-memory queue and writes to one or more create-handles.
*/
-public class CopyOnWriteInsertHandler<T extends HoodieRecordPayload>
+public class CopyOnWriteInsertHandler<T>
extends BoundedInMemoryQueueConsumer<HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> {
private HoodieWriteConfig config;
@@ -68,9 +68,9 @@ public CopyOnWriteInsertHandler(HoodieWriteConfig config, String instantTime,
}
@Override
- public void consumeOneRecord(HoodieInsertValueGenResult<HoodieRecord> payload) {
- final HoodieRecord insertPayload = payload.record;
- String partitionPath = insertPayload.getPartitionPath();
+ public void consumeOneRecord(HoodieInsertValueGenResult<HoodieRecord> genResult) {
+ final HoodieRecord record = genResult.getResult();
+ String partitionPath = record.getPartitionPath();
HoodieWriteHandle<?,?,?,?> handle = handles.get(partitionPath);
if (handle == null) {
// If the records are sorted, this means that we encounter a new partition path
@@ -81,19 +81,19 @@ public void consumeOneRecord(HoodieInsertValueGenResult payload) {
}
// Lazily initialize the handle, for the first time
handle = writeHandleFactory.create(config, instantTime, hoodieTable,
- insertPayload.getPartitionPath(), idPrefix, taskContextSupplier);
+ record.getPartitionPath(), idPrefix, taskContextSupplier);
handles.put(partitionPath, handle);
}
- if (!handle.canWrite(payload.record)) {
+ if (!handle.canWrite(genResult.getResult())) {
// Handle is full. Close the handle and add the WriteStatus
statuses.addAll(handle.close());
// Open new handle
handle = writeHandleFactory.create(config, instantTime, hoodieTable,
- insertPayload.getPartitionPath(), idPrefix, taskContextSupplier);
+ record.getPartitionPath(), idPrefix, taskContextSupplier);
handles.put(partitionPath, handle);
}
- handle.write(insertPayload, payload.insertValue, payload.exception);
+ handle.write(record, genResult.schema, new TypedProperties(genResult.props));
}
@Override
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java
index b078076b864f5..0ffc23d32e730 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java
@@ -18,21 +18,17 @@
package org.apache.hudi.execution;
+import org.apache.avro.Schema;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.LazyIterableIterator;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.CollectionUtils;
-import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.CreateHandleFactory;
import org.apache.hudi.io.WriteHandleFactory;
import org.apache.hudi.table.HoodieTable;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.IndexedRecord;
-
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
@@ -41,7 +37,7 @@
/**
* Lazy Iterable, that writes a stream of HoodieRecords sorted by the partitionPath, into new files.
*/
-public abstract class HoodieLazyInsertIterable<T extends HoodieRecordPayload>
+public abstract class HoodieLazyInsertIterable<T>
extends LazyIterableIterator<HoodieRecord<T>, List<WriteStatus>> {
protected final HoodieWriteConfig hoodieConfig;
@@ -78,19 +74,19 @@ public HoodieLazyInsertIterable(Iterator> recordItr, boolean are
}
// Used for caching HoodieRecord along with insertValue. We need this to offload computation work to buffering thread.
- public static class HoodieInsertValueGenResult<T extends HoodieRecord> {
- public T record;
- public Option<IndexedRecord> insertValue;
- // It caches the exception seen while fetching insert value.
- public Option<Exception> exception = Option.empty();
+ public static class HoodieInsertValueGenResult<R extends HoodieRecord> {
+ private final R record;
+ public final Schema schema;
+ public final Properties props;
- public HoodieInsertValueGenResult(T record, Schema schema, Properties properties) {
+ public HoodieInsertValueGenResult(R record, Schema schema, Properties properties) {
this.record = record;
- try {
- this.insertValue = ((HoodieRecordPayload) record.getData()).getInsertValue(schema, properties);
- } catch (Exception e) {
- this.exception = Option.of(e);
- }
+ this.schema = schema;
+ this.props = properties;
+ }
+
+ public R getResult() {
+ return record;
}
}
@@ -98,12 +94,12 @@ public HoodieInsertValueGenResult(T record, Schema schema, Properties properties
* Transformer function to help transform a HoodieRecord. This transformer is used by BufferedIterator to offload some
* expensive operations of transformation to the reader thread.
*/
- static Function, HoodieInsertValueGenResult> getTransformFunction(
+ static Function, HoodieInsertValueGenResult> getTransformFunction(
Schema schema, HoodieWriteConfig config) {
return hoodieRecord -> new HoodieInsertValueGenResult(hoodieRecord, schema, config.getProps());
}
- static Function, HoodieInsertValueGenResult> getTransformFunction(
+ static Function, HoodieInsertValueGenResult> getTransformFunction(
Schema schema) {
return hoodieRecord -> new HoodieInsertValueGenResult(hoodieRecord, schema, CollectionUtils.EMPTY_PROPERTIES);
}
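
HoodieInsertValueGenResult no longer eagerly computes the Avro insert value (and no longer caches an exception); it only carries the record plus the schema and props, and the consumer performs the write. A hedged sketch of both halves, with record/schema/config/handle standing in for the surrounding variables:

// Producer side (buffering thread): capture record + schema + props only.
HoodieInsertValueGenResult<HoodieRecord> buffered =
    new HoodieInsertValueGenResult<>(record, schema, config.getProps());
// Consumer side (see CopyOnWriteInsertHandler above): hand everything to the handle.
handle.write(buffered.getResult(), buffered.schema, new TypedProperties(buffered.props));
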
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index 9b3dc8df0098a..e5f8d901c853e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -31,7 +31,7 @@
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIndexException;
-import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
@@ -132,7 +132,7 @@ public static List filterKeysFromFile(Path filePath, List candid
// Load all rowKeys from the file, to double-confirm
if (!candidateRecordKeys.isEmpty()) {
HoodieTimer timer = new HoodieTimer().startTimer();
- HoodieFileReader fileReader = HoodieFileReaderFactory.getFileReader(configuration, filePath);
+ HoodieAvroFileReader fileReader = HoodieFileReaderFactory.getFileReader(configuration, filePath);
Set fileRowKeys = fileReader.filterRowKeys(new TreeSet<>(candidateRecordKeys));
foundRecordKeys.addAll(fileRowKeys);
LOG.info(String.format("Checked keys against file %s, in %d ms. #candidates (%d) #found (%d)", filePath,
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/AppendHandleFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/AppendHandleFactory.java
index b4c83c141b2bc..fdd232b55a0aa 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/AppendHandleFactory.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/AppendHandleFactory.java
@@ -19,11 +19,10 @@
package org.apache.hudi.io;
import org.apache.hudi.common.engine.TaskContextSupplier;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
-public class AppendHandleFactory<T extends HoodieRecordPayload, I, K, O> extends WriteHandleFactory<T, I, K, O> {
+public class AppendHandleFactory<T, I, K, O> extends WriteHandleFactory<T, I, K, O> {
@Override
public HoodieAppendHandle create(final HoodieWriteConfig hoodieConfig, final String commitTime,
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/CreateHandleFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/CreateHandleFactory.java
index 09131b421f402..8dc19816fd133 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/CreateHandleFactory.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/CreateHandleFactory.java
@@ -19,13 +19,12 @@
package org.apache.hudi.io;
import org.apache.hudi.common.engine.TaskContextSupplier;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import java.io.Serializable;
-public class CreateHandleFactory<T extends HoodieRecordPayload, I, K, O> extends WriteHandleFactory<T, I, K, O> implements Serializable {
+public class CreateHandleFactory<T, I, K, O> extends WriteHandleFactory<T, I, K, O> implements Serializable {
private boolean preserveMetadata = false;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 426e20f83b034..599492265d840 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -19,11 +19,10 @@
package org.apache.hudi.io;
import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
-import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BaseFile;
@@ -67,6 +66,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -79,7 +79,7 @@
/**
* IO Operation to append data onto an existing file.
*/
-public class HoodieAppendHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieWriteHandle<T, I, K, O> {
+public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O> {
private static final Logger LOG = LogManager.getLogger(HoodieAppendHandle.class);
// This acts as the sequenceID for records written
@@ -87,7 +87,7 @@ public class HoodieAppendHandle extends
protected final String fileId;
// Buffer for holding records in memory before they are flushed to disk
- private final List<IndexedRecord> recordList = new ArrayList<>();
+ private final List<HoodieRecord> recordList = new ArrayList<>();
// Buffer for holding records (to be deleted) in memory before they are flushed to disk
private final List recordsToDelete = new ArrayList<>();
// Incoming records to be written to logs.
@@ -126,7 +126,7 @@ public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTa
super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier);
this.fileId = fileId;
this.recordItr = recordItr;
- sizeEstimator = new DefaultSizeEstimator();
+ this.sizeEstimator = new DefaultSizeEstimator();
this.statuses = new ArrayList<>();
this.recordProperties.putAll(config.getProps());
}
@@ -201,8 +201,8 @@ protected boolean isUpdateRecord(HoodieRecord hoodieRecord) {
return hoodieRecord.getCurrentLocation() != null;
}
- private Option getIndexedRecord(HoodieRecord hoodieRecord) {
- Option
*/
@NotThreadSafe
-public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieWriteHandle<T, I, K, O> {
+public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O> {
private static final Logger LOG = LogManager.getLogger(HoodieMergeHandle.class);
protected Map<String, HoodieRecord<T>> keyToNewRecords;
protected Set writtenRecordKeys;
- protected HoodieFileWriter<IndexedRecord> fileWriter;
+ protected HoodieFileWriter fileWriter;
private boolean preserveMetadata = false;
protected Path newFilePath;
@@ -257,12 +255,12 @@ protected void init(String fileId, Iterator> newRecordsItr) {
+ ((ExternalSpillableMap) keyToNewRecords).getSizeOfFileOnDiskInBytes());
}
- private boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, GenericRecord oldRecord, Option<IndexedRecord> indexedRecord) {
+ private boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, HoodieRecord oldRecord, Option<HoodieRecord> combineRecordOp) throws IOException {
boolean isDelete = false;
- if (indexedRecord.isPresent()) {
+ Schema schema = useWriterSchemaForCompaction ? tableSchemaWithMetaFields : tableSchema;
+ if (combineRecordOp.isPresent()) {
updatedRecordsWritten++;
- GenericRecord record = (GenericRecord) indexedRecord.get();
- if (oldRecord != record) {
+ if (oldRecord.getData() != combineRecordOp.get().getData()) {
// the incoming record is chosen
isDelete = HoodieOperation.isDelete(hoodieRecord.getOperation());
} else {
@@ -270,27 +268,26 @@ private boolean writeUpdateRecord(HoodieRecord hoodieRecord, GenericRecord ol
return false;
}
}
- return writeRecord(hoodieRecord, indexedRecord, isDelete);
+ return writeRecord(hoodieRecord, combineRecordOp, schema, config.getProps(), isDelete);
}
protected void writeInsertRecord(HoodieRecord hoodieRecord) throws IOException {
Schema schema = useWriterSchemaForCompaction ? tableSchemaWithMetaFields : tableSchema;
- Option<IndexedRecord> insertRecord = hoodieRecord.getData().getInsertValue(schema, config.getProps());
// just skip the ignored record
- if (insertRecord.isPresent() && insertRecord.get().equals(IGNORE_RECORD)) {
+ if (hoodieRecord.shouldIgnore(schema, config.getProps())) {
return;
}
- if (writeRecord(hoodieRecord, insertRecord, HoodieOperation.isDelete(hoodieRecord.getOperation()))) {
+ if (writeRecord(hoodieRecord, Option.of(hoodieRecord), schema, config.getProps(), HoodieOperation.isDelete(hoodieRecord.getOperation()))) {
insertRecordsWritten++;
}
}
- protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord) {
- return writeRecord(hoodieRecord, indexedRecord, false);
+ protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<HoodieRecord> combineRecord, Schema schema, Properties prop) throws IOException {
+ return writeRecord(hoodieRecord, combineRecord, schema, prop, false);
}
- protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord, boolean isDelete) {
- Option<Map<String, String>> recordMetadata = hoodieRecord.getData().getMetadata();
+ protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<HoodieRecord> combineRecord, Schema schema, Properties prop, boolean isDelete) throws IOException {
+ Option<Map<String, String>> recordMetadata = hoodieRecord.getMetadata();
if (!partitionPath.equals(hoodieRecord.getPartitionPath())) {
HoodieUpsertException failureEx = new HoodieUpsertException("mismatched partition path, record partition: "
+ hoodieRecord.getPartitionPath() + " but trying to insert into partition: " + partitionPath);
@@ -298,8 +295,8 @@ protected boolean writeRecord(HoodieRecord hoodieRecord, Option hoodieRecord, Option oldRecord) {
+ String key = oldRecord.getRecordKey(keyGeneratorOpt);
boolean copyOldRecord = true;
+ Schema schema = useWriterSchemaForCompaction ? tableSchemaWithMetaFields : tableSchema;
+ TypedProperties props = config.getPayloadConfig().getProps();
if (keyToNewRecords.containsKey(key)) {
// If we have duplicate records that we are updating, then the hoodie record will be deflated after
// writing the first record. So make a copy of the record to be merged
HoodieRecord hoodieRecord = keyToNewRecords.get(key).newInstance();
try {
- Option<IndexedRecord> combinedAvroRecord =
- hoodieRecord.getData().combineAndGetUpdateValue(oldRecord,
- useWriterSchemaForCompaction ? tableSchemaWithMetaFields : tableSchema,
- config.getPayloadConfig().getProps());
+ Option<HoodieRecord> combinedRecord =
+ hoodieRecord.combineAndGetUpdateValue(oldRecord,
+ schema,
+ props);
- if (combinedAvroRecord.isPresent() && combinedAvroRecord.get().equals(IGNORE_RECORD)) {
+ if (combinedRecord.isPresent() && combinedRecord.get().shouldIgnore(schema, props)) {
// If it is an IGNORE_RECORD, just copy the old record, and do not update the new record.
copyOldRecord = true;
- } else if (writeUpdateRecord(hoodieRecord, oldRecord, combinedAvroRecord)) {
+ } else if (writeUpdateRecord(hoodieRecord, oldRecord, combinedRecord)) {
/*
* ONLY WHEN 1) we have an update for this key AND 2) We are able to successfully
* write the combined new value
@@ -355,7 +354,7 @@ public void write(GenericRecord oldRecord) {
if (copyOldRecord) {
try {
// NOTE: We're enforcing preservation of the record metadata to keep existing semantic
- writeToFile(new HoodieKey(key, partitionPath), oldRecord, true);
+ writeToFile(new HoodieKey(key, partitionPath), oldRecord, schema, props, true);
} catch (IOException | RuntimeException e) {
String errMsg = String.format("Failed to merge old record into new file for key %s from old file %s to new file %s with writerSchema %s",
key, getOldFilePath(), newFilePath, writeSchemaWithMetaFields.toString(true));
@@ -366,13 +365,13 @@ public void write(GenericRecord oldRecord) {
}
}
- protected void writeToFile(HoodieKey key, GenericRecord avroRecord, boolean shouldPreserveRecordMetadata) throws IOException {
+ protected void writeToFile(HoodieKey key, HoodieRecord record, Schema schema, Properties prop, boolean shouldPreserveRecordMetadata) throws IOException {
if (shouldPreserveRecordMetadata) {
// NOTE: `FILENAME_METADATA_FIELD` has to be rewritten to correctly point to the
// file holding this record even in cases when overall metadata is preserved
- fileWriter.writeAvro(key.getRecordKey(), rewriteRecordWithMetadata(avroRecord, newFilePath.getName()));
+ fileWriter.write(key.getRecordKey(), record.rewriteRecordWithMetadata(schema, prop, schemaOnReadEnabled, writeSchemaWithMetaFields, newFilePath.getName()), writeSchemaWithMetaFields);
} else {
- fileWriter.writeAvroWithMetadata(key, rewriteRecord(avroRecord));
+ fileWriter.writeWithMetadata(key, record.rewriteRecord(schema, prop, schemaOnReadEnabled, writeSchemaWithMetaFields), writeSchemaWithMetaFields);
}
}
@@ -437,7 +436,7 @@ public void performMergeDataValidationCheck(WriteStatus writeStatus) {
long oldNumWrites = 0;
try {
- HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(hoodieTable.getHadoopConf(), oldFilePath);
+ HoodieAvroFileReader reader = HoodieFileReaderFactory.getFileReader(hoodieTable.getHadoopConf(), oldFilePath);
oldNumWrites = reader.getTotalRecords();
} catch (IOException e) {
throw new HoodieUpsertException("Failed to check for merge data validation", e);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieRangeInfoHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieRangeInfoHandle.java
index abe4a9befef9b..753d1db3010fb 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieRangeInfoHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieRangeInfoHandle.java
@@ -18,10 +18,9 @@
package org.apache.hudi.io;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
import org.apache.hudi.table.HoodieTable;
import java.io.IOException;
@@ -29,7 +28,7 @@
/**
* Extract range information for a given file slice.
*/
-public class HoodieRangeInfoHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieReadHandle<T, I, K, O> {
+public class HoodieRangeInfoHandle<T, I, K, O> extends HoodieReadHandle<T, I, K, O> {
public HoodieRangeInfoHandle(HoodieWriteConfig config, HoodieTable hoodieTable,
Pair partitionPathFilePair) {
@@ -37,7 +36,7 @@ public HoodieRangeInfoHandle(HoodieWriteConfig config, HoodieTable h
}
public String[] getMinMaxKeys() throws IOException {
- try (HoodieFileReader reader = createNewFileReader()) {
+ try (HoodieAvroFileReader reader = createNewFileReader()) {
return reader.readMinMaxRecordKeys();
}
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java
index fee75b22decd7..b699e9700c1f6 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java
@@ -21,11 +21,10 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.table.HoodieTable;
@@ -34,7 +33,7 @@
/**
* Base class for read operations done logically on the file group.
*/
-public abstract class HoodieReadHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieIOHandle<T, I, K, O> {
+public abstract class HoodieReadHandle<T, I, K, O> extends HoodieIOHandle<T, I, K, O> {
protected final Pair partitionPathFileIDPair;
@@ -62,7 +61,7 @@ protected HoodieBaseFile getLatestDataFile() {
.getLatestBaseFile(partitionPathFileIDPair.getLeft(), partitionPathFileIDPair.getRight()).get();
}
- protected HoodieFileReader createNewFileReader() throws IOException {
+ protected HoodieAvroFileReader createNewFileReader() throws IOException {
return HoodieFileReaderFactory.getFileReader(hoodieTable.getHadoopConf(),
new Path(getLatestDataFile().getPath()));
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
index 7dce31a4c349b..60c6a2da7fb1f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
@@ -22,16 +22,12 @@
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.keygen.BaseKeyGenerator;
-import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.table.HoodieTable;
-import org.apache.avro.generic.GenericRecord;
-
import javax.annotation.concurrent.NotThreadSafe;
import java.io.IOException;
@@ -48,7 +44,7 @@
* keys in newRecordKeys (sorted in-memory).
*/
@NotThreadSafe
-public class HoodieSortedMergeHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieMergeHandle<T, I, K, O> {
+public class HoodieSortedMergeHandle<T, I, K, O> extends HoodieMergeHandle<T, I, K, O> {
private final Queue<String> newRecordKeysSorted = new PriorityQueue<>();
@@ -75,8 +71,8 @@ public HoodieSortedMergeHandle(HoodieWriteConfig config, String instantTime, Hoo
* Go through an old record. Here if we detect a newer version shows up, we write the new one to the file.
*/
@Override
- public void write(GenericRecord oldRecord) {
- String key = KeyGenUtils.getRecordKeyFromGenericRecord(oldRecord, keyGeneratorOpt);
+ public void write(HoodieRecord oldRecord) {
+ String key = oldRecord.getRecordKey(keyGeneratorOpt);
// To maintain overall sorted order across updates and inserts, write any new inserts whose keys are less than
// the oldRecord's key.
@@ -94,9 +90,9 @@ public void write(GenericRecord oldRecord) {
}
try {
if (useWriterSchemaForCompaction) {
- writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps()));
+ writeRecord(hoodieRecord, Option.of(hoodieRecord), tableSchemaWithMetaFields, config.getProps());
} else {
- writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchema, config.getProps()));
+ writeRecord(hoodieRecord, Option.of(hoodieRecord), tableSchema, config.getProps());
}
insertRecordsWritten++;
writtenRecordKeys.add(keyToPreWrite);
@@ -117,9 +113,9 @@ public List close() {
HoodieRecord hoodieRecord = keyToNewRecords.get(key);
if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) {
if (useWriterSchemaForCompaction) {
- writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps()));
+ writeRecord(hoodieRecord, Option.of(hoodieRecord), tableSchemaWithMetaFields, config.getProps());
} else {
- writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchema, config.getProps()));
+ writeRecord(hoodieRecord, Option.of(hoodieRecord), tableSchema, config.getProps());
}
insertRecordsWritten++;
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieUnboundedCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieUnboundedCreateHandle.java
index ebbc7a5c28ea1..71a19816879ae 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieUnboundedCreateHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieUnboundedCreateHandle.java
@@ -20,7 +20,6 @@
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
@@ -36,7 +35,7 @@
* Please use this with caution. This can end up creating very large files if not used correctly.
*/
@NotThreadSafe
-public class HoodieUnboundedCreateHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieCreateHandle<T, I, K, O> {
+public class HoodieUnboundedCreateHandle<T, I, K, O> extends HoodieCreateHandle<T, I, K, O> {
private static final Logger LOG = LogManager.getLogger(HoodieUnboundedCreateHandle.class);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index b7fdbecfd56d1..09f8831f8b71d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -18,12 +18,15 @@
package org.apache.hudi.io;
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
@@ -34,48 +37,22 @@
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.marker.WriteMarkersFactory;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
-import java.util.HashMap;
import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
/**
* Base class for all write operations logically performed at the file group level.
*/
-public abstract class HoodieWriteHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieIOHandle<T, I, K, O> {
+public abstract class HoodieWriteHandle<T, I, K, O> extends HoodieIOHandle<T, I, K, O> {
private static final Logger LOG = LogManager.getLogger(HoodieWriteHandle.class);
- /**
- * A special record returned by {@link HoodieRecordPayload}, which means
- * {@link HoodieWriteHandle} should just skip this record.
- * This record is only used for {@link HoodieRecordPayload} currently, so it should not
- * shuffle though network, we can compare the record locally by the equal method.
- * The HoodieRecordPayload#combineAndGetUpdateValue and HoodieRecordPayload#getInsertValue
- * have 3 kind of return:
- * 1、Option.empty
- * This means we should delete this record.
- * 2、IGNORE_RECORD
- * This means we should not process this record,just skip.
- * 3、Other non-empty record
- * This means we should process this record.
- *
- * We can see the usage of IGNORE_RECORD in
- * org.apache.spark.sql.hudi.command.payload.ExpressionPayload
- */
- public static IgnoreRecord IGNORE_RECORD = new IgnoreRecord();
-
/**
* The specified schema of the table. ("specified" denotes that this is configured by the client,
* as opposed to being implicitly fetched out of the commit metadata)
@@ -208,35 +185,15 @@ boolean layoutControlsNumFiles() {
/**
* Perform the actual writing of the given record into the backing file.
*/
- public void write(HoodieRecord record, Option<IndexedRecord> insertValue) {
+ protected void doWrite(HoodieRecord record, Schema schema, TypedProperties props) {
// NO_OP
}
/**
* Perform the actual writing of the given record into the backing file.
*/
- public void write(HoodieRecord record, Option<IndexedRecord> avroRecord, Option<Exception> exception) {
- Option<Map<String, String>> recordMetadata = ((HoodieRecordPayload) record.getData()).getMetadata();
- if (exception.isPresent() && exception.get() instanceof Throwable) {
- // Not throwing exception from here, since we don't want to fail the entire job for a single record
- writeStatus.markFailure(record, exception.get(), recordMetadata);
- LOG.error("Error writing record " + record, exception.get());
- } else {
- write(record, avroRecord);
- }
- }
-
- /**
- * Rewrite the GenericRecord with the Schema containing the Hoodie Metadata fields.
- */
- protected GenericRecord rewriteRecord(GenericRecord record) {
- return schemaOnReadEnabled ? HoodieAvroUtils.rewriteRecordWithNewSchema(record, writeSchemaWithMetaFields, new HashMap<>())
- : HoodieAvroUtils.rewriteRecord(record, writeSchemaWithMetaFields);
- }
-
- protected GenericRecord rewriteRecordWithMetadata(GenericRecord record, String fileName) {
- return schemaOnReadEnabled ? HoodieAvroUtils.rewriteEvolutionRecordWithMetadata(record, writeSchemaWithMetaFields, fileName)
- : HoodieAvroUtils.rewriteRecordWithMetadata(record, writeSchemaWithMetaFields, fileName);
+ public void write(HoodieRecord record, Schema schema, TypedProperties props) {
+ doWrite(record, schema, props);
}
public abstract List close();
@@ -269,35 +226,7 @@ protected long getAttemptId() {
}
protected HoodieFileWriter createNewFileWriter(String instantTime, Path path, HoodieTable hoodieTable,
- HoodieWriteConfig config, Schema schema, TaskContextSupplier taskContextSupplier) throws IOException {
+ HoodieWriteConfig config, Schema schema, TaskContextSupplier taskContextSupplier) throws IOException {
return HoodieFileWriterFactory.getFileWriter(instantTime, path, hoodieTable, config, schema, taskContextSupplier);
}
-
- private static class IgnoreRecord implements GenericRecord {
-
- @Override
- public void put(int i, Object v) {
-
- }
-
- @Override
- public Object get(int i) {
- return null;
- }
-
- @Override
- public Schema getSchema() {
- return null;
- }
-
- @Override
- public void put(String key, Object v) {
-
- }
-
- @Override
- public Object get(String key) {
- return null;
- }
- }
}
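
With the payload-specific overloads, the Option<Exception> plumbing and the IGNORE_RECORD sentinel gone, the handle exposes a single write(record, schema, props) entry point that delegates to doWrite(). Caller-side sketch (writeHandle, hoodieRecord and writeSchema assumed from the surrounding code; illustrative only):

// One entry point after this change; ignore handling now lives on HoodieRecord#shouldIgnore,
// and per-record failures are reported by the concrete doWrite implementations.
writeHandle.write(hoodieRecord, writeSchema, new TypedProperties(config.getProps()));
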
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/SingleFileHandleCreateFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/SingleFileHandleCreateFactory.java
index a3f7c04ef23a8..fa5ce2c68bd88 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/SingleFileHandleCreateFactory.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/SingleFileHandleCreateFactory.java
@@ -19,7 +19,6 @@
package org.apache.hudi.io;
import org.apache.hudi.common.engine.TaskContextSupplier;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;
@@ -32,7 +31,7 @@
*
* Please use this with caution. This can end up creating very large files if not used correctly.
*/
-public class SingleFileHandleCreateFactory<T extends HoodieRecordPayload, I, K, O> extends CreateHandleFactory<T, I, K, O> implements Serializable {
+public class SingleFileHandleCreateFactory<T, I, K, O> extends CreateHandleFactory<T, I, K, O> implements Serializable {
private final AtomicBoolean isHandleCreated = new AtomicBoolean(false);
private final String fileId;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/WriteHandleFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/WriteHandleFactory.java
index c267b5969d801..46a0b1c614d1f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/WriteHandleFactory.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/WriteHandleFactory.java
@@ -20,13 +20,12 @@
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import java.io.Serializable;
-public abstract class WriteHandleFactory<T extends HoodieRecordPayload, I, K, O> implements Serializable {
+public abstract class WriteHandleFactory<T, I, K, O> implements Serializable {
private int numFilesWritten = 0;
public abstract HoodieWriteHandle create(HoodieWriteConfig config, String commitTime, HoodieTable hoodieTable,
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileWriter.java
similarity index 87%
rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java
rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileWriter.java
index f065608b29bd5..c143f782d4839 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileWriter.java
@@ -36,7 +36,6 @@
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
@@ -53,8 +52,8 @@
* 1. Records should be added in order of keys
* 2. There are no column stats
*/
-public class HoodieHFileWriter<T extends HoodieRecordPayload, R extends IndexedRecord>
- implements HoodieFileWriter<R> {
+public class HoodieAvroHFileWriter
+ implements HoodieAvroFileWriter {
private static AtomicLong recordIndex = new AtomicLong(1);
private final Path file;
@@ -73,8 +72,8 @@ public class HoodieHFileWriter
- implements HoodieFileWriter, Closeable {
+public class HoodieAvroOrcWriter implements HoodieAvroFileWriter, Closeable {
private static final AtomicLong RECORD_INDEX = new AtomicLong(1);
private final long maxFileSize;
@@ -68,8 +66,8 @@ public class HoodieOrcWriter
+public class HoodieAvroParquetWriter
extends HoodieBaseParquetWriter
- implements HoodieFileWriter {
+ implements HoodieAvroFileWriter {
private final String fileName;
private final String instantTime;
@@ -60,7 +60,7 @@ public HoodieAvroParquetWriter(Path file,
}
@Override
- public void writeAvroWithMetadata(HoodieKey key, R avroRecord) throws IOException {
+ public void writeAvroWithMetadata(HoodieKey key, IndexedRecord avroRecord) throws IOException {
if (populateMetaFields) {
prepRecordWithMetadata(key, avroRecord, instantTime,
taskContextSupplier.getPartitionIdSupplier().get(), getWrittenRecordCount(), fileName);
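The hunk above changes writeAvroWithMetadata to accept a plain IndexedRecord and to stamp the meta columns only when populateMetaFields is set. The following is a minimal, hedged sketch of that "prep metadata, then write" shape in plain Java; MetaStampingWriter and its delegate are hypothetical names, not Hudi classes, and the record is modeled as a simple Map.

import java.util.Map;
import java.util.function.Consumer;

public class MetaStampingWriter {
  private final boolean populateMetaFields;
  private final String instantTime;
  private final Consumer<Map<String, Object>> delegate; // stands in for the wrapped parquet writer
  private long writtenRecords = 0;

  public MetaStampingWriter(boolean populateMetaFields, String instantTime,
                            Consumer<Map<String, Object>> delegate) {
    this.populateMetaFields = populateMetaFields;
    this.instantTime = instantTime;
    this.delegate = delegate;
  }

  // Stamp Hudi-style meta columns only when meta fields are enabled, then delegate the write.
  public void write(String recordKey, Map<String, Object> record) {
    if (populateMetaFields) {
      record.put("_hoodie_commit_time", instantTime);
      record.put("_hoodie_record_key", recordKey);
      record.put("_hoodie_commit_seqno", instantTime + "_" + (writtenRecords++));
    }
    delegate.accept(record);
  }
}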
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
index ffdff25738ed7..29d4b44d42651 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
@@ -23,7 +23,6 @@
import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
@@ -46,7 +45,7 @@
public class HoodieFileWriterFactory {
- public static HoodieFileWriter getFileWriter(
+ public static HoodieFileWriter getFileWriter(
String instantTime, Path path, HoodieTable hoodieTable, HoodieWriteConfig config, Schema schema,
TaskContextSupplier taskContextSupplier) throws IOException {
final String extension = FSUtils.getFileExtension(path.getName());
@@ -64,14 +63,14 @@ public static
throw new UnsupportedOperationException(extension + " format not supported yet.");
}
- private static HoodieFileWriter newParquetFileWriter(
+ private static HoodieAvroFileWriter newParquetFileWriter(
String instantTime, Path path, HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable,
TaskContextSupplier taskContextSupplier, boolean populateMetaFields) throws IOException {
return newParquetFileWriter(instantTime, path, config, schema, hoodieTable.getHadoopConf(),
taskContextSupplier, populateMetaFields, populateMetaFields);
}
- private static HoodieFileWriter newParquetFileWriter(
+ private static HoodieAvroFileWriter newParquetFileWriter(
String instantTime, Path path, HoodieWriteConfig config, Schema schema, Configuration conf,
TaskContextSupplier taskContextSupplier, boolean populateMetaFields, boolean enableBloomFilter) throws IOException {
Option filter = enableBloomFilter ? Option.of(createBloomFilter(config)) : Option.empty();
@@ -81,34 +80,34 @@ private static HoodieFi
config.getParquetBlockSize(), config.getParquetPageSize(), config.getParquetMaxFileSize(),
conf, config.getParquetCompressionRatio(), config.parquetDictionaryEnabled());
- return new HoodieAvroParquetWriter<>(path, parquetConfig, instantTime, taskContextSupplier, populateMetaFields);
+ return new HoodieAvroParquetWriter(path, parquetConfig, instantTime, taskContextSupplier, populateMetaFields);
}
- static HoodieFileWriter newHFileFileWriter(
+ static HoodieAvroFileWriter newHFileFileWriter(
String instantTime, Path path, HoodieWriteConfig config, Schema schema, Configuration conf,
TaskContextSupplier taskContextSupplier) throws IOException {
BloomFilter filter = createBloomFilter(config);
HoodieHFileConfig hfileConfig = new HoodieHFileConfig(conf,
config.getHFileCompressionAlgorithm(), config.getHFileBlockSize(), config.getHFileMaxFileSize(),
- HoodieHFileReader.KEY_FIELD_NAME, PREFETCH_ON_OPEN, CACHE_DATA_IN_L1, DROP_BEHIND_CACHE_COMPACTION,
+ HoodieAvroHFileReader.KEY_FIELD_NAME, PREFETCH_ON_OPEN, CACHE_DATA_IN_L1, DROP_BEHIND_CACHE_COMPACTION,
filter, HFILE_COMPARATOR);
- return new HoodieHFileWriter<>(instantTime, path, hfileConfig, schema, taskContextSupplier, config.populateMetaFields());
+ return new HoodieAvroHFileWriter(instantTime, path, hfileConfig, schema, taskContextSupplier, config.populateMetaFields());
}
- private static HoodieFileWriter newOrcFileWriter(
+ private static HoodieAvroFileWriter newOrcFileWriter(
String instantTime, Path path, HoodieWriteConfig config, Schema schema, Configuration conf,
TaskContextSupplier taskContextSupplier) throws IOException {
BloomFilter filter = createBloomFilter(config);
HoodieOrcConfig orcConfig = new HoodieOrcConfig(conf, config.getOrcCompressionCodec(),
config.getOrcStripeSize(), config.getOrcBlockSize(), config.getOrcMaxFileSize(), filter);
- return new HoodieOrcWriter<>(instantTime, path, orcConfig, schema, taskContextSupplier);
+ return new HoodieAvroOrcWriter(instantTime, path, orcConfig, schema, taskContextSupplier);
}
private static BloomFilter createBloomFilter(HoodieWriteConfig config) {
return BloomFilterFactory.createBloomFilter(config.getBloomFilterNumEntries(), config.getBloomFilterFPP(),
- config.getDynamicBloomFilterMaxNumEntries(),
- config.getBloomFilterType());
+ config.getDynamicBloomFilterMaxNumEntries(),
+ config.getBloomFilterType());
}
}
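The factory above keys off the file extension and now hands back an Avro-oriented writer per format. A hedged sketch of that dispatch pattern, using hypothetical stub types rather than the real HoodieAvroFileWriter hierarchy:

import java.util.Locale;

interface AvroFileWriterSketch { }

public class FileWriterFactorySketch {
  static final class ParquetWriterStub implements AvroFileWriterSketch { }
  static final class HFileWriterStub implements AvroFileWriterSketch { }
  static final class OrcWriterStub implements AvroFileWriterSketch { }

  // Dispatch purely on the file extension, one concrete Avro writer per storage format.
  public static AvroFileWriterSketch getFileWriter(String fileName) {
    String ext = fileName.substring(fileName.lastIndexOf('.') + 1).toLowerCase(Locale.ROOT);
    switch (ext) {
      case "parquet": return new ParquetWriterStub();
      case "hfile":   return new HFileWriterStub();
      case "orc":     return new OrcWriterStub();
      default: throw new UnsupportedOperationException(ext + " format not supported yet.");
    }
  }
}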
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieCompactionHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieCompactionHandler.java
index eeb287abd543c..8d44c603c6cc2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieCompactionHandler.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieCompactionHandler.java
@@ -22,7 +22,6 @@
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import java.io.IOException;
import java.util.Iterator;
@@ -34,10 +33,10 @@
*
* @param <T> HoodieRecordPayload type.
*/
-public interface HoodieCompactionHandler {
+public interface HoodieCompactionHandler {
Iterator> handleUpdate(String instantTime, String partitionPath, String fileId,
Map> keyToNewRecords, HoodieBaseFile oldDataFile) throws IOException;
Iterator> handleInsert(String instantTime, String partitionPath, String fileId,
- Map> recordMap);
+ Map recordMap);
}
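For orientation, the compaction handler contract after this change looks roughly like the hedged sketch below: the payload bound on the interface is gone and handleInsert now takes an untyped record map. The type parameters here are placeholders, not the Hudi types.

import java.util.Iterator;
import java.util.List;
import java.util.Map;

// WriteStatusLike, BaseFileLike and RecordLike are placeholder type parameters for illustration only.
interface CompactionHandlerSketch<WriteStatusLike, BaseFileLike, RecordLike> {

  Iterator<List<WriteStatusLike>> handleUpdate(String instantTime, String partitionPath, String fileId,
                                               Map<String, RecordLike> keyToNewRecords, BaseFileLike oldDataFile);

  Iterator<List<WriteStatusLike>> handleInsert(String instantTime, String partitionPath, String fileId,
                                               Map<String, RecordLike> recordMap);
}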
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 56526d23db006..e3c40deba07c2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -48,7 +48,6 @@
import org.apache.hudi.common.fs.OptimisticConsistencyGuard;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -111,7 +110,7 @@
* @param <K> Type of keys
* @param <O> Type of outputs
*/
-public abstract class HoodieTable implements Serializable {
+public abstract class HoodieTable implements Serializable {
private static final Logger LOG = LogManager.getLogger(HoodieTable.class);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
index f893b4ccd5c4e..18e5e17a6be06 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
@@ -26,12 +26,11 @@
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
-public abstract class BaseActionExecutor implements Serializable {
+public abstract class BaseActionExecutor implements Serializable {
protected final transient HoodieEngineContext context;
protected final transient Configuration hadoopConf;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapRecordConsumer.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapRecordConsumer.java
index 8966a5d51c7cb..73ad53e4a6c16 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapRecordConsumer.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapRecordConsumer.java
@@ -18,14 +18,11 @@
package org.apache.hudi.table.action.bootstrap;
+import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
-import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.HoodieBootstrapHandle;
-import java.io.IOException;
-
/**
* Consumer that dequeues records from queue and sends to Merge Handle for writing.
*/
@@ -39,12 +36,7 @@ public BootstrapRecordConsumer(HoodieBootstrapHandle bootstrapHandle) {
@Override
protected void consumeOneRecord(HoodieRecord record) {
- try {
- bootstrapHandle.write(record, ((HoodieRecordPayload) record.getData())
- .getInsertValue(bootstrapHandle.getWriterSchemaWithMetaFields()));
- } catch (IOException e) {
- throw new HoodieIOException(e.getMessage(), e);
- }
+ bootstrapHandle.write(record, bootstrapHandle.getWriterSchemaWithMetaFields(), new TypedProperties());
}
@Override
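The consumer above no longer turns the payload into an Avro record itself; it simply forwards the record, writer schema, and properties to the handle. A minimal sketch of that forwarding-consumer shape in plain Java, with hypothetical names and a BlockingQueue standing in for Hudi's bounded in-memory queue:

import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.BiConsumer;

public class ForwardingConsumer<R> {
  private final BlockingQueue<R> queue;
  private final BiConsumer<R, Properties> writeHandle; // stands in for the bootstrap handle's write(...)
  private final Properties props = new Properties();

  public ForwardingConsumer(BlockingQueue<R> queue, BiConsumer<R, Properties> writeHandle) {
    this.queue = queue;
    this.writeHandle = writeHandle;
  }

  // Drain the queue and forward each record untouched; payload-to-Avro conversion now lives in the handle.
  public void consumeAll() {
    R record;
    while ((record = queue.poll()) != null) {
      writeHandle.accept(record, props);
    }
  }

  public static void main(String[] args) {
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(8);
    queue.add("record-1");
    queue.add("record-2");
    new ForwardingConsumer<String>(queue, (rec, p) -> System.out.println("write " + rec)).consumeAll();
  }
}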
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
index 30ed27b39b77a..c3d550d475058 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
@@ -28,7 +28,6 @@
import org.apache.hudi.common.HoodieCleanStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.CleanFileInfo;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.CleanerUtils;
@@ -56,7 +55,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
-public class CleanActionExecutor extends BaseActionExecutor {
+public class CleanActionExecutor extends BaseActionExecutor {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LogManager.getLogger(CleanActionExecutor.class);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
index d8e51bcd1643e..41815b02a7190 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
@@ -24,7 +24,6 @@
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.CleanFileInfo;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
@@ -45,7 +44,7 @@
import java.util.Map;
import java.util.stream.Collectors;
-public class CleanPlanActionExecutor extends BaseActionExecutor> {
+public class CleanPlanActionExecutor extends BaseActionExecutor> {
private static final Logger LOG = LogManager.getLogger(CleanPlanner.class);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 79eef43b3c00a..af8b4dcecd60a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -29,7 +29,6 @@
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieFileGroupId;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -71,7 +70,7 @@
*
* 2) It bounds the growth of the files in the file system
*/
-public class CleanPlanner implements Serializable {
+public class CleanPlanner implements Serializable {
private static final Logger LOG = LogManager.getLogger(CleanPlanner.class);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
index 15ead5efb0080..f0021a0125cb7 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
@@ -21,7 +21,6 @@
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -41,7 +40,7 @@
import java.util.Collections;
import java.util.Map;
-public class ClusteringPlanActionExecutor extends BaseActionExecutor> {
+public class ClusteringPlanActionExecutor extends BaseActionExecutor> {
private static final Logger LOG = LogManager.getLogger(ClusteringPlanActionExecutor.class);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java
index 163947fa34481..a4b09d006ee21 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java
@@ -20,7 +20,6 @@
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -34,7 +33,7 @@
/**
* Pluggable implementation for writing data into new file groups based on ClusteringPlan.
*/
-public abstract class ClusteringExecutionStrategy implements Serializable {
+public abstract class ClusteringExecutionStrategy implements Serializable {
private static final Logger LOG = LogManager.getLogger(ClusteringExecutionStrategy.class);
private final HoodieTable hoodieTable;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
index a96ff73947cdb..53f71b6ee8c26 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
@@ -25,7 +25,6 @@
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieFileGroupId;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
@@ -50,7 +49,7 @@
/**
* Pluggable implementation for scheduling clustering and creating ClusteringPlan.
*/
-public abstract class ClusteringPlanStrategy implements Serializable {
+public abstract class ClusteringPlanStrategy implements Serializable {
private static final Logger LOG = LogManager.getLogger(ClusteringPlanStrategy.class);
public static final int CLUSTERING_PLAN_VERSION_1 = 1;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
index 5d62ef390233f..00d0e3ba37795 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
@@ -24,7 +24,6 @@
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
@@ -44,7 +43,7 @@
/**
* Scheduling strategy with restriction that clustering groups can only contain files from same partition.
*/
-public abstract class PartitionAwareClusteringPlanStrategy extends ClusteringPlanStrategy {
+public abstract class PartitionAwareClusteringPlanStrategy extends ClusteringPlanStrategy {
private static final Logger LOG = LogManager.getLogger(PartitionAwareClusteringPlanStrategy.class);
public PartitionAwareClusteringPlanStrategy(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/UpdateStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/UpdateStrategy.java
index 4e33eb06038cd..e055b4477cc35 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/UpdateStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/UpdateStrategy.java
@@ -20,7 +20,6 @@
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieFileGroupId;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.collection.Pair;
import java.util.Set;
@@ -28,7 +27,7 @@
/**
* When file groups in clustering, write records to these file group need to check.
*/
-public abstract class UpdateStrategy {
+public abstract class UpdateStrategy {
protected final HoodieEngineContext engineContext;
protected Set fileGroupsInPendingClustering;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java
index 5355194ff75bf..5b7118c6d906b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java
@@ -18,7 +18,6 @@
package org.apache.hudi.table.action.commit;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.WriteHandleFactory;
@@ -26,7 +25,7 @@
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
-public abstract class BaseBulkInsertHelper {
+public abstract class BaseBulkInsertHelper {
/**
* Mark instant as inflight, write input records, update index and return result.
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
index 31c8bbd6d30d2..198b470fcc1cb 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
@@ -31,7 +31,6 @@
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -70,7 +69,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
-public abstract class BaseCommitActionExecutor
+public abstract class BaseCommitActionExecutor
extends BaseActionExecutor {
private static final Logger LOG = LogManager.getLogger(BaseCommitActionExecutor.class);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseDeleteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseDeleteHelper.java
index b119587f47535..ceeb2aeb70dee 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseDeleteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseDeleteHelper.java
@@ -19,7 +19,6 @@
package org.apache.hudi.table.action.commit;
import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -29,7 +28,7 @@
*
* @param <T>
*/
-public abstract class BaseDeleteHelper {
+public abstract class BaseDeleteHelper {
/**
* Deduplicate Hoodie records, using the given deduplication function.
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
index 5ead348140aa3..393ee9ddb698c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
@@ -18,12 +18,13 @@
package org.apache.hudi.table.action.commit;
-import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.utils.MergingIterator;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
@@ -40,6 +41,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import javax.annotation.Nonnull;
+
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Iterator;
@@ -47,7 +50,7 @@
/**
* Helper to read records from previous version of base file and run Merge.
*/
-public abstract class BaseMergeHelper {
+public abstract class BaseMergeHelper {
/**
* Read records from previous version of base file and merge.
@@ -57,7 +60,7 @@ public abstract class BaseMergeHelper {
*/
public abstract void runMerge(HoodieTable table, HoodieMergeHandle upsertHandle) throws IOException;
- protected GenericRecord transformRecordBasedOnNewSchema(GenericDatumReader gReader, GenericDatumWriter gWriter,
+ protected HoodieRecord transformRecordBasedOnNewSchema(GenericDatumReader gReader, GenericDatumWriter gWriter,
ThreadLocal encoderCache, ThreadLocal decoderCache,
GenericRecord gRec) {
ByteArrayOutputStream inStream = null;
@@ -71,7 +74,7 @@ protected GenericRecord transformRecordBasedOnNewSchema(GenericDatumReader getMergingIterator(HoodieTable table, HoodieMergeHandle mergeHandle,
- HoodieBaseFile baseFile, HoodieFileReader reader,
- Schema readSchema, boolean externalSchemaTransformation) throws IOException {
+ protected Iterator getMergingIterator(HoodieTable table,
+ HoodieMergeHandle mergeHandle,
+ HoodieBaseFile baseFile,
+ HoodieFileReader reader,
+ Schema readerSchema,
+ boolean externalSchemaTransformation) throws IOException {
Path externalFilePath = new Path(baseFile.getBootstrapBaseFile().get().getPath());
Configuration bootstrapFileConfig = new Configuration(table.getHadoopConf());
- HoodieFileReader bootstrapReader = HoodieFileReaderFactory.getFileReader(bootstrapFileConfig, externalFilePath);
+ HoodieFileReader bootstrapReader = HoodieFileReaderFactory.getFileReader(bootstrapFileConfig, externalFilePath);
+
Schema bootstrapReadSchema;
if (externalSchemaTransformation) {
bootstrapReadSchema = bootstrapReader.getSchema();
@@ -102,14 +108,25 @@ protected Iterator getMergingIterator(HoodieTable tab
bootstrapReadSchema = mergeHandle.getWriterSchema();
}
- return new MergingIterator<>(reader.getRecordIterator(readSchema), bootstrapReader.getRecordIterator(bootstrapReadSchema),
- (inputRecordPair) -> HoodieAvroUtils.stitchRecords(inputRecordPair.getLeft(), inputRecordPair.getRight(), mergeHandle.getWriterSchemaWithMetaFields()));
+ return new MergingIterator<>(
+ reader.getRecordIterator(readerSchema, HoodieAvroIndexedRecord::new),
+ bootstrapReader.getRecordIterator(bootstrapReadSchema, HoodieAvroIndexedRecord::new),
+ (oneRecord, otherRecord) -> mergeRecords(oneRecord, otherRecord, readerSchema, mergeHandle.getWriterSchemaWithMetaFields()));
+ }
+
+ @Nonnull
+ private static HoodieRecord mergeRecords(HoodieRecord one, HoodieRecord other, Schema readerSchema, Schema writerSchema) {
+ try {
+ return one.mergeWith(other, readerSchema, writerSchema);
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to merge records", e);
+ }
}
/**
* Consumer that dequeues records from queue and sends to Merge Handle.
*/
- protected static class UpdateHandler extends BoundedInMemoryQueueConsumer {
+ protected static class UpdateHandler extends BoundedInMemoryQueueConsumer {
private final HoodieMergeHandle upsertHandle;
@@ -118,12 +135,13 @@ protected UpdateHandler(HoodieMergeHandle upsertHandle) {
}
@Override
- protected void consumeOneRecord(GenericRecord record) {
+ protected void consumeOneRecord(HoodieRecord record) {
upsertHandle.write(record);
}
@Override
- protected void finish() {}
+ protected void finish() {
+ }
@Override
protected Void getResult() {
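getMergingIterator above zips the base-file reader with the bootstrap reader and merges each pair of records via the new mergeRecords helper. The following is a hedged, self-contained sketch of that zip-and-merge iterator pattern; MergingIteratorSketch is a hypothetical name and the BiFunction stands in for the record merge.

import java.util.Arrays;
import java.util.Iterator;
import java.util.function.BiFunction;

public class MergingIteratorSketch<L, R, O> implements Iterator<O> {
  private final Iterator<L> left;
  private final Iterator<R> right;
  private final BiFunction<L, R, O> merge;

  public MergingIteratorSketch(Iterator<L> left, Iterator<R> right, BiFunction<L, R, O> merge) {
    this.left = left;
    this.right = right;
    this.merge = merge;
  }

  @Override
  public boolean hasNext() {
    // Both sides describe the same rows, so they are expected to run out together.
    return left.hasNext() && right.hasNext();
  }

  @Override
  public O next() {
    return merge.apply(left.next(), right.next());
  }

  public static void main(String[] args) {
    Iterator<String> keys = Arrays.asList("k1", "k2").iterator();
    Iterator<Integer> vals = Arrays.asList(10, 20).iterator();
    new MergingIteratorSketch<>(keys, vals, (k, v) -> k + "=" + v)
        .forEachRemaining(System.out::println);
  }
}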
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
index 846afec7c1db3..d8682152fa93e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
@@ -19,7 +19,6 @@
package org.apache.hudi.table.action.commit;
import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.index.HoodieIndex;
@@ -30,7 +29,7 @@
import java.time.Duration;
import java.time.Instant;
-public abstract class BaseWriteHelper {
+public abstract class BaseWriteHelper {
public HoodieWriteMetadata write(String instantTime,
I inputRecords,
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieDeleteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieDeleteHelper.java
index fff52eb24d736..f54184abb0169 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieDeleteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieDeleteHelper.java
@@ -26,7 +26,6 @@
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieUpsertException;
@@ -45,7 +44,7 @@
* @param <T>
*/
@SuppressWarnings("checkstyle:LineLength")
-public class HoodieDeleteHelper extends
+public class HoodieDeleteHelper extends
BaseDeleteHelper>, HoodieData, HoodieData, R> {
private HoodieDeleteHelper() {
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
index 3e2d8abdd7466..6935e67f2d5b2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
@@ -19,14 +19,15 @@
package org.apache.hudi.table.action.commit;
import org.apache.avro.SchemaCompatibility;
-import org.apache.hudi.avro.HoodieAvroUtils;
+
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.InternalSchemaCache;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
@@ -49,17 +50,22 @@
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.HashMap;
import java.util.Map;
+import java.util.Properties;
import java.util.stream.Collectors;
-public class HoodieMergeHelper extends
+public class HoodieMergeHelper extends
BaseMergeHelper>, HoodieData, HoodieData> {
+ private static final Logger LOG = LogManager.getLogger(HoodieMergeHelper.class);
+
private HoodieMergeHelper() {
}
@@ -92,7 +98,7 @@ public void runMerge(HoodieTable>, HoodieData wrapper = null;
- HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath());
+ HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath());
Option querySchemaOpt = SerDeHelper.fromJson(table.getConfig().getInternalSchema());
boolean needToReWriteRecord = false;
@@ -124,14 +130,14 @@ public void runMerge(HoodieTable>, HoodieData readerIterator;
+ final Iterator readerIterator;
if (baseFile.getBootstrapBaseFile().isPresent()) {
readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readSchema, externalSchemaTransformation);
} else {
if (needToReWriteRecord) {
- readerIterator = HoodieAvroUtils.rewriteRecordWithNewSchema(reader.getRecordIterator(), readSchema, renameCols);
+ readerIterator = new RewriteIterator(reader.getRecordIterator(HoodieAvroIndexedRecord::new), readSchema, readSchema, table.getConfig().getProps(), renameCols);
} else {
- readerIterator = reader.getRecordIterator(readSchema);
+ readerIterator = reader.getRecordIterator(readSchema, HoodieAvroIndexedRecord::new);
}
}
@@ -142,7 +148,8 @@ public void runMerge(HoodieTable>, HoodieData>, HoodieData {
+
+ private final ClosableIterator iter;
+ private final Schema newSchema;
+ private final Schema recordSchema;
+ private final Properties prop;
+ private final Map renameCols;
+
+ public RewriteIterator(ClosableIterator iter, Schema newSchema, Schema recordSchema, Properties prop, Map renameCols) {
+ this.iter = iter;
+ this.newSchema = newSchema;
+ this.recordSchema = recordSchema;
+ this.prop = prop;
+ this.renameCols = renameCols;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public HoodieRecord next() {
+ try {
+ return iter.next().rewriteRecordWithNewSchema(recordSchema, prop, newSchema, renameCols);
+ } catch (IOException e) {
+ LOG.error("Error rewrite record with new schema", e);
+ throw new HoodieException(e);
+ }
+ }
+
+ @Override
+ public void close() {
+ iter.close();
+ }
+ }
}
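The RewriteIterator added above decorates a closable iterator, rewrites each record with the new schema, and rethrows the checked IOException as an unchecked exception. A hedged sketch of that decorator pattern in plain Java (hypothetical names, generic transform instead of rewriteRecordWithNewSchema):

import java.io.IOException;
import java.util.Iterator;

public class RewritingIterator<T, R> implements Iterator<R>, AutoCloseable {

  // Transform that may fail with an IOException, like a schema rewrite of a single record.
  interface FailableTransform<I, O> {
    O apply(I input) throws IOException;
  }

  private final Iterator<T> inner;
  private final FailableTransform<T, R> transform;

  public RewritingIterator(Iterator<T> inner, FailableTransform<T, R> transform) {
    this.inner = inner;
    this.transform = transform;
  }

  @Override
  public boolean hasNext() {
    return inner.hasNext();
  }

  @Override
  public R next() {
    try {
      return transform.apply(inner.next());
    } catch (IOException e) {
      throw new RuntimeException("Error rewriting record with new schema", e);
    }
  }

  @Override
  public void close() {
    // Release the underlying iterator's resources here (file handles, readers) if it holds any.
  }
}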
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
index b56d39b8e3679..e24cd71ab64c4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
@@ -21,15 +21,13 @@
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
-public class HoodieWriteHelper extends BaseWriteHelper>,
+public class HoodieWriteHelper extends BaseWriteHelper>,
HoodieData, HoodieData, R> {
private HoodieWriteHelper() {
@@ -60,10 +58,10 @@ public HoodieData> deduplicateRecords(
return Pair.of(key, record);
}).reduceByKey((rec1, rec2) -> {
@SuppressWarnings("unchecked")
- T reducedData = (T) rec2.getData().preCombine(rec1.getData());
- HoodieKey reducedKey = rec1.getData().equals(reducedData) ? rec1.getKey() : rec2.getKey();
+ HoodieRecord reducedRec = rec2.preCombine(rec1);
+ HoodieKey reducedKey = rec1.getData().equals(reducedRec) ? rec1.getKey() : rec2.getKey();
- return new HoodieAvroRecord<>(reducedKey, reducedData);
+ return (HoodieRecord) reducedRec.newInstance(reducedKey);
}, parallelism).map(Pair::getRight);
}
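The deduplicateRecords change above reduces records by key, combining colliding records pairwise and keeping the key of the winning side. A hedged, plain-Java sketch of that reduce-by-key shape; the BinaryOperator stands in for preCombine and the record type is deliberately simplified.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class DeduplicateSketch {

  static <K, V> List<V> deduplicate(List<Map.Entry<K, V>> keyedRecords, BinaryOperator<V> preCombine) {
    Map<K, V> reduced = new HashMap<>();
    for (Map.Entry<K, V> entry : keyedRecords) {
      // merge() applies preCombine whenever two records collide on the same key.
      reduced.merge(entry.getKey(), entry.getValue(), preCombine);
    }
    return new ArrayList<>(reduced.values());
  }

  public static void main(String[] args) {
    List<Map.Entry<String, Integer>> input = List.of(
        Map.entry("id-1", 3), Map.entry("id-1", 7), Map.entry("id-2", 5));
    // Keep the larger value per key, standing in for "latest wins" pre-combine semantics.
    System.out.println(deduplicate(input, Integer::max)); // e.g. [7, 5] (order not guaranteed)
  }
}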
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java
index 3379d16f4c035..75fbb6b27ada3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java
@@ -22,7 +22,6 @@
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -43,7 +42,7 @@
* @param <K> Type of keys
* @param <O> Type of outputs
*/
-public class CompactHelpers {
+public class CompactHelpers {
private static final CompactHelpers SINGLETON_INSTANCE = new CompactHelpers();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
index 75954872aedd5..69c6b48feaf92 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
@@ -31,7 +31,6 @@
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieLogFile;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -75,7 +74,7 @@
/**
* A HoodieCompactor runs compaction on a hoodie table.
*/
-public abstract class HoodieCompactor implements Serializable {
+public abstract class HoodieCompactor implements Serializable {
private static final Logger LOG = LogManager.getLogger(HoodieCompactor.class);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
index fc4ae986e6d55..bfee3adbd4c81 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
@@ -25,7 +25,6 @@
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -45,7 +44,7 @@
import java.util.List;
@SuppressWarnings("checkstyle:LineLength")
-public class RunCompactionActionExecutor extends
+public class RunCompactionActionExecutor extends
BaseActionExecutor>, HoodieData, HoodieData, HoodieWriteMetadata>> {
private final HoodieCompactor compactor;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
index 05fb7c0c92d1d..e662d21a5e36e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
@@ -22,7 +22,6 @@
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieFileGroupId;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -48,7 +47,7 @@
import java.util.Set;
import java.util.stream.Collectors;
-public class ScheduleCompactionActionExecutor extends BaseActionExecutor> {
+public class ScheduleCompactionActionExecutor extends BaseActionExecutor> {
private static final Logger LOG = LogManager.getLogger(ScheduleCompactionActionExecutor.class);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
index 339e95b9e08d4..0f31f7157c6a4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
@@ -28,7 +28,6 @@
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -82,7 +81,7 @@
* Reads the index plan and executes the plan.
* It also reconciles updates on data timeline while indexing was in progress.
*/
-public class RunIndexActionExecutor extends BaseActionExecutor> {
+public class RunIndexActionExecutor extends BaseActionExecutor> {
private static final Logger LOG = LogManager.getLogger(RunIndexActionExecutor.class);
private static final Integer INDEX_COMMIT_METADATA_VERSION_1 = 1;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java
index cfd975c5098a4..202da9bf8159c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java
@@ -23,7 +23,6 @@
import org.apache.hudi.avro.model.HoodieIndexPlan;
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
@@ -60,7 +59,7 @@
* 3. Initialize file groups for the enabled partition types within a transaction.
*
*/
-public class ScheduleIndexActionExecutor extends BaseActionExecutor> {
+public class ScheduleIndexActionExecutor extends BaseActionExecutor> {
private static final Logger LOG = LogManager.getLogger(ScheduleIndexActionExecutor.class);
private static final Integer INDEX_PLAN_VERSION_1 = 1;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java
index 1fac279f8ec2c..19a688026a3ae 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java
@@ -24,7 +24,6 @@
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -49,7 +48,7 @@
import java.util.Map;
import java.util.stream.Collectors;
-public abstract class BaseRestoreActionExecutor extends BaseActionExecutor {
+public abstract class BaseRestoreActionExecutor extends BaseActionExecutor {
private static final Logger LOG = LogManager.getLogger(BaseRestoreActionExecutor.class);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java
index facab71c6237b..5c4ee0f0aa900 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java
@@ -21,7 +21,6 @@
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -30,7 +29,7 @@
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
-public class CopyOnWriteRestoreActionExecutor
+public class CopyOnWriteRestoreActionExecutor
extends BaseRestoreActionExecutor {
public CopyOnWriteRestoreActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/MergeOnReadRestoreActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/MergeOnReadRestoreActionExecutor.java
index 661cee4a2e608..39a8abde3b264 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/MergeOnReadRestoreActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/MergeOnReadRestoreActionExecutor.java
@@ -21,7 +21,6 @@
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -29,7 +28,7 @@
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor;
-public class MergeOnReadRestoreActionExecutor
+public class MergeOnReadRestoreActionExecutor
extends BaseRestoreActionExecutor {
public MergeOnReadRestoreActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable table,
String instantTime, String restoreInstantTime) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
index 8e34f0fe59dac..c26abccadb84e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
@@ -25,7 +25,6 @@
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.bootstrap.index.BootstrapIndex;
import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -50,7 +49,7 @@
import java.util.Objects;
import java.util.stream.Collectors;
-public abstract class BaseRollbackActionExecutor extends BaseActionExecutor {
+public abstract class BaseRollbackActionExecutor extends BaseActionExecutor {
private static final Logger LOG = LogManager.getLogger(BaseRollbackActionExecutor.class);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackPlanActionExecutor.java
index f95ec5d5c9fe1..541fa87206288 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackPlanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackPlanActionExecutor.java
@@ -22,7 +22,6 @@
import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.avro.model.HoodieRollbackRequest;
import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
@@ -44,7 +43,7 @@
* Base rollback plan action executor to assist in scheduling rollback requests. This phase serialized {@link HoodieRollbackPlan}
* to rollback.requested instant.
*/
-public class BaseRollbackPlanActionExecutor extends BaseActionExecutor> {
+public class BaseRollbackPlanActionExecutor extends BaseActionExecutor> {
private static final Logger LOG = LogManager.getLogger(BaseRollbackPlanActionExecutor.class);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java
index 5315ce713eef3..759ac04f45806 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java
@@ -21,7 +21,6 @@
import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.HoodieTimer;
@@ -34,7 +33,7 @@
import java.util.ArrayList;
import java.util.List;
-public class CopyOnWriteRollbackActionExecutor extends BaseRollbackActionExecutor {
+public class CopyOnWriteRollbackActionExecutor extends BaseRollbackActionExecutor