diff --git a/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala b/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala index 3802bb46a0f5b..e8b0d042eb023 100644 --- a/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala +++ b/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala @@ -18,17 +18,15 @@ package org.apache.hudi.cli import org.apache.avro.Schema -import org.apache.avro.generic.IndexedRecord import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hudi.avro.HoodieAvroWriteSupport import org.apache.hudi.client.SparkTaskContextSupplier -import org.apache.hudi.common.HoodieJsonPayload import org.apache.hudi.common.bloom.{BloomFilter, BloomFilterFactory} import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord} import org.apache.hudi.common.util.BaseFileUtils import org.apache.hudi.config.{HoodieIndexConfig, HoodieStorageConfig} -import org.apache.hudi.io.storage.{HoodieAvroParquetConfig, HoodieParquetWriter} +import org.apache.hudi.io.storage.{HoodieAvroParquetConfig, HoodieAvroParquetWriter} import org.apache.parquet.avro.AvroSchemaConverter import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.apache.spark.sql.{DataFrame, SQLContext} @@ -50,13 +48,13 @@ object SparkHelpers { // Add current classLoad for config, if not will throw classNotFound of 'HoodieWrapperFileSystem'. parquetConfig.getHadoopConf().setClassLoader(Thread.currentThread.getContextClassLoader) - val writer = new HoodieParquetWriter[HoodieJsonPayload, IndexedRecord](instantTime, destinationFile, parquetConfig, schema, new SparkTaskContextSupplier(), + val writer = new HoodieAvroParquetWriter(instantTime, destinationFile, parquetConfig, schema, new SparkTaskContextSupplier(), true) for (rec <- sourceRecords) { val key: String = rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString if (!keysToSkip.contains(key)) { - writer.writeAvro(key, rec) + writer.write(key, rec) } } writer.close diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MergingIterator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MergingIterator.java index 47dde723e00dd..ad44112236559 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MergingIterator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MergingIterator.java @@ -18,14 +18,14 @@ package org.apache.hudi.client.utils; -import java.util.Iterator; -import java.util.function.Function; -import org.apache.avro.generic.GenericRecord; - +import org.apache.avro.generic.IndexedRecord; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; -public class MergingIterator implements Iterator { +import java.util.Iterator; +import java.util.function.Function; + +public class MergingIterator implements Iterator { private final Iterator leftIterator; private final Iterator rightIterator; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java index b714c50334b4f..51e6a7a662416 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java @@ -31,7 +31,7 @@ import org.apache.hudi.common.util.ValidationUtils; import 
org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieIndexException; -import org.apache.hudi.io.storage.HoodieFileReader; +import org.apache.hudi.io.storage.HoodieAvroFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; import org.apache.hudi.table.HoodieTable; import org.apache.log4j.LogManager; @@ -132,7 +132,7 @@ public static List filterKeysFromFile(Path filePath, List candid // Load all rowKeys from the file, to double-confirm if (!candidateRecordKeys.isEmpty()) { HoodieTimer timer = new HoodieTimer().startTimer(); - HoodieFileReader fileReader = HoodieFileReaderFactory.getFileReader(configuration, filePath); + HoodieAvroFileReader fileReader = HoodieFileReaderFactory.getFileReader(configuration, filePath); Set fileRowKeys = fileReader.filterRowKeys(new TreeSet<>(candidateRecordKeys)); foundRecordKeys.addAll(fileRowKeys); LOG.info(String.format("Checked keys against file %s, in %d ms. #candidates (%d) #found (%d)", filePath, diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieConcatHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieConcatHandle.java index c33c0f08ca830..ee17dea4206dc 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieConcatHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieConcatHandle.java @@ -18,6 +18,7 @@ package org.apache.hudi.io; +import org.apache.avro.generic.GenericRecord; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieRecord; @@ -29,8 +30,6 @@ import org.apache.hudi.keygen.BaseKeyGenerator; import org.apache.hudi.keygen.KeyGenUtils; import org.apache.hudi.table.HoodieTable; - -import org.apache.avro.generic.GenericRecord; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -93,7 +92,7 @@ public HoodieConcatHandle(HoodieWriteConfig config, String instantTime, HoodieTa public void write(GenericRecord oldRecord) { String key = KeyGenUtils.getRecordKeyFromGenericRecord(oldRecord, keyGeneratorOpt); try { - fileWriter.writeAvro(key, oldRecord); + fileWriter.write(key, oldRecord); } catch (IOException | RuntimeException e) { String errMsg = String.format("Failed to write old record into new file for key %s from old file %s to new file %s with writerSchema %s", key, getOldFilePath(), newFilePath, writeSchemaWithMetaFields.toString(true)); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java index 096c257b1f797..e06f3666f5bbe 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java @@ -18,6 +18,10 @@ package org.apache.hudi.io; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.fs.Path; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.fs.FSUtils; @@ -32,14 +36,9 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieInsertException; -import org.apache.hudi.io.storage.HoodieFileWriter; +import 
org.apache.hudi.io.storage.HoodieAvroFileWriter; import org.apache.hudi.io.storage.HoodieFileWriterFactory; import org.apache.hudi.table.HoodieTable; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -53,7 +52,7 @@ public class HoodieCreateHandle extends private static final Logger LOG = LogManager.getLogger(HoodieCreateHandle.class); - protected final HoodieFileWriter fileWriter; + protected final HoodieAvroFileWriter fileWriter; protected final Path path; protected long recordsWritten = 0; protected long insertRecordsWritten = 0; @@ -142,9 +141,9 @@ public void write(HoodieRecord record, Option avroRecord) { if (preserveHoodieMetadata) { // do not preserve FILENAME_METADATA_FIELD recordWithMetadataInSchema.put(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.FILENAME_METADATA_FIELD), path.getName()); - fileWriter.writeAvro(record.getRecordKey(), recordWithMetadataInSchema); + fileWriter.write(record.getRecordKey(), recordWithMetadataInSchema); } else { - fileWriter.writeAvroWithMetadata(recordWithMetadataInSchema, record); + fileWriter.writeWithMetadata(recordWithMetadataInSchema, record); } // update the new location of record, so we know where to find it next record.unseal(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java index 12d075e0cb532..acdcd6bc7a499 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java @@ -18,6 +18,7 @@ package org.apache.hudi.io; +import org.apache.hadoop.fs.Path; import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.bloom.BloomFilterTypeCode; import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter; @@ -31,10 +32,8 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIndexException; import org.apache.hudi.index.HoodieIndexUtils; -import org.apache.hudi.io.storage.HoodieFileReader; +import org.apache.hudi.io.storage.HoodieAvroFileReader; import org.apache.hudi.table.HoodieTable; - -import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -93,7 +92,7 @@ private BloomFilter getBloomFilter() { new HoodieDynamicBoundedBloomFilter(StandardCharsets.UTF_8.decode(bloomFilterByteBuffer.get()).toString(), BloomFilterTypeCode.DYNAMIC_V0); } else { - try (HoodieFileReader reader = createNewFileReader()) { + try (HoodieAvroFileReader reader = createNewFileReader()) { bloomFilter = reader.readBloomFilter(); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index 32d4ec2a6d794..e5d0377bd1b42 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -18,6 +18,10 @@ package org.apache.hudi.io; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.fs.Path; import org.apache.hudi.client.WriteStatus; 
import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.fs.FSUtils; @@ -39,17 +43,12 @@ import org.apache.hudi.exception.HoodieCorruptedDataException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieUpsertException; -import org.apache.hudi.io.storage.HoodieFileReader; +import org.apache.hudi.io.storage.HoodieAvroFileWriter; +import org.apache.hudi.io.storage.HoodieAvroFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; -import org.apache.hudi.io.storage.HoodieFileWriter; import org.apache.hudi.keygen.BaseKeyGenerator; import org.apache.hudi.keygen.KeyGenUtils; import org.apache.hudi.table.HoodieTable; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -96,7 +95,7 @@ public class HoodieMergeHandle extends H protected Map> keyToNewRecords; protected Set writtenRecordKeys; - protected HoodieFileWriter fileWriter; + protected HoodieAvroFileWriter fileWriter; private boolean preserveMetadata = false; protected Path newFilePath; @@ -294,9 +293,9 @@ protected boolean writeRecord(HoodieRecord hoodieRecord, Option h } public String[] getMinMaxKeys() throws IOException { - try (HoodieFileReader reader = createNewFileReader()) { + try (HoodieAvroFileReader reader = createNewFileReader()) { return reader.readMinMaxRecordKeys(); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java index fee75b22decd7..094400bdf47fc 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java @@ -25,7 +25,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.io.storage.HoodieFileReader; +import org.apache.hudi.io.storage.HoodieAvroFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; import org.apache.hudi.table.HoodieTable; @@ -62,7 +62,7 @@ protected HoodieBaseFile getLatestDataFile() { .getLatestBaseFile(partitionPathFileIDPair.getLeft(), partitionPathFileIDPair.getRight()).get(); } - protected HoodieFileReader createNewFileReader() throws IOException { + protected HoodieAvroFileReader createNewFileReader() throws IOException { return HoodieFileReaderFactory.getFileReader(hoodieTable.getHadoopConf(), new Path(getLatestDataFile().getPath())); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java index 28e88e16a6482..15c599a097d10 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java @@ -18,6 +18,11 @@ package org.apache.hudi.io; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.engine.TaskContextSupplier; @@ -30,16 
+35,10 @@ import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.io.storage.HoodieFileWriter; +import org.apache.hudi.io.storage.HoodieAvroFileWriter; import org.apache.hudi.io.storage.HoodieFileWriterFactory; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.marker.WriteMarkersFactory; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -256,8 +255,8 @@ protected long getAttemptId() { return taskContextSupplier.getAttemptIdSupplier().get(); } - protected HoodieFileWriter createNewFileWriter(String instantTime, Path path, HoodieTable hoodieTable, - HoodieWriteConfig config, Schema schema, TaskContextSupplier taskContextSupplier) throws IOException { + protected HoodieAvroFileWriter createNewFileWriter(String instantTime, Path path, HoodieTable hoodieTable, + HoodieWriteConfig config, Schema schema, TaskContextSupplier taskContextSupplier) throws IOException { return HoodieFileWriterFactory.getFileWriter(instantTime, path, hoodieTable, config, schema, taskContextSupplier); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileWriter.java similarity index 66% rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileWriter.java index a5792349cad16..ae37a7b7fd18e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileWriter.java @@ -18,28 +18,34 @@ package org.apache.hudi.io.storage; -import java.util.concurrent.atomic.AtomicLong; +import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.io.storage.HoodieRecordFileWriter; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.avro.generic.IndexedRecord; - import java.io.IOException; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicLong; -public interface HoodieFileWriter { - - void writeAvroWithMetadata(R newRecord, HoodieRecord record) throws IOException; - - boolean canWrite(); +public interface HoodieAvroFileWriter extends HoodieRecordFileWriter { - void close() throws IOException; + long getBytesWritten(); - void writeAvro(String key, R oldRecord) throws IOException; + // TODO rename + @Override + default void writeWithMetadata(HoodieRecord record, Schema schema, Properties props) throws IOException { + record.writeWithMetadata(this, schema, props); + } - long getBytesWritten(); + // TODO rename + @Override + default void write(HoodieRecord record, Schema schema, Properties props) throws IOException { + record.write(this, schema, props); + } - default void prepRecordWithMetadata(R avroRecord, HoodieRecord record, String instantTime, Integer partitionId, AtomicLong recordIndex, String fileName) { + default void prepRecordWithMetadata(IndexedRecord avroRecord, HoodieRecord record, 
String instantTime, Integer partitionId, AtomicLong recordIndex, String fileName) { String seqId = HoodieRecord.generateSequenceId(instantTime, partitionId, recordIndex.getAndIncrement()); HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord, record.getRecordKey(), record.getPartitionPath(), fileName); HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord, instantTime, seqId); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileWriter.java similarity index 85% rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileWriter.java index 2ad6d7f9220b0..0d06416998d6f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileWriter.java @@ -18,14 +18,6 @@ package org.apache.hudi.io.storage; -import org.apache.hudi.avro.HoodieAvroUtils; -import org.apache.hudi.common.bloom.BloomFilter; -import org.apache.hudi.common.engine.TaskContextSupplier; -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.fs.HoodieWrapperFileSystem; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecordPayload; - import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; @@ -38,6 +30,12 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.io.Writable; +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.bloom.BloomFilter; +import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.fs.HoodieWrapperFileSystem; +import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; @@ -54,8 +52,8 @@ * 1. Records should be added in order of keys * 2. 
There are no column stats */ -public class HoodieHFileWriter - implements HoodieFileWriter { +public class HoodieAvroHFileWriter + implements HoodieAvroFileWriter { private static AtomicLong recordIndex = new AtomicLong(1); private final Path file; @@ -74,8 +72,8 @@ public class HoodieHFileWriter - implements HoodieFileWriter, Closeable { +public class HoodieAvroOrcWriter implements HoodieAvroFileWriter, Closeable { private static final AtomicLong RECORD_INDEX = new AtomicLong(1); private final long maxFileSize; @@ -67,8 +66,8 @@ public class HoodieOrcWritercanWrite() */ -public class HoodieParquetWriter - extends ParquetWriter implements HoodieFileWriter { +public class HoodieAvroParquetWriter extends ParquetWriter implements HoodieAvroFileWriter { private static AtomicLong recordIndex = new AtomicLong(1); @@ -51,12 +48,12 @@ public class HoodieParquetWriter HoodieFileWriter getFileWriter( + public static HoodieAvroFileWriter getFileWriter( String instantTime, Path path, HoodieTable hoodieTable, HoodieWriteConfig config, Schema schema, TaskContextSupplier taskContextSupplier) throws IOException { final String extension = FSUtils.getFileExtension(path.getName()); @@ -61,13 +60,13 @@ public static throw new UnsupportedOperationException(extension + " format not supported yet."); } - private static HoodieFileWriter newParquetFileWriter( + private static HoodieAvroFileWriter newParquetFileWriter( String instantTime, Path path, HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable, TaskContextSupplier taskContextSupplier, boolean populateMetaFields) throws IOException { return newParquetFileWriter(instantTime, path, config, schema, hoodieTable, taskContextSupplier, populateMetaFields, populateMetaFields); } - private static HoodieFileWriter newParquetFileWriter( + private static HoodieAvroFileWriter newParquetFileWriter( String instantTime, Path path, HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable, TaskContextSupplier taskContextSupplier, boolean populateMetaFields, boolean enableBloomFilter) throws IOException { Option filter = enableBloomFilter ? 
Option.of(createBloomFilter(config)) : Option.empty(); @@ -77,34 +76,34 @@ private static HoodieFi config.getParquetBlockSize(), config.getParquetPageSize(), config.getParquetMaxFileSize(), hoodieTable.getHadoopConf(), config.getParquetCompressionRatio(), config.parquetDictionaryEnabled()); - return new HoodieParquetWriter<>(instantTime, path, parquetConfig, schema, taskContextSupplier, populateMetaFields); + return new HoodieAvroParquetWriter(instantTime, path, parquetConfig, schema, taskContextSupplier, populateMetaFields); } - private static HoodieFileWriter newHFileFileWriter( + private static HoodieAvroFileWriter newHFileFileWriter( String instantTime, Path path, HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable, TaskContextSupplier taskContextSupplier) throws IOException { BloomFilter filter = createBloomFilter(config); HoodieHFileConfig hfileConfig = new HoodieHFileConfig(hoodieTable.getHadoopConf(), config.getHFileCompressionAlgorithm(), config.getHFileBlockSize(), config.getHFileMaxFileSize(), - HoodieHFileReader.KEY_FIELD_NAME, PREFETCH_ON_OPEN, CACHE_DATA_IN_L1, DROP_BEHIND_CACHE_COMPACTION, + HoodieAvroHFileReader.KEY_FIELD_NAME, PREFETCH_ON_OPEN, CACHE_DATA_IN_L1, DROP_BEHIND_CACHE_COMPACTION, filter, HFILE_COMPARATOR); - return new HoodieHFileWriter<>(instantTime, path, hfileConfig, schema, taskContextSupplier, config.populateMetaFields()); + return new HoodieAvroHFileWriter(instantTime, path, hfileConfig, schema, taskContextSupplier, config.populateMetaFields()); } - private static HoodieFileWriter newOrcFileWriter( + private static HoodieAvroFileWriter newOrcFileWriter( String instantTime, Path path, HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable, TaskContextSupplier taskContextSupplier) throws IOException { BloomFilter filter = createBloomFilter(config); HoodieOrcConfig orcConfig = new HoodieOrcConfig(hoodieTable.getHadoopConf(), config.getOrcCompressionCodec(), config.getOrcStripeSize(), config.getOrcBlockSize(), config.getOrcMaxFileSize(), filter); - return new HoodieOrcWriter<>(instantTime, path, orcConfig, schema, taskContextSupplier); + return new HoodieAvroOrcWriter(instantTime, path, orcConfig, schema, taskContextSupplier); } private static BloomFilter createBloomFilter(HoodieWriteConfig config) { return BloomFilterFactory.createBloomFilter(config.getBloomFilterNumEntries(), config.getBloomFilterFPP(), - config.getDynamicBloomFilterMaxNumEntries(), - config.getBloomFilterType()); + config.getDynamicBloomFilterMaxNumEntries(), + config.getBloomFilterType()); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java index 5ead348140aa3..c1b2a14cd95ac 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java @@ -18,29 +18,20 @@ package org.apache.hudi.table.action.commit; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.client.utils.MergingIterator; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer; -import 
org.apache.hudi.exception.HoodieException; import org.apache.hudi.io.HoodieMergeHandle; -import org.apache.hudi.io.storage.HoodieFileReader; +import org.apache.hudi.io.storage.HoodieAvroFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; import org.apache.hudi.table.HoodieTable; -import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.generic.GenericDatumWriter; -import org.apache.avro.io.BinaryDecoder; -import org.apache.avro.io.BinaryEncoder; -import org.apache.avro.io.DecoderFactory; -import org.apache.avro.io.EncoderFactory; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; - -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.Iterator; @@ -57,30 +48,8 @@ public abstract class BaseMergeHelper { */ public abstract void runMerge(HoodieTable table, HoodieMergeHandle upsertHandle) throws IOException; - protected GenericRecord transformRecordBasedOnNewSchema(GenericDatumReader gReader, GenericDatumWriter gWriter, - ThreadLocal encoderCache, ThreadLocal decoderCache, - GenericRecord gRec) { - ByteArrayOutputStream inStream = null; - try { - inStream = new ByteArrayOutputStream(); - BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(inStream, encoderCache.get()); - encoderCache.set(encoder); - gWriter.write(gRec, encoder); - encoder.flush(); - - BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inStream.toByteArray(), decoderCache.get()); - decoderCache.set(decoder); - GenericRecord transformedRec = gReader.read(null, decoder); - return transformedRec; - } catch (IOException e) { - throw new HoodieException(e); - } finally { - try { - inStream.close(); - } catch (IOException ioe) { - throw new HoodieException(ioe.getMessage(), ioe); - } - } + protected GenericRecord transformRecordBasedOnNewSchema(GenericRecord record, Schema writerSchema) { + return HoodieAvroUtils.rewriteRecord(record, writerSchema); } /** @@ -90,11 +59,11 @@ protected GenericRecord transformRecordBasedOnNewSchema(GenericDatumReader getMergingIterator(HoodieTable table, HoodieMergeHandle mergeHandle, - HoodieBaseFile baseFile, HoodieFileReader reader, + HoodieBaseFile baseFile, HoodieAvroFileReader reader, Schema readSchema, boolean externalSchemaTransformation) throws IOException { Path externalFilePath = new Path(baseFile.getBootstrapBaseFile().get().getPath()); Configuration bootstrapFileConfig = new Configuration(table.getHadoopConf()); - HoodieFileReader bootstrapReader = HoodieFileReaderFactory.getFileReader(bootstrapFileConfig, externalFilePath); + HoodieAvroFileReader bootstrapReader = HoodieFileReaderFactory.getFileReader(bootstrapFileConfig, externalFilePath); Schema bootstrapReadSchema; if (externalSchemaTransformation) { bootstrapReadSchema = bootstrapReader.getSchema(); @@ -103,7 +72,7 @@ protected Iterator getMergingIterator(HoodieTable tab } return new MergingIterator<>(reader.getRecordIterator(readSchema), bootstrapReader.getRecordIterator(bootstrapReadSchema), - (inputRecordPair) -> HoodieAvroUtils.stitchRecords(inputRecordPair.getLeft(), inputRecordPair.getRight(), mergeHandle.getWriterSchemaWithMetaFields())); + (inputRecordPair) -> HoodieAvroUtils.stitchRecords((GenericRecord) inputRecordPair.getLeft(), (GenericRecord) inputRecordPair.getRight(), mergeHandle.getWriterSchemaWithMetaFields())); } /** diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java index fd25d92cba62e..b5aec1119e4e0 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java @@ -18,15 +18,6 @@ package org.apache.hudi.io.storage; -import org.apache.hudi.common.bloom.BloomFilter; -import org.apache.hudi.common.bloom.BloomFilterFactory; -import org.apache.hudi.common.bloom.BloomFilterTypeCode; -import org.apache.hudi.common.engine.TaskContextSupplier; -import org.apache.hudi.common.model.EmptyHoodieRecordPayload; -import org.apache.hudi.common.model.HoodieAvroRecord; -import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.common.model.HoodieRecord; - import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; @@ -36,6 +27,14 @@ import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hudi.common.bloom.BloomFilter; +import org.apache.hudi.common.bloom.BloomFilterFactory; +import org.apache.hudi.common.bloom.BloomFilterTypeCode; +import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.model.EmptyHoodieRecordPayload; +import org.apache.hudi.common.model.HoodieAvroRecord; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.io.TempDir; @@ -94,7 +93,7 @@ private static Stream populateMetaFieldsAndTestAvroWithMeta() { }).map(Arguments::of); } - private HoodieHFileWriter createHFileWriter(Schema avroSchema, boolean populateMetaFields) throws Exception { + private HoodieAvroHFileWriter createHFileWriter(Schema avroSchema, boolean populateMetaFields) throws Exception { BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.00001, -1, BloomFilterTypeCode.SIMPLE.name()); Configuration conf = new Configuration(); TaskContextSupplier mockTaskContextSupplier = Mockito.mock(TaskContextSupplier.class); @@ -104,15 +103,15 @@ private HoodieHFileWriter createHFileWriter(Schema avroSchema, boolean populateM String instantTime = "000"; HoodieHFileConfig hoodieHFileConfig = new HoodieHFileConfig(conf, Compression.Algorithm.GZ, 1024 * 1024, 120 * 1024 * 1024, - HoodieHFileReader.KEY_FIELD_NAME, PREFETCH_ON_OPEN, CACHE_DATA_IN_L1, DROP_BEHIND_CACHE_COMPACTION, filter, HFILE_COMPARATOR); - return new HoodieHFileWriter(instantTime, filePath, hoodieHFileConfig, avroSchema, mockTaskContextSupplier, populateMetaFields); + HoodieAvroHFileReader.KEY_FIELD_NAME, PREFETCH_ON_OPEN, CACHE_DATA_IN_L1, DROP_BEHIND_CACHE_COMPACTION, filter, HFILE_COMPARATOR); + return new HoodieAvroHFileWriter(instantTime, filePath, hoodieHFileConfig, avroSchema, mockTaskContextSupplier, populateMetaFields); } @ParameterizedTest @MethodSource("populateMetaFieldsAndTestAvroWithMeta") public void testWriteReadHFile(boolean populateMetaFields, boolean testAvroWithMeta) throws Exception { Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleSchemaWithMetaFields.avsc"); - HoodieHFileWriter writer = createHFileWriter(avroSchema, 
populateMetaFields); + HoodieAvroHFileWriter writer = createHFileWriter(avroSchema, populateMetaFields); List keys = new ArrayList<>(); Map recordMap = new HashMap<>(); for (int i = 0; i < 100; i++) { @@ -123,11 +122,11 @@ public void testWriteReadHFile(boolean populateMetaFields, boolean testAvroWithM record.put("time", Integer.toString(RANDOM.nextInt())); record.put("number", i); if (testAvroWithMeta) { - writer.writeAvroWithMetadata(record, new HoodieAvroRecord(new HoodieKey((String) record.get("_row_key"), + writer.writeWithMetadata(record, new HoodieAvroRecord(new HoodieKey((String) record.get("_row_key"), Integer.toString((Integer) record.get("number"))), new EmptyHoodieRecordPayload())); // payload does not matter. GenericRecord passed in is what matters // only HoodieKey will be looked up from the 2nd arg(HoodieRecord). } else { - writer.writeAvro(key, record); + writer.write(key, record); } recordMap.put(key, record); } @@ -135,7 +134,7 @@ public void testWriteReadHFile(boolean populateMetaFields, boolean testAvroWithM Configuration conf = new Configuration(); CacheConfig cacheConfig = new CacheConfig(conf); - HoodieHFileReader hoodieHFileReader = new HoodieHFileReader(conf, filePath, cacheConfig, filePath.getFileSystem(conf)); + HoodieAvroHFileReader hoodieHFileReader = new HoodieAvroHFileReader(conf, filePath, cacheConfig, filePath.getFileSystem(conf)); List> records = hoodieHFileReader.readAllRecords(); records.forEach(entry -> assertEquals(entry.getSecond(), recordMap.get(entry.getFirst()))); hoodieHFileReader.close(); @@ -145,7 +144,7 @@ public void testWriteReadHFile(boolean populateMetaFields, boolean testAvroWithM Set rowsToFetch = getRandomKeys(randomRowstoFetch, keys); List rowsList = new ArrayList<>(rowsToFetch); Collections.sort(rowsList); - hoodieHFileReader = new HoodieHFileReader(conf, filePath, cacheConfig, filePath.getFileSystem(conf)); + hoodieHFileReader = new HoodieAvroHFileReader(conf, filePath, cacheConfig, filePath.getFileSystem(conf)); List> result = hoodieHFileReader.readRecords(rowsList); assertEquals(result.size(), randomRowstoFetch); result.forEach(entry -> { diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieOrcReaderWriter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieOrcReaderWriter.java index 68143a215c51c..14756a650f229 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieOrcReaderWriter.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieOrcReaderWriter.java @@ -47,10 +47,10 @@ import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER; import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource; import static org.apache.hudi.io.storage.HoodieOrcConfig.AVRO_SCHEMA_METADATA_KEY; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.assertEquals; public class TestHoodieOrcReaderWriter { private final Path filePath = new Path(System.getProperty("java.io.tmpdir") + "/f1_1-0-1_000.orc"); @@ -64,7 +64,7 @@ public void clearTempFile() { } } - private HoodieOrcWriter createOrcWriter(Schema avroSchema) throws Exception { + private HoodieAvroOrcWriter createOrcWriter(Schema avroSchema) throws Exception { 
BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.00001, -1, BloomFilterTypeCode.SIMPLE.name()); Configuration conf = new Configuration(); int orcStripSize = Integer.parseInt(HoodieStorageConfig.ORC_STRIPE_SIZE.defaultValue()); @@ -73,19 +73,19 @@ private HoodieOrcWriter createOrcWriter(Schema avroSchema) throws Exception { HoodieOrcConfig config = new HoodieOrcConfig(conf, CompressionKind.ZLIB, orcStripSize, orcBlockSize, maxFileSize, filter); TaskContextSupplier mockTaskContextSupplier = Mockito.mock(TaskContextSupplier.class); String instantTime = "000"; - return new HoodieOrcWriter(instantTime, filePath, config, avroSchema, mockTaskContextSupplier); + return new HoodieAvroOrcWriter(instantTime, filePath, config, avroSchema, mockTaskContextSupplier); } @Test public void testWriteReadMetadata() throws Exception { Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleSchema.avsc"); - HoodieOrcWriter writer = createOrcWriter(avroSchema); + HoodieAvroOrcWriter writer = createOrcWriter(avroSchema); for (int i = 0; i < 3; i++) { GenericRecord record = new GenericData.Record(avroSchema); record.put("_row_key", "key" + i); record.put("time", Integer.toString(i)); record.put("number", i); - writer.writeAvro("key" + i, record); + writer.write("key" + i, record); } writer.close(); @@ -98,7 +98,7 @@ public void testWriteReadMetadata() throws Exception { assertTrue(orcReader.getMetadataKeys().contains(AVRO_SCHEMA_METADATA_KEY)); assertEquals(CompressionKind.ZLIB.name(), orcReader.getCompressionKind().toString()); - HoodieFileReader hoodieReader = HoodieFileReaderFactory.getFileReader(conf, filePath); + HoodieAvroFileReader hoodieReader = HoodieFileReaderFactory.getFileReader(conf, filePath); BloomFilter filter = hoodieReader.readBloomFilter(); for (int i = 0; i < 3; i++) { assertTrue(filter.mightContain("key" + i)); @@ -114,13 +114,13 @@ public void testWriteReadMetadata() throws Exception { @Test public void testWriteReadPrimitiveRecord() throws Exception { Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleSchema.avsc"); - HoodieOrcWriter writer = createOrcWriter(avroSchema); + HoodieAvroOrcWriter writer = createOrcWriter(avroSchema); for (int i = 0; i < 3; i++) { GenericRecord record = new GenericData.Record(avroSchema); record.put("_row_key", "key" + i); record.put("time", Integer.toString(i)); record.put("number", i); - writer.writeAvro("key" + i, record); + writer.write("key" + i, record); } writer.close(); @@ -129,7 +129,7 @@ public void testWriteReadPrimitiveRecord() throws Exception { assertEquals("struct<_row_key:string,time:string,number:int>", orcReader.getSchema().toString()); assertEquals(3, orcReader.getNumberOfRows()); - HoodieFileReader hoodieReader = HoodieFileReaderFactory.getFileReader(conf, filePath); + HoodieAvroFileReader hoodieReader = HoodieFileReaderFactory.getFileReader(conf, filePath); Iterator iter = hoodieReader.getRecordIterator(); int index = 0; while (iter.hasNext()) { @@ -145,7 +145,7 @@ public void testWriteReadPrimitiveRecord() throws Exception { public void testWriteReadComplexRecord() throws Exception { Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleSchemaWithUDT.avsc"); Schema udtSchema = avroSchema.getField("driver").schema().getTypes().get(1); - HoodieOrcWriter writer = createOrcWriter(avroSchema); + HoodieAvroOrcWriter writer = createOrcWriter(avroSchema); for (int i = 0; i < 3; i++) { GenericRecord record = new GenericData.Record(avroSchema); 
record.put("_row_key", "key" + i); @@ -156,7 +156,7 @@ public void testWriteReadComplexRecord() throws Exception { innerRecord.put("list", Collections.singletonList(i)); innerRecord.put("map", Collections.singletonMap("key" + i, "value" + i)); record.put("driver", innerRecord); - writer.writeAvro("key" + i, record); + writer.write("key" + i, record); } writer.close(); @@ -166,7 +166,7 @@ public void testWriteReadComplexRecord() throws Exception { reader.getSchema().toString()); assertEquals(3, reader.getNumberOfRows()); - HoodieFileReader hoodieReader = HoodieFileReaderFactory.getFileReader(conf, filePath); + HoodieAvroFileReader hoodieReader = HoodieFileReaderFactory.getFileReader(conf, filePath); Iterator iter = hoodieReader.getRecordIterator(); int index = 0; while (iter.hasNext()) { @@ -186,18 +186,18 @@ public void testWriteReadComplexRecord() throws Exception { @Test public void testWriteReadWithEvolvedSchema() throws Exception { Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleSchema.avsc"); - HoodieOrcWriter writer = createOrcWriter(avroSchema); + HoodieAvroOrcWriter writer = createOrcWriter(avroSchema); for (int i = 0; i < 3; i++) { GenericRecord record = new GenericData.Record(avroSchema); record.put("_row_key", "key" + i); record.put("time", Integer.toString(i)); record.put("number", i); - writer.writeAvro("key" + i, record); + writer.write("key" + i, record); } writer.close(); Configuration conf = new Configuration(); - HoodieFileReader hoodieReader = HoodieFileReaderFactory.getFileReader(conf, filePath); + HoodieAvroFileReader hoodieReader = HoodieFileReaderFactory.getFileReader(conf, filePath); Schema evolvedSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleEvolvedSchema.avsc"); Iterator iter = hoodieReader.getRecordIterator(evolvedSchema); int index = 0; diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java index 3488a1365ce88..7ad76d64aa804 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java @@ -19,6 +19,12 @@ package org.apache.hudi.testutils; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.HoodieAvroWriteSupport; import org.apache.hudi.common.bloom.BloomFilter; @@ -38,18 +44,11 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieStorageConfig; +import org.apache.hudi.io.storage.HoodieAvroOrcWriter; import org.apache.hudi.io.storage.HoodieAvroParquetConfig; +import org.apache.hudi.io.storage.HoodieAvroParquetWriter; import org.apache.hudi.io.storage.HoodieOrcConfig; -import org.apache.hudi.io.storage.HoodieOrcWriter; -import org.apache.hudi.io.storage.HoodieParquetWriter; import org.apache.hudi.metadata.HoodieTableMetadataWriter; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; 
-import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.orc.CompressionKind; @@ -113,7 +112,7 @@ public HoodieWriteableTestTable withInserts(String partition, String fileId, Lis HoodieAvroParquetConfig config = new HoodieAvroParquetConfig(writeSupport, CompressionCodecName.GZIP, ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 120 * 1024 * 1024, new Configuration(), Double.parseDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue())); - try (HoodieParquetWriter writer = new HoodieParquetWriter( + try (HoodieAvroParquetWriter writer = new HoodieAvroParquetWriter( currentInstantTime, new Path(Paths.get(basePath, partition, fileName).toString()), config, schema, contextSupplier, populateMetaFields)) { @@ -123,10 +122,10 @@ public HoodieWriteableTestTable withInserts(String partition, String fileId, Lis if (populateMetaFields) { HoodieAvroUtils.addCommitMetadataToRecord(avroRecord, currentInstantTime, String.valueOf(seqId++)); HoodieAvroUtils.addHoodieKeyToRecord(avroRecord, record.getRecordKey(), record.getPartitionPath(), fileName); - writer.writeAvro(record.getRecordKey(), avroRecord); + writer.write(record.getRecordKey(), avroRecord); filter.add(record.getRecordKey()); } else { - writer.writeAvro(record.getRecordKey(), avroRecord); + writer.write(record.getRecordKey(), avroRecord); } } } @@ -136,7 +135,7 @@ public HoodieWriteableTestTable withInserts(String partition, String fileId, Lis int orcBlockSize = Integer.parseInt(HoodieStorageConfig.ORC_BLOCK_SIZE.defaultValue()); int maxFileSize = Integer.parseInt(HoodieStorageConfig.ORC_FILE_MAX_SIZE.defaultValue()); HoodieOrcConfig config = new HoodieOrcConfig(conf, CompressionKind.ZLIB, orcStripSize, orcBlockSize, maxFileSize, filter); - try (HoodieOrcWriter writer = new HoodieOrcWriter( + try (HoodieAvroOrcWriter writer = new HoodieAvroOrcWriter( currentInstantTime, new Path(Paths.get(basePath, partition, fileName).toString()), config, schema, contextSupplier)) { @@ -145,7 +144,7 @@ public HoodieWriteableTestTable withInserts(String partition, String fileId, Lis GenericRecord avroRecord = (GenericRecord) ((HoodieRecordPayload) record.getData()).getInsertValue(schema).get(); HoodieAvroUtils.addCommitMetadataToRecord(avroRecord, currentInstantTime, String.valueOf(seqId++)); HoodieAvroUtils.addHoodieKeyToRecord(avroRecord, record.getRecordKey(), record.getPartitionPath(), fileName); - writer.writeAvro(record.getRecordKey(), avroRecord); + writer.write(record.getRecordKey(), avroRecord); filter.add(record.getRecordKey()); } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkConcatAndReplaceHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkConcatAndReplaceHandle.java index 300e8c512bb34..ed4279a7a89e4 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkConcatAndReplaceHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkConcatAndReplaceHandle.java @@ -18,6 +18,8 @@ package org.apache.hudi.io; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.fs.Path; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; @@ -25,9 +27,6 @@ import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.keygen.KeyGenUtils; import org.apache.hudi.table.HoodieTable; - -import 
org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,7 +61,7 @@ public FlinkConcatAndReplaceHandle(HoodieWriteConfig config, String instantTime, public void write(GenericRecord oldRecord) { String key = KeyGenUtils.getRecordKeyFromGenericRecord(oldRecord, keyGeneratorOpt); try { - fileWriter.writeAvro(key, oldRecord); + fileWriter.write(key, oldRecord); } catch (IOException | RuntimeException e) { String errMsg = String.format("Failed to write old record into new file for key %s from old file %s to new file %s with writerSchema %s", key, getOldFilePath(), newFilePath, writeSchemaWithMetaFields.toString(true)); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkConcatHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkConcatHandle.java index 812155c3d2fb0..f8628d5201eff 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkConcatHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkConcatHandle.java @@ -18,6 +18,7 @@ package org.apache.hudi.io; +import org.apache.avro.generic.GenericRecord; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; @@ -25,8 +26,6 @@ import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.keygen.KeyGenUtils; import org.apache.hudi.table.HoodieTable; - -import org.apache.avro.generic.GenericRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,7 +60,7 @@ public FlinkConcatHandle(HoodieWriteConfig config, String instantTime, HoodieTab public void write(GenericRecord oldRecord) { String key = KeyGenUtils.getRecordKeyFromGenericRecord(oldRecord, keyGeneratorOpt); try { - fileWriter.writeAvro(key, oldRecord); + fileWriter.write(key, oldRecord); } catch (IOException | RuntimeException e) { String errMsg = String.format("Failed to write old record into new file for key %s from old file %s to new file %s with writerSchema %s", key, getOldFilePath(), newFilePath, writeSchemaWithMetaFields.toString(true)); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java index c88b4ee66a098..7207ae624b137 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java @@ -18,6 +18,9 @@ package org.apache.hudi.table.action.commit; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieKey; @@ -28,23 +31,14 @@ import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.io.HoodieMergeHandle; -import org.apache.hudi.io.storage.HoodieFileReader; +import org.apache.hudi.io.storage.HoodieAvroFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; import org.apache.hudi.table.HoodieTable; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.generic.GenericDatumWriter; 
-import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.BinaryDecoder; -import org.apache.avro.io.BinaryEncoder; -import org.apache.hadoop.conf.Configuration; +import scala.collection.immutable.List; import java.io.IOException; import java.util.Iterator; -import scala.collection.immutable.List; - public class FlinkMergeHelper extends BaseMergeHelper>, List, List> { @@ -62,41 +56,35 @@ public static FlinkMergeHelper newInstance() { @Override public void runMerge(HoodieTable>, List, List> table, HoodieMergeHandle>, List, List> mergeHandle) throws IOException { - final GenericDatumWriter gWriter; - final GenericDatumReader gReader; - Schema readSchema; - final boolean externalSchemaTransformation = table.getConfig().shouldUseExternalSchemaTransformation(); HoodieBaseFile baseFile = mergeHandle.baseFileForMerge(); + + Schema readerSchema; + Schema writerSchema = mergeHandle.getWriterSchemaWithMetaFields(); + if (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent()) { - readSchema = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), mergeHandle.getOldFilePath()).getSchema(); - gWriter = new GenericDatumWriter<>(readSchema); - gReader = new GenericDatumReader<>(readSchema, mergeHandle.getWriterSchemaWithMetaFields()); + readerSchema = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), mergeHandle.getOldFilePath()).getSchema(); } else { - gReader = null; - gWriter = null; - readSchema = mergeHandle.getWriterSchemaWithMetaFields(); + readerSchema = mergeHandle.getWriterSchemaWithMetaFields(); } BoundedInMemoryExecutor wrapper = null; Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf()); - HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath()); + HoodieAvroFileReader reader = HoodieFileReaderFactory.getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath()); try { final Iterator readerIterator; if (baseFile.getBootstrapBaseFile().isPresent()) { - readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readSchema, externalSchemaTransformation); + readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readerSchema, externalSchemaTransformation); } else { - readerIterator = reader.getRecordIterator(readSchema); + readerIterator = reader.getRecordIterator(readerSchema); } - ThreadLocal encoderCache = new ThreadLocal<>(); - ThreadLocal decoderCache = new ThreadLocal<>(); wrapper = new BoundedInMemoryExecutor(table.getConfig().getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(readerIterator), Option.of(new UpdateHandler(mergeHandle)), record -> { if (!externalSchemaTransformation) { return record; } - return transformRecordBasedOnNewSchema(gReader, gWriter, encoderCache, decoderCache, (GenericRecord) record); + return transformRecordBasedOnNewSchema((GenericRecord) record, writerSchema); }); wrapper.execute(); } catch (Exception e) { diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java index 7e67b087f9891..7c1942e0d6b18 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java @@ -19,6 +19,10 @@ package 
org.apache.hudi.client.clustering.run.strategy; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieClusteringGroup; import org.apache.hudi.avro.model.HoodieClusteringPlan; @@ -41,7 +45,7 @@ import org.apache.hudi.exception.HoodieClusteringException; import org.apache.hudi.execution.bulkinsert.JavaCustomColumnsSortPartitioner; import org.apache.hudi.io.IOUtils; -import org.apache.hudi.io.storage.HoodieFileReader; +import org.apache.hudi.io.storage.HoodieAvroFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; import org.apache.hudi.keygen.BaseKeyGenerator; import org.apache.hudi.keygen.KeyGenUtils; @@ -49,11 +53,6 @@ import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -190,7 +189,7 @@ private List> readRecordsForGroupWithLogs(List baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath()) + Option baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath()) ? Option.empty() : Option.of(HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath()))); HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig(); @@ -216,7 +215,7 @@ private List> readRecordsForGroupBaseFiles(List { try { Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema())); - HoodieFileReader baseFileReader = HoodieFileReaderFactory.getFileReader(getHoodieTable().getHadoopConf(), new Path(clusteringOp.getDataFilePath())); + HoodieAvroFileReader baseFileReader = HoodieFileReaderFactory.getFileReader(getHoodieTable().getHadoopConf(), new Path(clusteringOp.getDataFilePath())); Iterator recordIterator = baseFileReader.getRecordIterator(readerSchema); recordIterator.forEachRemaining(record -> records.add(transform(record))); } catch (IOException e) { diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java index 0df6d3a90cc50..50b838bfd5991 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java @@ -18,6 +18,9 @@ package org.apache.hudi.table.action.commit; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieKey; @@ -28,18 +31,10 @@ import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.io.HoodieMergeHandle; -import org.apache.hudi.io.storage.HoodieFileReader; +import org.apache.hudi.io.storage.HoodieAvroFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; import org.apache.hudi.table.HoodieTable; -import 
org.apache.avro.Schema; -import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.generic.GenericDatumWriter; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.BinaryDecoder; -import org.apache.avro.io.BinaryEncoder; -import org.apache.hadoop.conf.Configuration; - import java.io.IOException; import java.util.Iterator; import java.util.List; @@ -66,37 +61,31 @@ public void runMerge(HoodieTable>, List, List HoodieMergeHandle>, List, List> mergeHandle = upsertHandle; HoodieBaseFile baseFile = mergeHandle.baseFileForMerge(); - final GenericDatumWriter gWriter; - final GenericDatumReader gReader; - Schema readSchema; + Schema writerSchema = mergeHandle.getWriterSchemaWithMetaFields(); + Schema readerSchema; + if (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent()) { - readSchema = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), mergeHandle.getOldFilePath()).getSchema(); - gWriter = new GenericDatumWriter<>(readSchema); - gReader = new GenericDatumReader<>(readSchema, mergeHandle.getWriterSchemaWithMetaFields()); + readerSchema = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), mergeHandle.getOldFilePath()).getSchema(); } else { - gReader = null; - gWriter = null; - readSchema = mergeHandle.getWriterSchemaWithMetaFields(); + readerSchema = writerSchema; } BoundedInMemoryExecutor wrapper = null; - HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath()); + HoodieAvroFileReader reader = HoodieFileReaderFactory.getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath()); try { final Iterator readerIterator; if (baseFile.getBootstrapBaseFile().isPresent()) { - readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readSchema, externalSchemaTransformation); + readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readerSchema, externalSchemaTransformation); } else { - readerIterator = reader.getRecordIterator(readSchema); + readerIterator = reader.getRecordIterator(readerSchema); } - ThreadLocal encoderCache = new ThreadLocal<>(); - ThreadLocal decoderCache = new ThreadLocal<>(); wrapper = new BoundedInMemoryExecutor(table.getConfig().getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(readerIterator), Option.of(new UpdateHandler(mergeHandle)), record -> { if (!externalSchemaTransformation) { return record; } - return transformRecordBasedOnNewSchema(gReader, gWriter, encoderCache, decoderCache, (GenericRecord) record); + return transformRecordBasedOnNewSchema((GenericRecord) record, writerSchema); }); wrapper.execute(); } catch (Exception e) { diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java index 282cc28a39311..c804d409950de 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java @@ -51,7 +51,7 @@ import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner; import org.apache.hudi.execution.bulkinsert.RDDSpatialCurveSortPartitioner; import org.apache.hudi.io.IOUtils; -import org.apache.hudi.io.storage.HoodieFileReader; +import 
org.apache.hudi.io.storage.HoodieAvroFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; import org.apache.hudi.keygen.BaseKeyGenerator; import org.apache.hudi.keygen.KeyGenUtils; @@ -219,7 +219,7 @@ private JavaRDD> readRecordsForGroupWithLogs(JavaSparkContext js .withPartition(clusteringOp.getPartitionPath()) .build(); - Option baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath()) + Option baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath()) ? Option.empty() : Option.of(HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath()))); HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig(); @@ -244,11 +244,11 @@ private JavaRDD> readRecordsForGroupWithLogs(JavaSparkContext js private JavaRDD> readRecordsForGroupBaseFiles(JavaSparkContext jsc, List clusteringOps) { return jsc.parallelize(clusteringOps, clusteringOps.size()).mapPartitions(clusteringOpsPartition -> { - List> iteratorsForPartition = new ArrayList<>(); + List> iteratorsForPartition = new ArrayList<>(); clusteringOpsPartition.forEachRemaining(clusteringOp -> { try { Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema())); - HoodieFileReader baseFileReader = HoodieFileReaderFactory.getFileReader(getHoodieTable().getHadoopConf(), new Path(clusteringOp.getDataFilePath())); + HoodieAvroFileReader baseFileReader = HoodieFileReaderFactory.getFileReader(getHoodieTable().getHadoopConf(), new Path(clusteringOp.getDataFilePath())); iteratorsForPartition.add(baseFileReader.getRecordIterator(readerSchema)); } catch (IOException e) { throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath() diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java index e4941848b5151..c41cb353830f3 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java @@ -18,6 +18,11 @@ package org.apache.hudi.client.clustering.run.strategy; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieClusteringPlan; import org.apache.hudi.client.WriteStatus; @@ -46,12 +51,6 @@ import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; @@ -149,7 +148,7 @@ private Iterator> readRecordsForGroupBaseFiles(List>> iteratorsForPartition = clusteringOps.stream().map(clusteringOp -> { Schema readerSchema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(getWriteConfig().getSchema())); - Iterable indexedRecords = () -> { + Iterable indexedRecords = () -> { try { return HoodieFileReaderFactory.getFileReader(getHoodieTable().getHadoopConf(), new Path(clusteringOp.getDataFilePath())).getRecordIterator(readerSchema); } catch (IOException e) { diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java index 80615451374b6..abc0f94bd20c9 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java @@ -18,6 +18,9 @@ package org.apache.hudi.table.action.commit; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieKey; @@ -27,17 +30,9 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.execution.SparkBoundedInMemoryExecutor; import org.apache.hudi.io.HoodieMergeHandle; -import org.apache.hudi.io.storage.HoodieFileReader; +import org.apache.hudi.io.storage.HoodieAvroFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; import org.apache.hudi.table.HoodieTable; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.generic.GenericDatumWriter; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.BinaryDecoder; -import org.apache.avro.io.BinaryEncoder; -import org.apache.hadoop.conf.Configuration; import org.apache.spark.api.java.JavaRDD; import java.io.IOException; @@ -65,37 +60,31 @@ public void runMerge(HoodieTable>, JavaRDD HoodieMergeHandle>, JavaRDD, JavaRDD> mergeHandle = upsertHandle; HoodieBaseFile baseFile = mergeHandle.baseFileForMerge(); - final GenericDatumWriter gWriter; - final GenericDatumReader gReader; - Schema readSchema; + Schema readerSchema; + Schema writerSchema = mergeHandle.getWriterSchemaWithMetaFields(); + if (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent()) { - readSchema = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), mergeHandle.getOldFilePath()).getSchema(); - gWriter = new GenericDatumWriter<>(readSchema); - gReader = new GenericDatumReader<>(readSchema, mergeHandle.getWriterSchemaWithMetaFields()); + readerSchema = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), mergeHandle.getOldFilePath()).getSchema(); } else { - gReader = null; - gWriter = null; - readSchema = mergeHandle.getWriterSchemaWithMetaFields(); + readerSchema = writerSchema; } BoundedInMemoryExecutor wrapper = null; - HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath()); + HoodieAvroFileReader reader = HoodieFileReaderFactory.getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath()); try { final Iterator readerIterator; if (baseFile.getBootstrapBaseFile().isPresent()) { - readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readSchema, externalSchemaTransformation); + readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readerSchema, externalSchemaTransformation); } else { - readerIterator = reader.getRecordIterator(readSchema); + 
readerIterator = reader.getRecordIterator(readerSchema); } - ThreadLocal encoderCache = new ThreadLocal<>(); - ThreadLocal decoderCache = new ThreadLocal<>(); wrapper = new SparkBoundedInMemoryExecutor(table.getConfig(), readerIterator, new UpdateHandler(mergeHandle), record -> { if (!externalSchemaTransformation) { return record; } - return transformRecordBasedOnNewSchema(gReader, gWriter, encoderCache, decoderCache, (GenericRecord) record); + return transformRecordBasedOnNewSchema((GenericRecord) record, writerSchema); }); wrapper.execute(); } catch (Exception e) { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index f175dc5dea021..65d3b4c752590 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -18,6 +18,15 @@ package org.apache.hudi.client.functional; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.util.Time; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieMetadataRecord; import org.apache.hudi.client.SparkRDDWriteClient; @@ -73,7 +82,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hudi.index.HoodieIndex; -import org.apache.hudi.io.storage.HoodieHFileReader; +import org.apache.hudi.io.storage.HoodieAvroHFileReader; import org.apache.hudi.metadata.FileSystemBackedTableMetadata; import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter; import org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader; @@ -88,16 +97,6 @@ import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper; import org.apache.hudi.table.upgrade.UpgradeDowngrade; import org.apache.hudi.testutils.MetadataMergeWriteStatus; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.io.hfile.CacheConfig; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.util.Time; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.parquet.avro.AvroSchemaConverter; @@ -446,7 +445,7 @@ public void testVirtualKeysInBaseFiles(boolean populateMetaFields) throws Except table.getHoodieView().sync(); List fileSlices = table.getSliceView().getLatestFileSlices("files").collect(Collectors.toList()); HoodieBaseFile baseFile = fileSlices.get(0).getBaseFile().get(); - HoodieHFileReader hoodieHFileReader = new HoodieHFileReader(context.getHadoopConf().get(), new Path(baseFile.getPath()), + HoodieAvroHFileReader hoodieHFileReader = new HoodieAvroHFileReader(context.getHadoopConf().get(), new Path(baseFile.getPath()), new CacheConfig(context.getHadoopConf().get())); List> records = hoodieHFileReader.readAllRecords(); records.forEach(entry -> { @@ -774,7 +773,7 @@ 
private void verifyMetadataRecordKeyExcludeFromPayloadBaseFiles(HoodieTable tabl } final HoodieBaseFile baseFile = fileSlices.get(0).getBaseFile().get(); - HoodieHFileReader hoodieHFileReader = new HoodieHFileReader(context.getHadoopConf().get(), + HoodieAvroHFileReader hoodieHFileReader = new HoodieAvroHFileReader(context.getHadoopConf().get(), new Path(baseFile.getPath()), new CacheConfig(context.getHadoopConf().get())); List> records = hoodieHFileReader.readAllRecords(); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java index 48c2e19b4aff7..feafd710c46c2 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java @@ -21,6 +21,7 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.util.Pair; @@ -42,7 +43,7 @@ import org.apache.hudi.common.testutils.HoodieTestTable; import org.apache.hudi.common.util.collection.ExternalSpillableMap; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.io.storage.HoodieHFileReader; +import org.apache.hudi.io.storage.HoodieAvroHFileReader; import org.apache.hudi.metadata.HoodieBackedTableMetadata; import org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader; import org.apache.hudi.metadata.HoodieMetadataPayload; @@ -50,8 +51,6 @@ import org.apache.hudi.metadata.MetadataPartitionType; import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; - -import org.apache.hadoop.fs.FileStatus; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.parquet.avro.AvroSchemaConverter; @@ -357,7 +356,7 @@ private void verifyMetadataRecordKeyExcludeFromPayloadBaseFiles(HoodieTable tabl } final HoodieBaseFile baseFile = fileSlices.get(0).getBaseFile().get(); - HoodieHFileReader hoodieHFileReader = new HoodieHFileReader(context.getHadoopConf().get(), + HoodieAvroHFileReader hoodieHFileReader = new HoodieAvroHFileReader(context.getHadoopConf().get(), new Path(baseFile.getPath()), new CacheConfig(context.getHadoopConf().get())); List> records = hoodieHFileReader.readAllRecords(); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieFileWriterFactory.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieAvroFileWriterFactory.java similarity index 79% rename from hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieFileWriterFactory.java rename to hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieAvroFileWriterFactory.java index b7f34ab2b24d8..07738c97dce8a 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieFileWriterFactory.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieAvroFileWriterFactory.java @@ -18,15 +18,13 @@ package org.apache.hudi.io.storage; +import org.apache.hadoop.fs.Path; import org.apache.hudi.client.SparkTaskContextSupplier; import 
org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.testutils.HoodieClientTestBase; - -import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.fs.Path; import org.junit.jupiter.api.Test; import java.io.IOException; @@ -37,7 +35,7 @@ /** * Tests for {@link HoodieFileWriterFactory}. */ -public class TestHoodieFileWriterFactory extends HoodieClientTestBase { +public class TestHoodieAvroFileWriterFactory extends HoodieClientTestBase { @Test public void testGetFileWriter() throws IOException { @@ -47,26 +45,26 @@ public void testGetFileWriter() throws IOException { final HoodieWriteConfig cfg = getConfig(); HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); SparkTaskContextSupplier supplier = new SparkTaskContextSupplier(); - HoodieFileWriter parquetWriter = HoodieFileWriterFactory.getFileWriter(instantTime, + HoodieAvroFileWriter parquetWriter = HoodieFileWriterFactory.getFileWriter(instantTime, parquetPath, table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA, supplier); - assertTrue(parquetWriter instanceof HoodieParquetWriter); + assertTrue(parquetWriter instanceof HoodieAvroParquetWriter); // hfile format. final Path hfilePath = new Path(basePath + "/partition/path/f1_1-0-1_000.hfile"); - HoodieFileWriter hfileWriter = HoodieFileWriterFactory.getFileWriter(instantTime, + HoodieAvroFileWriter hfileWriter = HoodieFileWriterFactory.getFileWriter(instantTime, hfilePath, table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA, supplier); - assertTrue(hfileWriter instanceof HoodieHFileWriter); + assertTrue(hfileWriter instanceof HoodieAvroHFileWriter); // orc file format. final Path orcPath = new Path(basePath + "/partition/path/f1_1-0-1_000.orc"); - HoodieFileWriter orcFileWriter = HoodieFileWriterFactory.getFileWriter(instantTime, + HoodieAvroFileWriter orcFileWriter = HoodieFileWriterFactory.getFileWriter(instantTime, orcPath, table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA, supplier); - assertTrue(orcFileWriter instanceof HoodieOrcWriter); + assertTrue(orcFileWriter instanceof HoodieAvroOrcWriter); // other file format exception. final Path logPath = new Path(basePath + "/partition/path/f.b51192a8-574b-4a85-b246-bcfec03ac8bf_100.log.2_1-0-1"); final Throwable thrown = assertThrows(UnsupportedOperationException.class, () -> { - HoodieFileWriter logWriter = HoodieFileWriterFactory.getFileWriter(instantTime, logPath, + HoodieAvroFileWriter logWriter = HoodieFileWriterFactory.getFileWriter(instantTime, logPath, table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA, supplier); }, "should fail since log storage writer is not supported yet."); assertTrue(thrown.getMessage().contains("format not supported yet.")); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/io/storage/HoodieFileWriter.java b/hudi-common/src/main/java/org/apache/hudi/common/io/storage/HoodieFileWriter.java new file mode 100644 index 0000000000000..6a202471f3cca --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/io/storage/HoodieFileWriter.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.io.storage; + +import org.apache.avro.Schema; +import org.apache.hudi.common.model.HoodieRecord; + +import java.io.IOException; +import java.util.Properties; + +public interface HoodieFileWriter { + + // TODO rename + void writeWithMetadata(HoodieRecord record, Schema schema, Properties props) throws IOException; + + // TODO rename + void write(HoodieRecord record, Schema schema, Properties props) throws IOException; + +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/io/storage/HoodieRecordFileWriter.java b/hudi-common/src/main/java/org/apache/hudi/common/io/storage/HoodieRecordFileWriter.java new file mode 100644 index 0000000000000..aec2a50613870 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/io/storage/HoodieRecordFileWriter.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.common.io.storage; + +import org.apache.hudi.common.model.HoodieRecord; + +import java.io.IOException; + +public interface HoodieRecordFileWriter extends HoodieFileWriter { + + boolean canWrite(); + + void writeWithMetadata(R newRecord, HoodieRecord record) throws IOException; + + void write(String key, R oldRecord) throws IOException; + + void close() throws IOException; + +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java index 9a9bbb2b7427f..d7120a87610ae 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java @@ -19,6 +19,23 @@ package org.apache.hudi.common.model; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hudi.common.io.storage.HoodieFileWriter; +import org.apache.hudi.common.io.storage.HoodieRecordFileWriter; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.common.util.ValidationUtils; + +import javax.annotation.Nonnull; +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; + +import static org.apache.hudi.TypeUtils.unsafeCast; + public class HoodieAvroRecord extends HoodieRecord { public HoodieAvroRecord(HoodieKey key, T data) { super(key, data); @@ -47,4 +64,104 @@ public T getData() { } return data; } + + @Override + public void writeWithMetadata(HoodieFileWriter writer, Schema schema, Properties props) throws IOException { + HoodieRecordFileWriter avroWriter = unsafeCast(writer); + IndexedRecord avroPayload = (IndexedRecord) getData().getInsertValue(schema, props).get(); + + avroWriter.writeWithMetadata(avroPayload, this); + } + + @Override + public void write(HoodieFileWriter writer, Schema schema, Properties props) throws IOException { + HoodieRecordFileWriter avroWriter = unsafeCast(writer); + IndexedRecord avroPayload = (IndexedRecord) getData().getInsertValue(schema, props).get(); + + avroWriter.write(getRecordKey(), avroPayload); + } + + ////////////////////////////////////////////////////////////////////////////// + + // + // NOTE: This method duplicates those ones of the HoodieRecordPayload and are placed here + // for the duration of RFC-46 implementation, until migration off `HoodieRecordPayload` + // is complete + // + // TODO cleanup + + // NOTE: This method is assuming semantic that `preCombine` operation is bound to pick one or the other + // object, and may not create a new one + @Override + public HoodieRecord preCombine(HoodieRecord previousRecord) { + T picked = unsafeCast(getData().preCombine(previousRecord.getData())); + return picked.equals(getData()) ? 
this : previousRecord; + } + + // NOTE: This method is assuming semantic that only records bearing the same (partition, key) could + // be combined + @Override + public Option> combineAndGetUpdateValue(HoodieRecord previousRecord, Schema schema, Properties props) throws IOException { + ValidationUtils.checkState(Objects.equals(getKey(), previousRecord.getKey())); + + Option previousRecordAvroPayload = previousRecord.getData().getInsertValue(schema, props); + if (!previousRecordAvroPayload.isPresent()) { + return Option.empty(); + } + + return getData().combineAndGetUpdateValue(previousRecordAvroPayload.get(), schema, props) + .map(combinedAvroPayload -> { + // NOTE: It's assumed that records aren't precombined more than once in its lifecycle, + // therefore we simply stub out precombine value here + int newPreCombineVal = 0; + T combinedPayload = instantiateRecordPayloadWrapper(combinedAvroPayload, newPreCombineVal); + return new HoodieAvroRecord<>(getKey(), combinedPayload, getOperation()); + }); + } + + @Override + public HoodieRecord rewriteRecord(Schema schema) throws IOException { + Option avroRecordPayloadOpt = getData().getInsertValue(schema); + return new HoodieAvroRecord<>(getKey(), new RewriteAvroPayload((GenericRecord) avroRecordPayloadOpt.get()), getOperation()); + } + + @Override + public HoodieRecord overrideMetadataFieldValue(HoodieMetadataField metadataField, String value) throws IOException { + // NOTE: RewriteAvroPayload is expected here + Option avroPayloadOpt = getData().getInsertValue(null); + IndexedRecord avroPayload = avroPayloadOpt.get(); + + avroPayload.put(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(metadataField.getFieldName()), value); + + return new HoodieAvroRecord(getKey(), new RewriteAvroPayload((GenericRecord) avroPayload), getOperation()); + } + + public Option> getMetadata() { + return getData().getMetadata(); + } + + @Override + public boolean canBeIgnored() { + return getData().canBeIgnored(); + } + + @Nonnull + private T instantiateRecordPayloadWrapper(Object combinedAvroPayload, Comparable newPreCombineVal) { + return unsafeCast( + ReflectionUtils.loadPayload( + getData().getClass().getCanonicalName(), + new Object[]{combinedAvroPayload, newPreCombineVal}, + GenericRecord.class, + Comparable.class)); + } + + private static Comparable getPrecombineValue(T data) { + if (data instanceof BaseAvroPayload) { + return ((BaseAvroPayload) data).orderingVal; + } + + return -1; + } + + ////////////////////////////////////////////////////////////////////////////// } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java index f90448e7d4f08..49401de4b6f9e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java @@ -18,14 +18,19 @@ package org.apache.hudi.common.model; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.common.io.storage.HoodieFileWriter; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; +import java.io.IOException; import java.io.Serializable; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Properties; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -34,12 +39,33 @@ */ public abstract class HoodieRecord implements 
Serializable { - public static final String COMMIT_TIME_METADATA_FIELD = "_hoodie_commit_time"; - public static final String COMMIT_SEQNO_METADATA_FIELD = "_hoodie_commit_seqno"; - public static final String RECORD_KEY_METADATA_FIELD = "_hoodie_record_key"; - public static final String PARTITION_PATH_METADATA_FIELD = "_hoodie_partition_path"; - public static final String FILENAME_METADATA_FIELD = "_hoodie_file_name"; - public static final String OPERATION_METADATA_FIELD = "_hoodie_operation"; + public static final String COMMIT_TIME_METADATA_FIELD = HoodieMetadataField.COMMIT_TIME_METADATA_FIELD.getFieldName(); + public static final String COMMIT_SEQNO_METADATA_FIELD = HoodieMetadataField.COMMIT_SEQNO_METADATA_FIELD.getFieldName(); + public static final String RECORD_KEY_METADATA_FIELD = HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName(); + public static final String PARTITION_PATH_METADATA_FIELD = HoodieMetadataField.PARTITION_PATH_METADATA_FIELD.getFieldName(); + public static final String FILENAME_METADATA_FIELD = HoodieMetadataField.FILENAME_METADATA_FIELD.getFieldName(); + public static final String OPERATION_METADATA_FIELD = HoodieMetadataField.OPERATION_METADATA_FIELD.getFieldName(); + + public enum HoodieMetadataField { + COMMIT_TIME_METADATA_FIELD("_hoodie_commit_time"), + COMMIT_SEQNO_METADATA_FIELD("_hoodie_commit_seqno"), + RECORD_KEY_METADATA_FIELD("_hoodie_record_key"), + PARTITION_PATH_METADATA_FIELD("_hoodie_partition_path"), + FILENAME_METADATA_FIELD("_hoodie_file_name"), + OPERATION_METADATA_FIELD("_hoodie_operation"); + + private final String fieldName; + + HoodieMetadataField(String fieldName) { + this.fieldName = fieldName; + } + + public String getFieldName() { + return fieldName; + } + } + + public static final EmptyRecord SENTINEL = new EmptyRecord(); public static final List HOODIE_META_COLUMNS = CollectionUtils.createImmutableList(COMMIT_TIME_METADATA_FIELD, COMMIT_SEQNO_METADATA_FIELD, @@ -195,10 +221,6 @@ public String toString() { return sb.toString(); } - public static String generateSequenceId(String instantTime, int partitionId, long recordIndex) { - return instantTime + "_" + partitionId + "_" + recordIndex; - } - public String getPartitionPath() { assert key != null; return key.getPartitionPath(); @@ -222,4 +244,64 @@ public void checkState() { throw new UnsupportedOperationException("Not allowed to modify after sealed"); } } + + public abstract void writeWithMetadata(HoodieFileWriter writer, Schema schema, Properties props) throws IOException; + + public abstract void write(HoodieFileWriter writer, Schema schema, Properties props) throws IOException; + + ////////////////////////////////////////////////////////////////////////////// + + // + // NOTE: This method duplicates those ones of the HoodieRecordPayload and are placed here + // for the duration of RFC-46 implementation, until migration off `HoodieRecordPayload` + // is complete + // + // TODO cleanup + + // NOTE: This method is assuming semantic that `preCombine` operation is bound to pick one or the other + // object, and may not create a new one + public abstract HoodieRecord preCombine(HoodieRecord previousRecord); + + // NOTE: This method is assuming semantic that only records bearing the same (partition, key) could + // be combined + public abstract Option> combineAndGetUpdateValue(HoodieRecord previousRecord, Schema schema, Properties props) throws IOException; + + public abstract HoodieRecord rewriteRecord(Schema schema) throws IOException; + + public abstract HoodieRecord 
overrideMetadataFieldValue(HoodieMetadataField metadataField, String value) throws IOException; + + public abstract Option> getMetadata(); + + public abstract boolean canBeIgnored(); + + ////////////////////////////////////////////////////////////////////////////// + + public static String generateSequenceId(String instantTime, int partitionId, long recordIndex) { + return instantTime + "_" + partitionId + "_" + recordIndex; + } + + private static class EmptyRecord implements GenericRecord { + private EmptyRecord() {} + + @Override + public void put(int i, Object v) {} + + @Override + public Object get(int i) { + return null; + } + + @Override + public Schema getSchema() { + return null; + } + + @Override + public void put(String key, Object v) {} + + @Override + public Object get(String key) { + return null; + } + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java index 36dd30b659dbf..500d595ffb86c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java @@ -18,14 +18,13 @@ package org.apache.hudi.common.model; +import org.apache.avro.Schema; +import org.apache.avro.generic.IndexedRecord; import org.apache.hudi.ApiMaturityLevel; import org.apache.hudi.PublicAPIClass; import org.apache.hudi.PublicAPIMethod; import org.apache.hudi.common.util.Option; -import org.apache.avro.Schema; -import org.apache.avro.generic.IndexedRecord; - import java.io.IOException; import java.io.Serializable; import java.util.Map; @@ -114,4 +113,8 @@ default Option getInsertValue(Schema schema, Properties propertie default Option> getMetadata() { return Option.empty(); } + + default boolean canBeIgnored() { + return false; + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/SkipHoodieRecordPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/SkipHoodieRecordPayload.java new file mode 100644 index 0000000000000..1b2f5fb951659 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/SkipHoodieRecordPayload.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.util.Option;
+
+/**
+ * Stubbed implementation of {@link HoodieRecordPayload} used to signal that the record should
+ * simply be skipped.
+ */
+public class SkipHoodieRecordPayload implements HoodieRecordPayload {
+
+  public SkipHoodieRecordPayload() {
+  }
+
+  public SkipHoodieRecordPayload(GenericRecord record, Comparable orderingVal) {
+  }
+
+  @Override
+  public SkipHoodieRecordPayload preCombine(SkipHoodieRecordPayload oldValue) {
+    return oldValue;
+  }
+
+  @Override
+  public Option combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) {
+    return Option.empty();
+  }
+
+  @Override
+  public Option getInsertValue(Schema schema) {
+    return Option.empty();
+  }
+
+  @Override
+  public boolean canBeIgnored() {
+    return true;
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java
index a786e8305bc27..445877cc3be28 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java
@@ -18,16 +18,15 @@ package org.apache.hudi.common.table.log;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.SpillableMapUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.io.storage.HoodieFileReader;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
 import java.io.IOException;
 import java.util.Iterator;
@@ -40,7 +39,7 @@ public class HoodieFileSliceReader implements Ite
   private final Iterator> recordsIterator;
   public static HoodieFileSliceReader getFileSliceReader(
-      Option baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, String payloadClass,
+      Option baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, String payloadClass,
       String preCombineField, Option> simpleKeyGenFieldsOpt) throws IOException {
     if (baseFileReader.isPresent()) {
       Iterator baseIterator = baseFileReader.get().getRecordIterator(schema);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index 9f11e68ddc36d..9cb0fd45a4220 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -49,6 +49,7 @@ import javax.annotation.Nullable;
 import java.io.EOFException;
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
@@ -195,7 +196,9 @@ private HoodieLogBlock readBlock() throws IOException {
     switch (Objects.requireNonNull(blockType)) {
       case AVRO_DATA_BLOCK:
         if (nextBlockVersion.getVersion() == HoodieLogFormatVersion.DEFAULT_VERSION) {
-          return HoodieAvroDataBlock.getBlock(content.get(), readerSchema);
+          // TODO cleanup
+          //
return HoodieAvroDataBlock.getBlock(content.get(), readerSchema); + throw new UnsupportedEncodingException("not supported"); } else { return new HoodieAvroDataBlock(inputStream, content, readBlockLazily, logBlockContentLoc, Option.ofNullable(readerSchema), header, footer, keyField); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java index 8977134740f3c..08cd917cfd1ec 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java @@ -19,6 +19,7 @@ package org.apache.hudi.common.table.log.block; import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; @@ -38,8 +39,8 @@ import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.io.storage.HoodieAvroHFileReader; import org.apache.hudi.io.storage.HoodieHBaseKVComparator; -import org.apache.hudi.io.storage.HoodieHFileReader; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -74,14 +75,14 @@ public HoodieHFileDataBlock(FSDataInputStream inputStream, Map header, Map footer, boolean enablePointLookups) { - super(content, inputStream, readBlockLazily, Option.of(logBlockContentLocation), readerSchema, header, footer, HoodieHFileReader.KEY_FIELD_NAME, enablePointLookups); + super(content, inputStream, readBlockLazily, Option.of(logBlockContentLocation), readerSchema, header, footer, HoodieAvroHFileReader.KEY_FIELD_NAME, enablePointLookups); this.compressionAlgorithm = Option.empty(); } public HoodieHFileDataBlock(List records, Map header, Compression.Algorithm compressionAlgorithm) { - super(records, header, new HashMap<>(), HoodieHFileReader.KEY_FIELD_NAME); + super(records, header, new HashMap<>(), HoodieAvroHFileReader.KEY_FIELD_NAME); this.compressionAlgorithm = Option.of(compressionAlgorithm); } @@ -155,7 +156,7 @@ protected List deserializeRecords(byte[] content) throws IOExcept Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA)); // Read the content - HoodieHFileReader reader = new HoodieHFileReader<>(content); + HoodieAvroHFileReader reader = new HoodieAvroHFileReader(content); List> records = reader.readAllRecords(writerSchema, readerSchema); return records.stream().map(Pair::getSecond).collect(Collectors.toList()); @@ -180,10 +181,10 @@ protected List lookupRecords(List keys) throws IOExceptio // HFile read will be efficient if keys are sorted, since on storage, records are sorted by key. This will avoid unnecessary seeks. 
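// Because the probes below hit the sorted candidate keys in ascending order, the HFile scanner only
// ever moves forward and each data block generally needs to be loaded (and decompressed) once.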
Collections.sort(keys); - try (HoodieHFileReader reader = - new HoodieHFileReader<>(inlineConf, inlinePath, new CacheConfig(inlineConf), inlinePath.getFileSystem(inlineConf))) { + try (HoodieAvroHFileReader reader = + new HoodieAvroHFileReader(inlineConf, inlinePath, new CacheConfig(inlineConf), inlinePath.getFileSystem(inlineConf))) { // Get writer's schema from the header - List> logRecords = reader.readRecords(keys, readerSchema); + List> logRecords = reader.readRecords(keys, readerSchema); return logRecords.stream().map(Pair::getSecond).collect(Collectors.toList()); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java similarity index 67% rename from hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java rename to hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java index cb330b81432bf..8c5f3755cd008 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java @@ -18,40 +18,40 @@ package org.apache.hudi.io.storage; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.common.bloom.BloomFilter; +import org.apache.hudi.common.util.Option; + import java.io.IOException; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.avro.Schema; -import org.apache.avro.generic.IndexedRecord; -import org.apache.hudi.common.bloom.BloomFilter; -import org.apache.hudi.common.util.Option; - -public interface HoodieFileReader extends AutoCloseable { +public interface HoodieAvroFileReader extends AutoCloseable { - public String[] readMinMaxRecordKeys(); + String[] readMinMaxRecordKeys(); - public BloomFilter readBloomFilter(); + BloomFilter readBloomFilter(); - public Set filterRowKeys(Set candidateRowKeys); + Set filterRowKeys(Set candidateRowKeys); - default Map getRecordsByKeys(List rowKeys) throws IOException { + default Map getRecordsByKeys(List rowKeys) throws IOException { throw new UnsupportedOperationException(); } - public Iterator getRecordIterator(Schema readerSchema) throws IOException; + Iterator getRecordIterator(Schema readerSchema) throws IOException; - default Iterator getRecordIterator() throws IOException { + default Iterator getRecordIterator() throws IOException { return getRecordIterator(getSchema()); } - default Option getRecordByKey(String key, Schema readerSchema) throws IOException { + default Option getRecordByKey(String key, Schema readerSchema) throws IOException { throw new UnsupportedOperationException(); } - default Option getRecordByKey(String key) throws IOException { + default Option getRecordByKey(String key) throws IOException { return getRecordByKey(key, getSchema()); } diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReader.java similarity index 82% rename from hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java rename to hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReader.java index 3404d2bd55b55..6349118bf0234 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReader.java @@ -18,20 +18,8 @@ package 
org.apache.hudi.io.storage; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeSet; -import java.util.stream.Collectors; - import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; @@ -58,8 +46,21 @@ import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -public class HoodieHFileReader implements HoodieFileReader { - private static final Logger LOG = LogManager.getLogger(HoodieHFileReader.class); +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import java.util.stream.Collectors; + +public class HoodieAvroHFileReader implements HoodieAvroFileReader { + private static final Logger LOG = LogManager.getLogger(HoodieAvroHFileReader.class); private Path path; private Configuration conf; private HFile.Reader reader; @@ -76,20 +77,20 @@ public class HoodieHFileReader implements HoodieFileRea public static final String KEY_MIN_RECORD = "minRecordKey"; public static final String KEY_MAX_RECORD = "maxRecordKey"; - public HoodieHFileReader(Configuration configuration, Path path, CacheConfig cacheConfig) throws IOException { + public HoodieAvroHFileReader(Configuration configuration, Path path, CacheConfig cacheConfig) throws IOException { this.conf = configuration; this.path = path; this.reader = HFile.createReader(FSUtils.getFs(path.toString(), configuration), path, cacheConfig, conf); } - public HoodieHFileReader(Configuration configuration, Path path, CacheConfig cacheConfig, FileSystem fs) throws IOException { + public HoodieAvroHFileReader(Configuration configuration, Path path, CacheConfig cacheConfig, FileSystem fs) throws IOException { this.conf = configuration; this.path = path; this.fsDataInputStream = fs.open(path); this.reader = HFile.createReader(fs, path, cacheConfig, configuration); } - public HoodieHFileReader(byte[] content) throws IOException { + public HoodieAvroHFileReader(byte[] content) throws IOException { Configuration conf = new Configuration(); Path path = new Path("hoodie"); SeekableByteArrayInputStream bis = new SeekableByteArrayInputStream(content); @@ -159,7 +160,7 @@ public Set filterRowKeys(Set candidateRowKeys) { } @Override - public Map getRecordsByKeys(List rowKeys) throws IOException { + public Map getRecordsByKeys(List rowKeys) throws IOException { return filterRecordsImpl(new TreeSet<>(rowKeys)); } @@ -173,10 +174,10 @@ public Map getRecordsByKeys(List rowKeys) throws IOException * @return Map of keys to fetched records * @throws IOException When the deserialization of records fail */ - private synchronized Map filterRecordsImpl(TreeSet sortedCandidateRowKeys) throws IOException { - HashMap filteredRecords = new HashMap<>(); + private synchronized Map filterRecordsImpl(TreeSet sortedCandidateRowKeys) throws IOException { + HashMap filteredRecords = new HashMap<>(); for (String key : sortedCandidateRowKeys) { - Option record = getRecordByKey(key); + Option record = getRecordByKey(key); if (record.isPresent()) { 
filteredRecords.put(key, record.get()); } @@ -184,15 +185,15 @@ private synchronized Map filterRecordsImpl(TreeSet sortedCand return filteredRecords; } - public List> readAllRecords(Schema writerSchema, Schema readerSchema) throws IOException { + public List> readAllRecords(Schema writerSchema, Schema readerSchema) throws IOException { final Option keyFieldSchema = Option.ofNullable(readerSchema.getField(KEY_FIELD_NAME)); - List> recordList = new LinkedList<>(); + List> recordList = new LinkedList<>(); try { final HFileScanner scanner = reader.getScanner(false, false); if (scanner.seekTo()) { do { Cell c = scanner.getKeyValue(); - final Pair keyAndRecordPair = getRecordFromCell(c, writerSchema, readerSchema, keyFieldSchema); + final Pair keyAndRecordPair = getRecordFromCell(c, writerSchema, readerSchema, keyFieldSchema); recordList.add(keyAndRecordPair); } while (scanner.next()); } @@ -203,23 +204,23 @@ public List> readAllRecords(Schema writerSchema, Schema readerSc } } - public List> readAllRecords() throws IOException { + public List> readAllRecords() throws IOException { Schema schema = new Schema.Parser().parse(new String(reader.loadFileInfo().get(KEY_SCHEMA.getBytes()))); return readAllRecords(schema, schema); } - public List> readRecords(List keys) throws IOException { + public List> readRecords(List keys) throws IOException { reader.loadFileInfo(); Schema schema = new Schema.Parser().parse(new String(reader.loadFileInfo().get(KEY_SCHEMA.getBytes()))); return readRecords(keys, schema); } - public List> readRecords(List keys, Schema schema) throws IOException { + public List> readRecords(List keys, Schema schema) throws IOException { this.schema = schema; reader.loadFileInfo(); - List> records = new ArrayList<>(); + List> records = new ArrayList<>(); for (String key: keys) { - Option value = getRecordByKey(key, schema); + Option value = getRecordByKey(key, schema); if (value.isPresent()) { records.add(new Pair(key, value.get())); } @@ -233,8 +234,8 @@ public Iterator getRecordIterator(Schema readerSchema) throws IOException { final Option keyFieldSchema = Option.ofNullable(readerSchema.getField(KEY_FIELD_NAME)); ValidationUtils.checkState(keyFieldSchema != null, "Missing key field '" + KEY_FIELD_NAME + "' in the schema!"); - return new Iterator() { - private R next = null; + return new Iterator() { + private IndexedRecord next = null; private boolean eof = false; @Override @@ -243,7 +244,7 @@ public boolean hasNext() { // To handle when hasNext() is called multiple times for idempotency and/or the first time if (this.next == null && !this.eof) { if (!scanner.isSeeked() && scanner.seekTo()) { - final Pair keyAndRecordPair = getRecordFromCell(scanner.getKeyValue(), getSchema(), readerSchema, keyFieldSchema); + final Pair keyAndRecordPair = getRecordFromCell(scanner.getKeyValue(), getSchema(), readerSchema, keyFieldSchema); this.next = keyAndRecordPair.getSecond(); } } @@ -254,7 +255,7 @@ public boolean hasNext() { } @Override - public R next() { + public IndexedRecord next() { try { // To handle case when next() is called before hasNext() if (this.next == null) { @@ -262,9 +263,9 @@ public R next() { throw new HoodieIOException("No more records left to read from hfile"); } } - R retVal = this.next; + IndexedRecord retVal = this.next; if (scanner.next()) { - final Pair keyAndRecordPair = getRecordFromCell(scanner.getKeyValue(), getSchema(), readerSchema, keyFieldSchema); + final Pair keyAndRecordPair = getRecordFromCell(scanner.getKeyValue(), getSchema(), readerSchema, keyFieldSchema); 
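// Cache the freshly scanned record as the new look-ahead value so subsequent hasNext() calls can
// answer without advancing the scanner again.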
this.next = keyAndRecordPair.getSecond(); } else { this.next = null; @@ -311,17 +312,17 @@ public Option getRecordByKey(String key, Schema readerSchema) throws IOException } if (value != null) { - R record = deserialize(key.getBytes(), value, getSchema(), readerSchema, keyFieldSchema); + IndexedRecord record = deserialize(key.getBytes(), value, getSchema(), readerSchema, keyFieldSchema); return Option.of(record); } return Option.empty(); } - private Pair getRecordFromCell(Cell cell, Schema writerSchema, Schema readerSchema, Option keyFieldSchema) throws IOException { + private Pair getRecordFromCell(Cell cell, Schema writerSchema, Schema readerSchema, Option keyFieldSchema) throws IOException { final byte[] keyBytes = Arrays.copyOfRange(cell.getRowArray(), cell.getRowOffset(), cell.getRowOffset() + cell.getRowLength()); final byte[] valueBytes = Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(), cell.getValueOffset() + cell.getValueLength()); - R record = deserialize(keyBytes, valueBytes, writerSchema, readerSchema, keyFieldSchema); + IndexedRecord record = deserialize(keyBytes, valueBytes, writerSchema, readerSchema, keyFieldSchema); return new Pair<>(new String(keyBytes), record); } @@ -335,9 +336,9 @@ private Pair getRecordFromCell(Cell cell, Schema writerSchema, Schema * @param keyFieldSchema - Key field id in the schema * @return Deserialized record object */ - private R deserialize(final byte[] keyBytes, final byte[] valueBytes, Schema writerSchema, Schema readerSchema, + private IndexedRecord deserialize(final byte[] keyBytes, final byte[] valueBytes, Schema writerSchema, Schema readerSchema, Option keyFieldSchema) throws IOException { - R record = (R) HoodieAvroUtils.bytesToAvro(valueBytes, writerSchema, readerSchema); + IndexedRecord record = (IndexedRecord) HoodieAvroUtils.bytesToAvro(valueBytes, writerSchema, readerSchema); materializeRecordIfNeeded(keyBytes, record, keyFieldSchema); return record; } @@ -349,7 +350,7 @@ private R deserialize(final byte[] keyBytes, final byte[] valueBytes, Schema wri * @param record - Record object to materialize * @param keyFieldSchema - Key field id in the schema */ - private void materializeRecordIfNeeded(final byte[] keyBytes, R record, Option keyFieldSchema) { + private void materializeRecordIfNeeded(final byte[] keyBytes, IndexedRecord record, Option keyFieldSchema) { if (keyFieldSchema.isPresent()) { final Object keyObject = record.get(keyFieldSchema.get().pos()); if (keyObject != null && keyObject.toString().isEmpty()) { diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcReader.java similarity index 91% rename from hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcReader.java rename to hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcReader.java index 319f8d7da1add..e36d88afab75d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcReader.java @@ -18,11 +18,8 @@ package org.apache.hudi.io.storage; -import java.io.IOException; -import java.util.Iterator; -import java.util.Set; import org.apache.avro.Schema; -import org.apache.avro.generic.IndexedRecord; +import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hudi.common.bloom.BloomFilter; @@ -37,12 +34,16 @@ import org.apache.orc.RecordReader; import 
org.apache.orc.TypeDescription; -public class HoodieOrcReader implements HoodieFileReader { +import java.io.IOException; +import java.util.Iterator; +import java.util.Set; + +public class HoodieAvroOrcReader implements HoodieAvroFileReader { private Path path; private Configuration conf; private final BaseFileUtils orcUtils; - public HoodieOrcReader(Configuration configuration, Path path) { + public HoodieAvroOrcReader(Configuration configuration, Path path) { this.conf = configuration; this.path = path; this.orcUtils = BaseFileUtils.getInstance(HoodieFileFormat.ORC); @@ -64,7 +65,7 @@ public Set filterRowKeys(Set candidateRowKeys) { } @Override - public Iterator getRecordIterator(Schema schema) throws IOException { + public Iterator getRecordIterator(Schema schema) throws IOException { try { Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf)); TypeDescription orcSchema = AvroOrcUtils.createOrcSchema(schema); diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java similarity index 85% rename from hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetReader.java rename to hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java index 9ad07dfafbf60..dd7cac0af5318 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java @@ -18,12 +18,8 @@ package org.apache.hudi.io.storage; -import java.io.IOException; -import java.util.Iterator; -import java.util.Set; - import org.apache.avro.Schema; -import org.apache.avro.generic.IndexedRecord; +import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hudi.common.bloom.BloomFilter; @@ -34,12 +30,16 @@ import org.apache.parquet.avro.AvroReadSupport; import org.apache.parquet.hadoop.ParquetReader; -public class HoodieParquetReader implements HoodieFileReader { +import java.io.IOException; +import java.util.Iterator; +import java.util.Set; + +public class HoodieAvroParquetReader implements HoodieAvroFileReader { private final Path path; private final Configuration conf; private final BaseFileUtils parquetUtils; - public HoodieParquetReader(Configuration configuration, Path path) { + public HoodieAvroParquetReader(Configuration configuration, Path path) { this.conf = configuration; this.path = path; this.parquetUtils = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET); @@ -61,9 +61,9 @@ public Set filterRowKeys(Set candidateRowKeys) { } @Override - public Iterator getRecordIterator(Schema schema) throws IOException { + public Iterator getRecordIterator(Schema schema) throws IOException { AvroReadSupport.setAvroReadSchema(conf, schema); - ParquetReader reader = AvroParquetReader.builder(path).withConf(conf).build(); + ParquetReader reader = AvroParquetReader.builder(path).withConf(conf).build(); return new ParquetReaderIterator<>(reader); } diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java index f913df7e152a9..d2e35eb450a3e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java @@ -18,22 +18,20 @@ package 
org.apache.hudi.io.storage; -import org.apache.hudi.common.fs.FSUtils; - -import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hudi.common.fs.FSUtils; import java.io.IOException; +import static org.apache.hudi.common.model.HoodieFileFormat.HFILE; import static org.apache.hudi.common.model.HoodieFileFormat.ORC; import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET; -import static org.apache.hudi.common.model.HoodieFileFormat.HFILE; public class HoodieFileReaderFactory { - public static HoodieFileReader getFileReader(Configuration conf, Path path) throws IOException { + public static HoodieAvroFileReader getFileReader(Configuration conf, Path path) throws IOException { final String extension = FSUtils.getFileExtension(path.toString()); if (PARQUET.getFileExtension().equals(extension)) { return newParquetFileReader(conf, path); @@ -48,16 +46,16 @@ public static HoodieFileReader getFileReader(Config throw new UnsupportedOperationException(extension + " format not supported yet."); } - private static HoodieFileReader newParquetFileReader(Configuration conf, Path path) { - return new HoodieParquetReader<>(conf, path); + private static HoodieAvroFileReader newParquetFileReader(Configuration conf, Path path) { + return new HoodieAvroParquetReader(conf, path); } - private static HoodieFileReader newHFileFileReader(Configuration conf, Path path) throws IOException { + private static HoodieAvroFileReader newHFileFileReader(Configuration conf, Path path) throws IOException { CacheConfig cacheConfig = new CacheConfig(conf); - return new HoodieHFileReader<>(conf, path, cacheConfig); + return new HoodieAvroHFileReader(conf, path, cacheConfig); } - private static HoodieFileReader newOrcFileReader(Configuration conf, Path path) { - return new HoodieOrcReader<>(conf, path); + private static HoodieAvroFileReader newOrcFileReader(Configuration conf, Path path) { + return new HoodieAvroOrcReader(conf, path); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index fb6ce7d5bb271..89dacb7d52337 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -50,7 +50,7 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hudi.exception.TableNotFoundException; -import org.apache.hudi.io.storage.HoodieFileReader; +import org.apache.hudi.io.storage.HoodieAvroFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -82,7 +82,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { private final boolean reuse; // Readers for the latest file slice corresponding to file groups in the metadata partition - private Map, Pair> partitionReaders = + private Map, Pair> partitionReaders = new ConcurrentHashMap<>(); public HoodieBackedTableMetadata(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig, @@ -136,11 +136,11 @@ protected List>>> getRec List>>> result = new ArrayList<>(); AtomicInteger fileSlicesKeysCount = new AtomicInteger(); partitionFileSliceToKeysMap.forEach((partitionFileSlicePair, fileSliceKeys) -> { - Pair 
readers = openReadersIfNeeded(partitionName, + Pair readers = openReadersIfNeeded(partitionName, partitionFileSlicePair.getRight()); try { List timings = new ArrayList<>(); - HoodieFileReader baseFileReader = readers.getKey(); + HoodieAvroFileReader baseFileReader = readers.getKey(); HoodieMetadataMergedLogRecordReader logRecordScanner = readers.getRight(); if (baseFileReader == null && logRecordScanner == null) { @@ -196,7 +196,7 @@ private Map>> readLogRecords( return logRecords; } - private List>>> readFromBaseAndMergeWithLogRecords(HoodieFileReader baseFileReader, + private List>>> readFromBaseAndMergeWithLogRecords(HoodieAvroFileReader baseFileReader, List keys, Map>> logRecords, List timings, String partitionName) throws IOException { List>>> result = new ArrayList<>(); @@ -279,14 +279,14 @@ private Map, List> getPartitionFileSliceToKeysMa * @param slice - The file slice to open readers for * @return File reader and the record scanner pair for the requested file slice */ - private Pair openReadersIfNeeded(String partitionName, FileSlice slice) { + private Pair openReadersIfNeeded(String partitionName, FileSlice slice) { return partitionReaders.computeIfAbsent(Pair.of(partitionName, slice.getFileId()), k -> { try { HoodieTimer timer = new HoodieTimer().startTimer(); // Open base file reader - Pair baseFileReaderOpenTimePair = getBaseFileReader(slice, timer); - HoodieFileReader baseFileReader = baseFileReaderOpenTimePair.getKey(); + Pair baseFileReaderOpenTimePair = getBaseFileReader(slice, timer); + HoodieAvroFileReader baseFileReader = baseFileReaderOpenTimePair.getKey(); final long baseFileOpenMs = baseFileReaderOpenTimePair.getValue(); // Open the log record scanner using the log files from the latest file slice @@ -303,8 +303,8 @@ private Pair openReadersI }); } - private Pair getBaseFileReader(FileSlice slice, HoodieTimer timer) throws IOException { - HoodieFileReader baseFileReader = null; + private Pair getBaseFileReader(FileSlice slice, HoodieTimer timer) throws IOException { + HoodieAvroFileReader baseFileReader = null; Long baseFileOpenMs; // If the base file is present then create a reader Option basefile = slice.getBaseFile(); @@ -425,7 +425,7 @@ public void close() { * @param partitionFileSlicePair - Partition and FileSlice */ private synchronized void close(Pair partitionFileSlicePair) { - Pair readers = + Pair readers = partitionReaders.remove(partitionFileSlicePair); if (readers != null) { try { diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java index 650e86146639d..18aa2dce23dd9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java @@ -18,6 +18,13 @@ package org.apache.hudi.metadata; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.model.HoodieMetadataBloomFilter; import org.apache.hudi.avro.model.HoodieMetadataColumnStats; import org.apache.hudi.avro.model.HoodieMetadataFileInfo; @@ -35,15 +42,7 @@ import org.apache.hudi.common.util.hash.FileIndexID; import org.apache.hudi.common.util.hash.PartitionIndexID; import org.apache.hudi.exception.HoodieMetadataException; 
-import org.apache.hudi.io.storage.HoodieHFileReader; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; +import org.apache.hudi.io.storage.HoodieAvroHFileReader; import java.io.IOException; import java.nio.ByteBuffer; @@ -91,7 +90,7 @@ public class HoodieMetadataPayload implements HoodieRecordPayload convertMetadataToBloomFilterRecords(HoodieCommi final Path writeFilePath = new Path(dataMetaClient.getBasePath(), pathWithPartition); try { - HoodieFileReader fileReader = + HoodieAvroFileReader fileReader = HoodieFileReaderFactory.getFileReader(dataMetaClient.getHadoopConf(), writeFilePath); try { final BloomFilter fileBloomFilter = fileReader.readBloomFilter(); @@ -635,7 +634,7 @@ private static List convertFilesToBloomFilterRecords(HoodieEngineC final String pathWithPartition = partitionName + "/" + appendedFile; final Path appendedFilePath = new Path(dataMetaClient.getBasePath(), pathWithPartition); try { - HoodieFileReader fileReader = + HoodieAvroFileReader fileReader = HoodieFileReaderFactory.getFileReader(dataMetaClient.getHadoopConf(), appendedFilePath); final BloomFilter fileBloomFilter = fileReader.readBloomFilter(); if (fileBloomFilter == null) { diff --git a/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieFileReaderFactory.java b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieAvroFileReaderFactory.java similarity index 78% rename from hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieFileReaderFactory.java rename to hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieAvroFileReaderFactory.java index ec334bde1e437..f049033688efb 100644 --- a/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieFileReaderFactory.java +++ b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieAvroFileReaderFactory.java @@ -18,7 +18,6 @@ package org.apache.hudi.io.storage; -import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.junit.jupiter.api.Test; @@ -32,7 +31,7 @@ /** * Tests for {@link HoodieFileReaderFactory}. */ -public class TestHoodieFileReaderFactory { +public class TestHoodieAvroFileReaderFactory { @TempDir public java.nio.file.Path tempDir; @@ -41,19 +40,19 @@ public void testGetFileReader() throws IOException { // parquet file format. final Configuration hadoopConf = new Configuration(); final Path parquetPath = new Path("/partition/path/f1_1-0-1_000.parquet"); - HoodieFileReader parquetReader = HoodieFileReaderFactory.getFileReader(hadoopConf, parquetPath); - assertTrue(parquetReader instanceof HoodieParquetReader); + HoodieAvroFileReader parquetReader = HoodieFileReaderFactory.getFileReader(hadoopConf, parquetPath); + assertTrue(parquetReader instanceof HoodieAvroParquetReader); // log file format. 
final Path logPath = new Path("/partition/path/f.b51192a8-574b-4a85-b246-bcfec03ac8bf_100.log.2_1-0-1"); final Throwable thrown = assertThrows(UnsupportedOperationException.class, () -> { - HoodieFileReader logWriter = HoodieFileReaderFactory.getFileReader(hadoopConf, logPath); + HoodieAvroFileReader logWriter = HoodieFileReaderFactory.getFileReader(hadoopConf, logPath); }, "should fail since log storage reader is not supported yet."); assertTrue(thrown.getMessage().contains("format not supported yet.")); // Orc file format. final Path orcPath = new Path("/partition/path/f1_1-0-1_000.orc"); - HoodieFileReader orcReader = HoodieFileReaderFactory.getFileReader(hadoopConf, orcPath); - assertTrue(orcReader instanceof HoodieOrcReader); + HoodieAvroFileReader orcReader = HoodieFileReaderFactory.getFileReader(hadoopConf, orcPath); + assertTrue(orcReader instanceof HoodieAvroOrcReader); } } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java index 53ccb7413f9b6..86de6ea28d831 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java @@ -18,9 +18,6 @@ package org.apache.hudi.hadoop; -import java.io.IOException; -import java.util.Iterator; - import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.conf.Configuration; @@ -34,20 +31,23 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils; -import org.apache.hudi.io.storage.HoodieHFileReader; +import org.apache.hudi.io.storage.HoodieAvroHFileReader; + +import java.io.IOException; +import java.util.Iterator; public class HoodieHFileRecordReader implements RecordReader { private long count = 0; private ArrayWritable valueObj; - private HoodieHFileReader reader; + private HoodieAvroHFileReader reader; private Iterator recordIterator; private Schema schema; public HoodieHFileRecordReader(Configuration conf, InputSplit split, JobConf job) throws IOException { FileSplit fileSplit = (FileSplit) split; Path path = fileSplit.getPath(); - reader = new HoodieHFileReader(conf, path, new CacheConfig(conf)); + reader = new HoodieAvroHFileReader(conf, path, new CacheConfig(conf)); schema = reader.getSchema(); valueObj = new ArrayWritable(Writable.class, new Writable[schema.getFields().size()]); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputSplitUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputSplitUtils.java index e485e72c25755..ae550cb335a56 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputSplitUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputSplitUtils.java @@ -18,19 +18,20 @@ package org.apache.hudi.hadoop; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.nio.charset.StandardCharsets; import org.apache.avro.Schema; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.FileSplit; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils; -import org.apache.hudi.io.storage.HoodieFileReader; +import org.apache.hudi.io.storage.HoodieAvroFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; +import 
java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.nio.charset.StandardCharsets; + public class InputSplitUtils { public static void writeString(String str, DataOutput out) throws IOException { @@ -63,7 +64,7 @@ public static boolean readBoolean(DataInput in) throws IOException { public static Schema getBaseFileSchema(FileSplit split, Configuration conf) { try { if (split instanceof BootstrapBaseFileSplit) { - HoodieFileReader storageReader = HoodieFileReaderFactory.getFileReader(conf, + HoodieAvroFileReader storageReader = HoodieFileReaderFactory.getFileReader(conf, ((BootstrapBaseFileSplit)(split)).getBootstrapFileSplit().getPath()); return HoodieAvroUtils.addMetadataFields(storageReader.getSchema()); } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java index 0aa74ef154334..da62eabf1770a 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java @@ -18,13 +18,6 @@ package org.apache.hudi.hadoop.utils; -import org.apache.hudi.avro.HoodieAvroUtils; -import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.hadoop.config.HoodieRealtimeConfig; -import org.apache.hudi.io.storage.HoodieFileReader; -import org.apache.hudi.io.storage.HoodieFileReaderFactory; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; import org.apache.avro.generic.GenericArray; @@ -32,8 +25,8 @@ import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.serde2.io.DoubleWritable; import org.apache.hadoop.hive.serde2.io.DateWritable; +import org.apache.hadoop.hive.serde2.io.DoubleWritable; import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils; @@ -46,6 +39,13 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.hadoop.config.HoodieRealtimeConfig; +import org.apache.hudi.io.storage.HoodieAvroFileReader; +import org.apache.hudi.io.storage.HoodieFileReaderFactory; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -68,7 +68,7 @@ public class HoodieRealtimeRecordReaderUtils { */ public static Schema readSchema(Configuration conf, Path filePath) { try { - HoodieFileReader storageReader = HoodieFileReaderFactory.getFileReader(conf, filePath); + HoodieAvroFileReader storageReader = HoodieFileReaderFactory.getFileReader(conf, filePath); return storageReader.getSchema(); } catch (IOException e) { throw new HoodieIOException("Failed to read schema from " + filePath, e); diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java index 2648740f54e0f..7d17eaf4bbe7b 100644 --- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java @@ -18,6 +18,10 @@ package org.apache.hudi.integ.testsuite.reader; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.config.HoodieCommonConfig; @@ -34,19 +38,15 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.config.HoodieMemoryConfig; -import org.apache.hudi.io.storage.HoodieFileReader; +import org.apache.hudi.io.storage.HoodieAvroFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.fs.Path; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.Tuple2; import java.io.IOException; import java.io.UncheckedIOException; @@ -61,8 +61,6 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; -import scala.Tuple2; - import static java.util.Map.Entry.comparingByValue; import static java.util.stream.Collectors.toMap; @@ -264,7 +262,7 @@ private Iterator readColumnarOrLogFiles(FileSlice fileSlice) thro if (fileSlice.getBaseFile().isPresent()) { // Read the base files using the latest writer schema. Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr)); - HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(metaClient.getHadoopConf(), + HoodieAvroFileReader reader = HoodieFileReaderFactory.getFileReader(metaClient.getHadoopConf(), new Path(fileSlice.getBaseFile().get().getPath())); return reader.getRecordIterator(schema); } else {
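
The hunk above shows the pattern this patch repeats at every call site: code that previously went through the type-parameterized HoodieFileReader now receives a HoodieAvroFileReader from HoodieFileReaderFactory and works against Avro records directly. The snippet below is a minimal, illustrative sketch of such a call site after the rename; it is not part of the patch. It assumes only what the hunks show (a static HoodieFileReaderFactory.getFileReader(Configuration, Path) that returns HoodieAvroFileReader, and a getRecordIterator(Schema) that throws IOException and yields Avro records). The ReaderUsageSketch class, the wildcard element type, the close() call, and the path/schema arguments are placeholders rather than confirmed signatures.

import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.io.storage.HoodieAvroFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;

import java.io.IOException;
import java.util.Iterator;

// Illustrative sketch only; not code from this patch.
public class ReaderUsageSketch {

  public static void printRecords(Configuration conf, Path baseFilePath, Schema readerSchema) throws IOException {
    // The factory now hands back the Avro-specific reader directly, so no type parameter
    // is carried at the call site; format dispatch on the file extension is unchanged.
    HoodieAvroFileReader reader = HoodieFileReaderFactory.getFileReader(conf, baseFilePath);
    try {
      // The wildcard keeps the sketch valid whether the concrete reader yields
      // IndexedRecord (HFile) or GenericRecord (Parquet/ORC), as the import changes above suggest.
      Iterator<? extends IndexedRecord> records = reader.getRecordIterator(readerSchema);
      while (records.hasNext()) {
        IndexedRecord record = records.next();
        System.out.println(record);
      }
    } finally {
      reader.close(); // assumed to exist on the reader, matching the close path in HoodieBackedTableMetadata above
    }
  }
}

As the import swaps in the renamed readers indicate, the record types returned per format appear unchanged by this patch; what disappears at call sites is only the R type parameter.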