diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/LSMTimelineWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/LSMTimelineWriter.java index ccf2332699c5e..2ea57939427f2 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/LSMTimelineWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/LSMTimelineWriter.java @@ -38,7 +38,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieCommitException; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.io.storage.HoodieAvroParquetReader; +import org.apache.hudi.io.hadoop.HoodieAvroParquetReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; import org.apache.hudi.io.storage.HoodieFileWriter; import org.apache.hudi.io.storage.HoodieFileWriterFactory; diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroParquetWriter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroParquetWriter.java index 091d1d7195aaf..bff523f7f2149 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroParquetWriter.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroParquetWriter.java @@ -25,7 +25,7 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ParquetUtils; -import org.apache.hudi.io.storage.HoodieAvroParquetWriter; +import org.apache.hudi.io.hadoop.HoodieAvroParquetWriter; import org.apache.hudi.io.storage.HoodieParquetConfig; import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.storage.StoragePath; @@ -71,7 +71,7 @@ public void testProperWriting() throws IOException { HoodieParquetConfig parquetConfig = new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, ParquetWriter.DEFAULT_BLOCK_SIZE, - ParquetWriter.DEFAULT_PAGE_SIZE, 1024 * 1024 * 1024, storageConf.unwrap(), 0.1, true); + ParquetWriter.DEFAULT_PAGE_SIZE, 1024 * 1024 * 1024, storageConf, 0.1, true); StoragePath filePath = new StoragePath(tmpDir.resolve("test.parquet").toAbsolutePath().toString()); diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java index a135a2b22e4aa..b45d80b1ee658 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java @@ -39,18 +39,18 @@ import org.apache.hudi.common.testutils.HoodieMetadataTestTable; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.io.storage.HoodieAvroOrcWriter; -import org.apache.hudi.io.storage.HoodieAvroParquetWriter; +import org.apache.hudi.io.hadoop.HoodieAvroOrcWriter; +import org.apache.hudi.io.hadoop.HoodieAvroParquetWriter; import org.apache.hudi.io.storage.HoodieOrcConfig; import org.apache.hudi.io.storage.HoodieParquetConfig; import org.apache.hudi.metadata.HoodieTableMetadataWriter; import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.storage.StoragePath; import org.apache.avro.Schema; import 
org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.conf.Configuration; import org.apache.orc.CompressionKind; import org.apache.parquet.avro.AvroSchemaConverter; import org.apache.parquet.hadoop.ParquetWriter; @@ -124,7 +124,7 @@ public StoragePath withInserts(String partition, String fileId, List config = new HoodieParquetConfig<>(writeSupport, CompressionCodecName.GZIP, ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 120 * 1024 * 1024, - new Configuration(), Double.parseDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue()), true); + storage.getConf(), Double.parseDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue()), true); try (HoodieAvroParquetWriter writer = new HoodieAvroParquetWriter( new StoragePath(Paths.get(basePath, partition, fileName).toString()), config, currentInstantTime, contextSupplier, populateMetaFields)) { @@ -142,7 +142,7 @@ public StoragePath withInserts(String partition, String fileId, List conf, HoodieConfig config, Schema schema) throws IOException { + OutputStream outputStream, StorageConfiguration conf, HoodieConfig config, Schema schema) throws IOException { boolean enableBloomFilter = false; HoodieRowParquetWriteSupport writeSupport = getHoodieRowParquetWriteSupport(conf, schema, config, enableBloomFilter); String compressionCodecName = config.getStringOrDefault(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME); @@ -83,7 +84,7 @@ protected HoodieFileWriter newParquetFileWriter( writeSupport.getHadoopConf(), config.getDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION), config.getBooleanOrDefault(HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED)); parquetConfig.getHadoopConf().addResource(writeSupport.getHadoopConf()); - return new HoodieSparkParquetStreamWriter(outputStream, parquetConfig); + return new HoodieSparkParquetStreamWriter(new FSDataOutputStream(outputStream, null), parquetConfig); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetWriter.java index 09f8d8dbe1c44..ba4ab63006d42 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetWriter.java @@ -21,6 +21,7 @@ import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.io.hadoop.HoodieBaseParquetWriter; import org.apache.hudi.io.storage.row.HoodieRowParquetConfig; import org.apache.hudi.io.storage.row.HoodieRowParquetWriteSupport; import org.apache.hudi.storage.StoragePath; diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java index ffad5a895cbbd..8e7287a70246a 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java @@ -25,6 +25,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.io.storage.HoodieParquetConfig; import 
org.apache.hudi.storage.StoragePath; +import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration; import org.apache.hudi.table.HoodieTable; import org.apache.hadoop.conf.Configuration; @@ -79,7 +80,7 @@ private static HoodieInternalRowFileWriter newParquetInternalRowFileWriter(Stora writeConfig.getParquetBlockSize(), writeConfig.getParquetPageSize(), writeConfig.getParquetMaxFileSize(), - writeSupport.getHadoopConf(), + new HadoopStorageConfiguration(writeSupport.getHadoopConf()), writeConfig.getParquetCompressionRatio(), writeConfig.parquetDictionaryEnabled() )); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowParquetWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowParquetWriter.java index dcb1f197a04af..f7ad33d2cbb27 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowParquetWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowParquetWriter.java @@ -18,7 +18,7 @@ package org.apache.hudi.io.storage.row; -import org.apache.hudi.io.storage.HoodieBaseParquetWriter; +import org.apache.hudi.io.hadoop.HoodieBaseParquetWriter; import org.apache.hudi.io.storage.HoodieParquetConfig; import org.apache.hudi.storage.StoragePath; diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetConfig.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetConfig.java index f5f6d7b0a5bb1..f3b0f34b929c7 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetConfig.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetConfig.java @@ -19,6 +19,7 @@ package org.apache.hudi.io.storage.row; import org.apache.hudi.io.storage.HoodieParquetConfig; +import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration; import org.apache.hadoop.conf.Configuration; import org.apache.parquet.hadoop.metadata.CompressionCodecName; @@ -31,6 +32,11 @@ public class HoodieRowParquetConfig extends HoodieParquetConfig records) throws IOException } Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA)); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - try (FSDataOutputStream outputStream = new FSDataOutputStream(baos, null)) { - HoodieFileWriter parquetWriter = null; - HoodieConfig config = new HoodieConfig(); - config.setValue(PARQUET_COMPRESSION_CODEC_NAME.key(), compressionCodecName.get().name()); - config.setValue(PARQUET_BLOCK_SIZE.key(), String.valueOf(ParquetWriter.DEFAULT_BLOCK_SIZE)); - config.setValue(PARQUET_PAGE_SIZE.key(), String.valueOf(ParquetWriter.DEFAULT_PAGE_SIZE)); - config.setValue(PARQUET_MAX_FILE_SIZE.key(), String.valueOf(1024 * 1024 * 1024)); - config.setValue(PARQUET_COMPRESSION_RATIO_FRACTION.key(), String.valueOf(expectedCompressionRatio.get())); - config.setValue(PARQUET_DICTIONARY_ENABLED, String.valueOf(useDictionaryEncoding.get())); - HoodieRecordType recordType = records.iterator().next().getRecordType(); - try { - parquetWriter = HoodieFileWriterFactory.getFileWriter( - HoodieFileFormat.PARQUET, - outputStream, - HoodieStorageUtils.getStorageConf(new Configuration()), - config, - writerSchema, - recordType); - for (HoodieRecord record : records) { - String recordKey = getRecordKey(record).orElse(null); - 
parquetWriter.write(recordKey, record, writerSchema); - } - outputStream.flush(); - } finally { - if (parquetWriter != null) { - parquetWriter.close(); - } + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + HoodieConfig config = new HoodieConfig(); + config.setValue(PARQUET_COMPRESSION_CODEC_NAME.key(), compressionCodecName.get().name()); + config.setValue(PARQUET_BLOCK_SIZE.key(), String.valueOf(ParquetWriter.DEFAULT_BLOCK_SIZE)); + config.setValue(PARQUET_PAGE_SIZE.key(), String.valueOf(ParquetWriter.DEFAULT_PAGE_SIZE)); + config.setValue(PARQUET_MAX_FILE_SIZE.key(), String.valueOf(1024 * 1024 * 1024)); + config.setValue(PARQUET_COMPRESSION_RATIO_FRACTION.key(), String.valueOf(expectedCompressionRatio.get())); + config.setValue(PARQUET_DICTIONARY_ENABLED, String.valueOf(useDictionaryEncoding.get())); + HoodieRecordType recordType = records.iterator().next().getRecordType(); + try (HoodieFileWriter parquetWriter = HoodieFileWriterFactory.getFileWriter( + HoodieFileFormat.PARQUET, outputStream, HoodieStorageUtils.getStorageConf(new Configuration()), + config, writerSchema, recordType)) { + for (HoodieRecord record : records) { + String recordKey = getRecordKey(record).orElse(null); + parquetWriter.write(recordKey, record, writerSchema); } + outputStream.flush(); } - - return baos.toByteArray(); + return outputStream.toByteArray(); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java index 58ec48ea6306f..42f8a6a2753cc 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java @@ -25,7 +25,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.io.storage.HoodieAvroParquetReader; +import org.apache.hudi.io.storage.HoodieAvroFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; import org.apache.hudi.storage.StoragePath; @@ -266,7 +266,7 @@ public static void loadInstants( .filter(fileName -> filter == null || LSMTimeline.isFileInRange(filter, fileName)) .parallel().forEach(fileName -> { // Read the archived file - try (HoodieAvroParquetReader reader = (HoodieAvroParquetReader) HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO) + try (HoodieAvroFileReader reader = (HoodieAvroFileReader) HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO) .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, metaClient.getStorageConf(), new StoragePath(metaClient.getArchivePath(), fileName))) { try (ClosableIterator iterator = reader.getIndexedRecordIterator(HoodieLSMTimelineInstant.getClassSchema(), readSchema)) { while (iterator.hasNext()) { diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java index a829880d5f948..9b49fa871e225 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java @@ -18,10 +18,32 @@ package org.apache.hudi.io.storage; +import org.apache.hudi.common.model.HoodieAvroIndexedRecord; +import org.apache.hudi.common.model.HoodieRecord; +import 
org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+
+import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
+import java.io.IOException;
+
+import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
+
 /**
- * Marker interface for every {@link HoodieFileReader} reading in Avro (ie
- * producing {@link IndexedRecord}s)
+ * Base class for every Avro file reader
  */
-public interface HoodieAvroFileReader extends HoodieFileReader<IndexedRecord> {}
+public abstract class HoodieAvroFileReader implements HoodieFileReader<IndexedRecord> {
+
+  @Override
+  public ClosableIterator<HoodieRecord<IndexedRecord>> getRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
+    ClosableIterator<IndexedRecord> iterator = getIndexedRecordIterator(readerSchema, requestedSchema);
+    return new CloseableMappingIterator<>(iterator, data -> unsafeCast(new HoodieAvroIndexedRecord(data)));
+  }
+
+  protected ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema) throws IOException {
+    return getIndexedRecordIterator(readerSchema, readerSchema);
+  }
+
+  public abstract ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException;
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderBase.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderBase.java
deleted file mode 100644
index b15ce11fd530e..0000000000000
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderBase.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.io.storage;
-
-import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.util.collection.ClosableIterator;
-import org.apache.hudi.common.util.collection.CloseableMappingIterator;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.IndexedRecord;
-
-import java.io.IOException;
-
-import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
-
-/**
- * Base class for every {@link HoodieAvroFileReader}
- */
-abstract class HoodieAvroFileReaderBase implements HoodieAvroFileReader {
-
-  @Override
-  public ClosableIterator<HoodieRecord<IndexedRecord>> getRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
-    ClosableIterator<IndexedRecord> iterator = getIndexedRecordIterator(readerSchema, requestedSchema);
-    return new CloseableMappingIterator<>(iterator, data -> unsafeCast(new HoodieAvroIndexedRecord(data)));
-  }
-
-  protected ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema) throws IOException {
-    return getIndexedRecordIterator(readerSchema, readerSchema);
-  }
-
-  public abstract ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException;
-}
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReaderImplBase.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReaderImplBase.java
index 5e1a260e1589e..dd28d5f558940 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReaderImplBase.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReaderImplBase.java
@@ -38,7 +38,7 @@ import static org.apache.hudi.common.util.CollectionUtils.toStream;
 import static org.apache.hudi.common.util.StringUtils.fromUTF8Bytes;
 
-public abstract class HoodieAvroHFileReaderImplBase extends HoodieAvroFileReaderBase
+public abstract class HoodieAvroHFileReaderImplBase extends HoodieAvroFileReader
     implements HoodieSeekingFileReader<IndexedRecord> {
   // TODO HoodieHFileReader right now tightly coupled to MT, we should break that coupling
   public static final String SCHEMA_KEY = "schema";
@@ -54,7 +54,7 @@ public abstract class HoodieAvroHFileReaderImplBase extends HoodieAvroFileReader
    *
    * <p>
    * Reads all the records with given schema
    */
-  public static List<IndexedRecord> readAllRecords(HoodieAvroFileReaderBase reader)
+  public static List<IndexedRecord> readAllRecords(HoodieAvroFileReader reader)
       throws IOException {
     Schema schema = reader.getSchema();
     return toStream(reader.getIndexedRecordIterator(schema))
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
index fe075ccdc8fff..c285f04a2b2da 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
@@ -46,14 +46,21 @@ public class HoodieFileReaderFactory {
   public static HoodieFileReaderFactory getReaderFactory(HoodieRecord.HoodieRecordType recordType) {
     switch (recordType) {
       case AVRO:
-        return new HoodieAvroFileReaderFactory();
+
+        try {
+          Class<?> clazz =
+              ReflectionUtils.getClass("org.apache.hudi.io.hadoop.HoodieAvroFileReaderFactory");
+          return (HoodieFileReaderFactory) clazz.newInstance();
+        } catch (IllegalArgumentException | IllegalAccessException | InstantiationException e) {
+          throw new HoodieException("Unable to create HoodieAvroFileReaderFactory", e);
+        }
       case SPARK:
         try {
           Class<?> clazz = ReflectionUtils.getClass("org.apache.hudi.io.storage.HoodieSparkFileReaderFactory");
           return (HoodieFileReaderFactory) clazz.newInstance();
         } catch (IllegalArgumentException | IllegalAccessException | InstantiationException e) {
-          throw new HoodieException("Unable to create hoodie spark file writer factory", e);
+          throw new HoodieException("Unable to create HoodieSparkFileReaderFactory", e);
         }
       default:
         throw new UnsupportedOperationException(recordType + " record type not supported yet.");
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
index d57dd55fcd554..69a8924f50821 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
@@ -33,9 +33,9 @@ import org.apache.hudi.storage.StoragePath;
 
 import org.apache.avro.Schema;
-import org.apache.hadoop.fs.FSDataOutputStream;
 
 import java.io.IOException;
+import java.io.OutputStream;
 
 import static org.apache.hudi.common.model.HoodieFileFormat.HFILE;
 import static org.apache.hudi.common.model.HoodieFileFormat.ORC;
@@ -46,13 +46,18 @@ public class HoodieFileWriterFactory {
   private static HoodieFileWriterFactory getWriterFactory(HoodieRecord.HoodieRecordType recordType) {
     switch (recordType) {
       case AVRO:
-        return new HoodieAvroFileWriterFactory();
+        try {
+          Class<?> clazz = ReflectionUtils.getClass("org.apache.hudi.io.hadoop.HoodieAvroFileWriterFactory");
+          return (HoodieFileWriterFactory) clazz.newInstance();
+        } catch (IllegalAccessException | IllegalArgumentException | InstantiationException e) {
+          throw new HoodieException("Unable to create HoodieAvroFileWriterFactory", e);
+        }
       case SPARK:
         try {
           Class<?> clazz = ReflectionUtils.getClass("org.apache.hudi.io.storage.HoodieSparkFileWriterFactory");
           return (HoodieFileWriterFactory) clazz.newInstance();
         } catch (IllegalAccessException | IllegalArgumentException | InstantiationException e) {
-          throw new HoodieException("Unable to create hoodie spark file writer factory", e);
+          throw new HoodieException("Unable to create HoodieSparkFileWriterFactory", e);
         }
       default:
        throw new
UnsupportedOperationException(recordType + " record type not supported yet."); @@ -67,8 +72,8 @@ public static HoodieFileWriter getFileWriter( return factory.getFileWriterByFormat(extension, instantTime, path, conf, config, schema, taskContextSupplier); } - public static HoodieFileWriter getFileWriter(HoodieFileFormat format, - FSDataOutputStream outputStream, StorageConfiguration conf, HoodieConfig config, Schema schema, HoodieRecordType recordType) + public static HoodieFileWriter getFileWriter(HoodieFileFormat format, OutputStream outputStream, + StorageConfiguration conf, HoodieConfig config, Schema schema, HoodieRecordType recordType) throws IOException { HoodieFileWriterFactory factory = getWriterFactory(recordType); return factory.getFileWriterByFormat(format, outputStream, conf, config, schema); @@ -89,8 +94,8 @@ protected HoodieFileWriter getFileWriterByFormat( throw new UnsupportedOperationException(extension + " format not supported yet."); } - protected HoodieFileWriter getFileWriterByFormat(HoodieFileFormat format, - FSDataOutputStream outputStream, StorageConfiguration conf, HoodieConfig config, Schema schema) throws IOException { + protected HoodieFileWriter getFileWriterByFormat(HoodieFileFormat format, OutputStream outputStream, + StorageConfiguration conf, HoodieConfig config, Schema schema) throws IOException { switch (format) { case PARQUET: return newParquetFileWriter(outputStream, conf, config, schema); @@ -106,7 +111,7 @@ protected HoodieFileWriter newParquetFileWriter( } protected HoodieFileWriter newParquetFileWriter( - FSDataOutputStream outputStream, StorageConfiguration conf, HoodieConfig config, Schema schema) throws IOException { + OutputStream outputStream, StorageConfiguration conf, HoodieConfig config, Schema schema) throws IOException { throw new UnsupportedOperationException(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcConfig.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcConfig.java index c45e02452e32b..7cac57fa91956 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcConfig.java @@ -18,23 +18,24 @@ package org.apache.hudi.io.storage; -import org.apache.hadoop.conf.Configuration; import org.apache.hudi.common.bloom.BloomFilter; +import org.apache.hudi.storage.StorageConfiguration; + import org.apache.orc.CompressionKind; public class HoodieOrcConfig { - static final String AVRO_SCHEMA_METADATA_KEY = "orc.avro.schema"; + public static final String AVRO_SCHEMA_METADATA_KEY = "orc.avro.schema"; private final CompressionKind compressionKind; private final int stripeSize; private final int blockSize; private final long maxFileSize; - private final Configuration hadoopConf; + private final StorageConfiguration storageConf; private final BloomFilter bloomFilter; - public HoodieOrcConfig(Configuration hadoopConf, CompressionKind compressionKind, int stripeSize, + public HoodieOrcConfig(StorageConfiguration storageConf, CompressionKind compressionKind, int stripeSize, int blockSize, long maxFileSize, BloomFilter bloomFilter) { - this.hadoopConf = hadoopConf; + this.storageConf = storageConf; this.compressionKind = compressionKind; this.stripeSize = stripeSize; this.blockSize = blockSize; @@ -42,8 +43,8 @@ public HoodieOrcConfig(Configuration hadoopConf, CompressionKind compressionKind this.bloomFilter = bloomFilter; } - public Configuration getHadoopConf() { - return hadoopConf; + public 
StorageConfiguration getStorageConf() { + return storageConf; } public CompressionKind getCompressionKind() { diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetConfig.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetConfig.java index b5e567b7644e1..e17a017d6797c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetConfig.java @@ -18,7 +18,8 @@ package org.apache.hudi.io.storage; -import org.apache.hadoop.conf.Configuration; +import org.apache.hudi.storage.StorageConfiguration; + import org.apache.parquet.hadoop.metadata.CompressionCodecName; /** @@ -31,18 +32,18 @@ public class HoodieParquetConfig { private final int blockSize; private final int pageSize; private final long maxFileSize; - private final Configuration hadoopConf; + private final StorageConfiguration storageConf; private final double compressionRatio; private final boolean dictionaryEnabled; - public HoodieParquetConfig(T writeSupport, CompressionCodecName compressionCodecName, int blockSize, - int pageSize, long maxFileSize, Configuration hadoopConf, double compressionRatio, boolean dictionaryEnabled) { + public HoodieParquetConfig(T writeSupport, CompressionCodecName compressionCodecName, int blockSize, int pageSize, + long maxFileSize, StorageConfiguration storageConf, double compressionRatio, boolean dictionaryEnabled) { this.writeSupport = writeSupport; this.compressionCodecName = compressionCodecName; this.blockSize = blockSize; this.pageSize = pageSize; this.maxFileSize = maxFileSize; - this.hadoopConf = hadoopConf; + this.storageConf = storageConf; this.compressionRatio = compressionRatio; this.dictionaryEnabled = dictionaryEnabled; } @@ -63,8 +64,8 @@ public long getMaxFileSize() { return maxFileSize; } - public Configuration getHadoopConf() { - return hadoopConf; + public StorageConfiguration getStorageConf() { + return storageConf; } public double getCompressionRatio() { diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java index 43002a723ef4f..01052d4b00f8b 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java @@ -19,14 +19,12 @@ package org.apache.hudi.common.testutils.reader; -import org.apache.hudi.avro.HoodieAvroWriteSupport; -import org.apache.hudi.common.bloom.BloomFilter; -import org.apache.hudi.common.bloom.BloomFilterFactory; import org.apache.hudi.common.bloom.BloomFilterTypeCode; +import org.apache.hudi.common.config.HoodieConfig; import org.apache.hudi.common.config.HoodieReaderConfig; +import org.apache.hudi.common.config.HoodieStorageConfig; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.LocalTaskContextSupplier; -import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.model.DeleteRecord; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieAvroIndexedRecord; @@ -35,6 +33,7 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.table.HoodieTableConfig; import 
org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock; import org.apache.hudi.common.table.log.block.HoodieCDCDataBlock; @@ -44,12 +43,13 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.table.log.block.HoodieParquetDataBlock; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.io.storage.HoodieAvroFileWriter; -import org.apache.hudi.io.storage.HoodieParquetConfig; +import org.apache.hudi.io.storage.HoodieFileWriterFactory; import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.storage.StoragePath; import org.apache.avro.Schema; @@ -57,7 +57,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.io.compress.Compression; -import org.apache.parquet.avro.AvroSchemaConverter; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.metadata.CompressionCodecName; @@ -78,7 +77,6 @@ import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA; import static org.apache.hudi.common.testutils.reader.DataGenerationPlan.OperationType.DELETE; import static org.apache.hudi.common.testutils.reader.DataGenerationPlan.OperationType.INSERT; -import static org.apache.hudi.io.storage.HoodieAvroFileWriterFactory.HOODIE_AVRO_PARQUET_WRITER; public class HoodieFileSliceTestUtils { public static final String FORWARD_SLASH = "/"; @@ -247,36 +245,31 @@ public static HoodieBaseFile createBaseFile( Schema schema, String baseInstantTime ) throws IOException { - Configuration hadoopConf = new Configuration(); + StorageConfiguration conf = HoodieTestUtils.getDefaultStorageConfWithDefaults(); // TODO: Optimize these hard-coded parameters for test purpose. 
(HUDI-7214) - BloomFilter filter = BloomFilterFactory.createBloomFilter( - 1000, - 0.0001, - 10000, - BloomFilterTypeCode.DYNAMIC_V0.name()); - HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport<>( - new AvroSchemaConverter().convert(schema), - schema, - Option.of(filter), - new Properties()); - HoodieParquetConfig parquetConfig = new HoodieParquetConfig( - writeSupport, - CompressionCodecName.GZIP, - ParquetWriter.DEFAULT_BLOCK_SIZE, - ParquetWriter.DEFAULT_PAGE_SIZE, - 1024 * 1024 * 1024, - hadoopConf, - 0.1, - true); - - try (HoodieAvroFileWriter writer = (HoodieAvroFileWriter) ReflectionUtils.loadClass(HOODIE_AVRO_PARQUET_WRITER, - new Class[] {StoragePath.class, HoodieParquetConfig.class, String.class, TaskContextSupplier.class, boolean.class}, - new StoragePath(baseFilePath), - parquetConfig, - baseInstantTime, - new LocalTaskContextSupplier(), - true)) { + HoodieConfig cfg = new HoodieConfig(); + //enable bloom filter + cfg.setValue(HoodieTableConfig.POPULATE_META_FIELDS.key(), "true"); + cfg.setValue(HoodieStorageConfig.PARQUET_WITH_BLOOM_FILTER_ENABLED.key(), "true"); + + //set bloom filter values + cfg.setValue(HoodieStorageConfig.BLOOM_FILTER_NUM_ENTRIES_VALUE.key(), String.valueOf(1000)); + cfg.setValue(HoodieStorageConfig.BLOOM_FILTER_FPP_VALUE.key(), String.valueOf(0.00001)); + cfg.setValue(HoodieStorageConfig.BLOOM_FILTER_DYNAMIC_MAX_ENTRIES.key(), String.valueOf(10000)); + cfg.setValue(HoodieStorageConfig.BLOOM_FILTER_TYPE.key(), BloomFilterTypeCode.DYNAMIC_V0.name()); + + //set parquet config values + cfg.setValue(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME.key(), CompressionCodecName.GZIP.name()); + cfg.setValue(HoodieStorageConfig.PARQUET_BLOCK_SIZE.key(), String.valueOf(ParquetWriter.DEFAULT_BLOCK_SIZE)); + cfg.setValue(HoodieStorageConfig.PARQUET_PAGE_SIZE.key(), String.valueOf(ParquetWriter.DEFAULT_PAGE_SIZE)); + cfg.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), String.valueOf(1024 * 1024 * 1024)); + cfg.setValue(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.key(), String.valueOf(0.1)); + cfg.setValue(HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED.key(), "true"); + + try (HoodieAvroFileWriter writer = (HoodieAvroFileWriter) HoodieFileWriterFactory + .getFileWriter(baseInstantTime, new StoragePath(baseFilePath), conf, cfg, + schema, new LocalTaskContextSupplier(), HoodieRecord.HoodieRecordType.AVRO)) { for (IndexedRecord record : records) { writer.writeAvro( (String) record.get(schema.getField(ROW_KEY).pos()), record); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java index b2d7306d6ab4a..6eb6733b04b17 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java @@ -20,11 +20,13 @@ package org.apache.hudi.common.testutils.reader; import org.apache.hudi.avro.model.HoodieDeleteRecord; +import org.apache.hudi.common.config.HoodieConfig; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.HoodieReaderContext; import org.apache.hudi.common.model.DefaultHoodieRecordPayload; import org.apache.hudi.common.model.HoodieAvroIndexedRecord; import org.apache.hudi.common.model.HoodieAvroRecordMerger; +import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieRecord; 
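
The HoodieOrcConfig and HoodieParquetConfig changes above swap the raw Hadoop Configuration for Hudi's StorageConfiguration, which hadoop-specific code unwraps back to a Configuration only at the storage boundary (see HoodieAvroOrcWriter and HoodieBaseParquetWriter further down). A minimal standalone sketch of that wrapper pattern, with illustrative names rather than the real org.apache.hudi.storage.StorageConfiguration API:

    // Typed holder for an engine- or storage-specific config object; callers pass
    // the wrapper around and unwrap to the concrete type only where it is needed.
    public abstract class ConfigWrapperSketch<T> {

      public abstract T unwrap();

      // Unwrap against an explicit target type, failing fast on a mismatch.
      public <U> U unwrapAs(Class<U> clazz) {
        T conf = unwrap();
        if (!clazz.isInstance(conf)) {
          throw new IllegalArgumentException("Cannot unwrap conf as " + clazz.getName());
        }
        return clazz.cast(conf);
      }
    }

Hadoop-aware call sites then do the equivalent of unwrapAs(Configuration.class), exactly as HoodieAvroOrcWriter does below, keeping the shared config types Hadoop-free.
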
import org.apache.hudi.common.model.HoodieRecordMerger; import org.apache.hudi.common.util.ConfigUtils; @@ -32,7 +34,8 @@ import org.apache.hudi.common.util.SpillableMapUtils; import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.io.storage.HoodieAvroParquetReader; +import org.apache.hudi.io.storage.HoodieAvroFileReader; +import org.apache.hudi.io.storage.HoodieFileReaderFactory; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.HoodieStorageUtils; import org.apache.hudi.storage.StorageConfiguration; @@ -78,7 +81,9 @@ public ClosableIterator getFileRecordIterator( Schema requiredSchema, StorageConfiguration conf ) throws IOException { - HoodieAvroParquetReader reader = new HoodieAvroParquetReader(conf, new StoragePath(filePath.toUri())); + HoodieAvroFileReader reader = (HoodieAvroFileReader) HoodieFileReaderFactory + .getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(new HoodieConfig(), + conf, filePath, HoodieFileFormat.PARQUET, Option.empty()); return reader.getIndexedRecordIterator(dataSchema, requiredSchema); } diff --git a/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterUtils.java b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterUtils.java index a0ec0dfdb89c5..2fc38c156a366 100644 --- a/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterUtils.java @@ -44,7 +44,7 @@ * Utils for reader and writer tests. */ public class TestHoodieReaderWriterUtils { - static void writeHFileForTesting(String fileLocation, + public static void writeHFileForTesting(String fileLocation, int blockSize, Compression.Algorithm compressionAlgo, int numEntries, diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderFactory.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroFileReaderFactory.java similarity index 81% rename from hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderFactory.java rename to hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroFileReaderFactory.java index 6a6b0b67aa507..3a4d0b910aba5 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderFactory.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroFileReaderFactory.java @@ -7,19 +7,25 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
*/ -package org.apache.hudi.io.storage; +package org.apache.hudi.io.hadoop; import org.apache.hudi.common.config.HoodieConfig; import org.apache.hudi.common.util.Option; +import org.apache.hudi.io.storage.HoodieAvroBootstrapFileReader; +import org.apache.hudi.io.storage.HoodieFileReader; +import org.apache.hudi.io.storage.HoodieFileReaderFactory; +import org.apache.hudi.io.storage.HoodieHBaseAvroHFileReader; +import org.apache.hudi.io.storage.HoodieNativeAvroHFileReader; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.HoodieStorageUtils; import org.apache.hudi.storage.StorageConfiguration; diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileWriterFactory.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroFileWriterFactory.java similarity index 80% rename from hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileWriterFactory.java rename to hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroFileWriterFactory.java index 2a727158e1782..d0b8faa75894e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileWriterFactory.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroFileWriterFactory.java @@ -7,16 +7,17 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
*/ -package org.apache.hudi.io.storage; +package org.apache.hudi.io.hadoop; import org.apache.hudi.avro.HoodieAvroWriteSupport; import org.apache.hudi.common.bloom.BloomFilter; @@ -27,6 +28,11 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase; +import org.apache.hudi.io.storage.HoodieFileWriter; +import org.apache.hudi.io.storage.HoodieFileWriterFactory; +import org.apache.hudi.io.storage.HoodieOrcConfig; +import org.apache.hudi.io.storage.HoodieParquetConfig; import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.storage.StoragePath; @@ -40,18 +46,19 @@ import org.apache.parquet.schema.MessageType; import java.io.IOException; +import java.io.OutputStream; import java.util.Properties; -import static org.apache.hudi.io.storage.HoodieHFileConfig.CACHE_DATA_IN_L1; -import static org.apache.hudi.io.storage.HoodieHFileConfig.DROP_BEHIND_CACHE_COMPACTION; -import static org.apache.hudi.io.storage.HoodieHFileConfig.HFILE_COMPARATOR; -import static org.apache.hudi.io.storage.HoodieHFileConfig.PREFETCH_ON_OPEN; +import static org.apache.hudi.io.hadoop.HoodieHFileConfig.CACHE_DATA_IN_L1; +import static org.apache.hudi.io.hadoop.HoodieHFileConfig.DROP_BEHIND_CACHE_COMPACTION; +import static org.apache.hudi.io.hadoop.HoodieHFileConfig.HFILE_COMPARATOR; +import static org.apache.hudi.io.hadoop.HoodieHFileConfig.PREFETCH_ON_OPEN; public class HoodieAvroFileWriterFactory extends HoodieFileWriterFactory { //hardcoded classes to remove at a later time - public static final String HOODIE_AVRO_PARQUET_WRITER = "org.apache.hudi.io.storage.HoodieAvroParquetWriter"; - public static final String HOODIE_AVRO_HFILE_WRITER = "org.apache.hudi.io.storage.HoodieAvroHFileWriter"; - public static final String HOODIE_AVRO_ORC_WRITER = "org.apache.hudi.io.storage.HoodieAvroOrcWriter"; + public static final String HOODIE_AVRO_PARQUET_WRITER = "org.apache.hudi.io.hadoop.HoodieAvroParquetWriter"; + public static final String HOODIE_AVRO_HFILE_WRITER = "org.apache.hudi.io.hadoop.HoodieAvroHFileWriter"; + public static final String HOODIE_AVRO_ORC_WRITER = "org.apache.hudi.io.hadoop.HoodieAvroOrcWriter"; @Override protected HoodieFileWriter newParquetFileWriter( @@ -70,7 +77,7 @@ protected HoodieFileWriter newParquetFileWriter( config.getIntOrDefault(HoodieStorageConfig.PARQUET_BLOCK_SIZE), config.getIntOrDefault(HoodieStorageConfig.PARQUET_PAGE_SIZE), config.getLongOrDefault(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE), - conf.unwrapAs(Configuration.class), config.getDoubleOrDefault(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION), + conf, config.getDoubleOrDefault(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION), config.getBooleanOrDefault(HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED)); try { return (HoodieFileWriter) ReflectionUtils.loadClass(HOODIE_AVRO_PARQUET_WRITER, @@ -83,16 +90,16 @@ protected HoodieFileWriter newParquetFileWriter( } protected HoodieFileWriter newParquetFileWriter( - FSDataOutputStream outputStream, StorageConfiguration conf, HoodieConfig config, Schema schema) throws IOException { + OutputStream outputStream, StorageConfiguration conf, HoodieConfig config, Schema schema) throws IOException { HoodieAvroWriteSupport writeSupport = getHoodieAvroWriteSupport(conf, schema, config, false); HoodieParquetConfig parquetConfig = new HoodieParquetConfig<>(writeSupport, 
CompressionCodecName.fromConf(config.getString(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME)), config.getInt(HoodieStorageConfig.PARQUET_BLOCK_SIZE), config.getInt(HoodieStorageConfig.PARQUET_PAGE_SIZE), config.getLong(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE), // todo: 1024*1024*1024 - conf.unwrapAs(Configuration.class), config.getDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION), + conf, config.getDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION), config.getBoolean(HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED)); - return new HoodieParquetStreamWriter(outputStream, parquetConfig); + return new HoodieParquetStreamWriter(new FSDataOutputStream(outputStream, null), parquetConfig); } protected HoodieFileWriter newHFileFileWriter( @@ -120,7 +127,7 @@ protected HoodieFileWriter newOrcFileWriter( String instantTime, StoragePath path, StorageConfiguration conf, HoodieConfig config, Schema schema, TaskContextSupplier taskContextSupplier) throws IOException { BloomFilter filter = createBloomFilter(config); - HoodieOrcConfig orcConfig = new HoodieOrcConfig(conf.unwrapAs(Configuration.class), + HoodieOrcConfig orcConfig = new HoodieOrcConfig(conf, CompressionKind.valueOf(config.getString(HoodieStorageConfig.ORC_COMPRESSION_CODEC_NAME)), config.getInt(HoodieStorageConfig.ORC_STRIPE_SIZE), config.getInt(HoodieStorageConfig.ORC_BLOCK_SIZE), diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileWriter.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroHFileWriter.java similarity index 93% rename from hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileWriter.java rename to hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroHFileWriter.java index 6de6b24868b59..379df6e97b947 100644 --- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileWriter.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroHFileWriter.java @@ -7,16 +7,17 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
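
The stream-based newParquetFileWriter above now accepts a plain java.io.OutputStream and bridges to Hadoop only at the call site via new FSDataOutputStream(outputStream, null). A small self-contained sketch of that bridging, assuming hadoop-common is on the classpath; the null Statistics argument matches the diff:

    import org.apache.hadoop.fs.FSDataOutputStream;

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    public class StreamBridgeSketch {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        // FSDataOutputStream wraps any OutputStream; the Statistics may be null.
        try (FSDataOutputStream out = new FSDataOutputStream(bytes, null)) {
          out.write("hello".getBytes(StandardCharsets.UTF_8));
        }
        System.out.println(bytes.size()); // prints 5
      }
    }

This keeps FSDataOutputStream out of the hudi-common API surface while the Hadoop-specific writer still receives the position-tracking stream it expects.
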
*/ -package org.apache.hudi.io.storage; +package org.apache.hudi.io.hadoop; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.bloom.BloomFilter; @@ -26,6 +27,8 @@ import org.apache.hudi.exception.HoodieDuplicateKeyException; import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.hadoop.fs.HoodieWrapperFileSystem; +import org.apache.hudi.io.storage.HoodieAvroFileWriter; +import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase; import org.apache.hudi.metadata.MetadataPartitionType; import org.apache.hudi.storage.StoragePath; diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcReader.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcReader.java similarity index 86% rename from hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcReader.java rename to hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcReader.java index 4d90590d953bf..c1f5b79c227f9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcReader.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcReader.java @@ -7,16 +7,17 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ -package org.apache.hudi.io.storage; +package org.apache.hudi.io.hadoop; import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.model.HoodieFileFormat; @@ -26,6 +27,8 @@ import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.io.storage.HoodieAvroFileReader; +import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.storage.StoragePath; @@ -47,7 +50,7 @@ /** * {@link HoodieFileReader} implementation for ORC format. 
*/ -public class HoodieAvroOrcReader extends HoodieAvroFileReaderBase { +public class HoodieAvroOrcReader extends HoodieAvroFileReader { private final StoragePath path; private final StorageConfiguration conf; diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcWriter.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcWriter.java similarity index 91% rename from hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcWriter.java rename to hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcWriter.java index 07e7bc7f12234..40e37fa145fe6 100644 --- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcWriter.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcWriter.java @@ -7,16 +7,17 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ -package org.apache.hudi.io.storage; +package org.apache.hudi.io.hadoop; import org.apache.hudi.avro.HoodieAvroWriteSupport; import org.apache.hudi.avro.HoodieBloomFilterWriteSupport; @@ -27,6 +28,8 @@ import org.apache.hudi.common.util.AvroOrcUtils; import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.hadoop.fs.HoodieWrapperFileSystem; +import org.apache.hudi.io.storage.HoodieAvroFileWriter; +import org.apache.hudi.io.storage.HoodieOrcConfig; import org.apache.hudi.storage.StoragePath; import org.apache.avro.Schema; @@ -70,7 +73,7 @@ public class HoodieAvroOrcWriter implements HoodieAvroFileWriter, Closeable { public HoodieAvroOrcWriter(String instantTime, StoragePath file, HoodieOrcConfig config, Schema schema, TaskContextSupplier taskContextSupplier) throws IOException { - Configuration conf = HadoopFSUtils.registerFileSystem(file, config.getHadoopConf()); + Configuration conf = HadoopFSUtils.registerFileSystem(file, config.getStorageConf().unwrapAs(Configuration.class)); this.file = HoodieWrapperFileSystem.convertToHoodiePath(file, conf); this.fs = (HoodieWrapperFileSystem) this.file.getFileSystem(conf); this.instantTime = instantTime; diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java similarity index 93% rename from hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java rename to hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java index 66ff6b483e05e..d75660a9a7eef 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java @@ -7,16 +7,17 @@ * "License"); 
you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
 *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.bloom.BloomFilter;
@@ -29,6 +30,8 @@
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.CloseableMappingIterator;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
+import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
@@ -52,7 +55,7 @@
 /**
  * {@link HoodieFileReader} implementation for parquet format.
  */
-public class HoodieAvroParquetReader extends HoodieAvroFileReaderBase {
+public class HoodieAvroParquetReader extends HoodieAvroFileReader {
 
   private final StoragePath path;
   private final StorageConfiguration<?> conf;
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetWriter.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetWriter.java
similarity index 84%
rename from hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetWriter.java
rename to hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetWriter.java
index 4269e6513a284..f8f9a8ccea0f8 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetWriter.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetWriter.java
@@ -7,20 +7,23 @@
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;
 
 import org.apache.hudi.avro.HoodieAvroWriteSupport;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.io.storage.HoodieAvroFileWriter;
+import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.storage.StoragePath;
 
 import org.apache.avro.generic.IndexedRecord;
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieBaseParquetWriter.java
similarity index 90%
rename from hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java
rename to hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieBaseParquetWriter.java
index 06f1e513055fa..8f17fa0fa1e19 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieBaseParquetWriter.java
@@ -7,20 +7,22 @@
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;
 
 import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.hadoop.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.storage.StoragePath;
 
 import org.apache.hadoop.conf.Configuration;
@@ -52,8 +54,9 @@ public abstract class HoodieBaseParquetWriter<R> implements Closeable {
   public HoodieBaseParquetWriter(StoragePath file,
                                  HoodieParquetConfig<? extends WriteSupport<R>> parquetConfig) throws IOException {
+    Configuration hadoopConf = parquetConfig.getStorageConf().unwrapAs(Configuration.class);
     ParquetWriter.Builder parquetWriterbuilder = new ParquetWriter.Builder(
-        HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf())) {
+        HoodieWrapperFileSystem.convertToHoodiePath(file, hadoopConf)) {
       @Override
       protected ParquetWriter.Builder self() {
         return this;
@@ -73,8 +76,8 @@ protected WriteSupport getWriteSupport(Configuration conf) {
     parquetWriterbuilder.withDictionaryEncoding(parquetConfig.dictionaryEnabled());
     parquetWriterbuilder.withValidation(ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED);
     parquetWriterbuilder.withWriterVersion(ParquetWriter.DEFAULT_WRITER_VERSION);
-    parquetWriterbuilder.withConf(HadoopFSUtils.registerFileSystem(file, parquetConfig.getHadoopConf()));
-    handleParquetBloomFilters(parquetWriterbuilder, parquetConfig.getHadoopConf());
+    parquetWriterbuilder.withConf(HadoopFSUtils.registerFileSystem(file, hadoopConf));
+    handleParquetBloomFilters(parquetWriterbuilder, hadoopConf);
 
     parquetWriter = parquetWriterbuilder.build();
     // We cannot accurately measure the snappy compressed output file size. We are choosing a
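The hunk above shows the core pattern of this migration: the writer now receives an engine-agnostic StorageConfiguration and unwraps it to a Hadoop Configuration once, in a local variable, so Hadoop types appear only at the FS/Parquet boundary. A minimal sketch of the pattern, assuming a Hadoop-backed StorageConfiguration (the method and the configuration key are illustrative, not from this PR):

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.storage.StorageConfiguration;

public class UnwrapPatternSketch {
  // Hypothetical preamble mirroring HoodieBaseParquetWriter above: unwrap the
  // engine-agnostic config once, then use only the local Hadoop view.
  static void configure(StorageConfiguration<?> storageConf) {
    Configuration hadoopConf = storageConf.unwrapAs(Configuration.class);
    // All Hadoop-specific calls go through the local variable from here on,
    // e.g. registering the file system or seeding Parquet properties.
    hadoopConf.set("parquet.bloom.filter.enabled", "false"); // illustrative key
  }
}

Unwrapping into one local also explains why the repeated parquetConfig.getHadoopConf() calls disappear: the config object itself no longer exposes a Hadoop type.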
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileConfig.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieHFileConfig.java
similarity index 87%
rename from hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileConfig.java
rename to hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieHFileConfig.java
index 64cc607ef6324..83b659a6be031 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileConfig.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieHFileConfig.java
@@ -7,18 +7,20 @@
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;
 
 import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.io.storage.HoodieHBaseKVComparator;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellComparator;
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetStreamWriter.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieParquetStreamWriter.java
similarity index 84%
rename from hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetStreamWriter.java
rename to hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieParquetStreamWriter.java
index 226266bf6cf97..5fdd6505733f1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetStreamWriter.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieParquetStreamWriter.java
@@ -7,19 +7,22 @@
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;
 
 import org.apache.hudi.avro.HoodieAvroWriteSupport;
 import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.io.storage.HoodieAvroFileWriter;
+import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.parquet.io.OutputStreamBackedOutputFile;
 
 import org.apache.avro.generic.IndexedRecord;
@@ -54,7 +57,7 @@ public HoodieParquetStreamWriter(FSDataOutputStream outputStream,
         .withDictionaryPageSize(parquetConfig.getPageSize())
         .withDictionaryEncoding(parquetConfig.dictionaryEnabled())
         .withWriterVersion(ParquetWriter.DEFAULT_WRITER_VERSION)
-        .withConf(parquetConfig.getHadoopConf())
+        .withConf(parquetConfig.getStorageConf().unwrapAs(Configuration.class))
         .build();
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/parquet/io/OutputStreamBackedOutputFile.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/parquet/io/OutputStreamBackedOutputFile.java
similarity index 100%
rename from hudi-common/src/main/java/org/apache/hudi/parquet/io/OutputStreamBackedOutputFile.java
rename to hudi-hadoop-common/src/main/java/org/apache/hudi/parquet/io/OutputStreamBackedOutputFile.java
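HoodieParquetStreamWriter, moved above together with OutputStreamBackedOutputFile, writes Parquet to an arbitrary FSDataOutputStream rather than to a path. A hedged usage sketch, assuming the writeAvro(String, IndexedRecord) and close() methods of the HoodieAvroFileWriter contract it implements, and a parquetConfig built as elsewhere in this diff:

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.io.hadoop.HoodieParquetStreamWriter;
import org.apache.hudi.io.storage.HoodieParquetConfig;

public class StreamWriterSketch {
  // Writes a single record to an in-memory Parquet buffer; the config and
  // record are supplied by the caller, whose construction is unchanged here.
  static byte[] writeOne(HoodieParquetConfig<HoodieAvroWriteSupport> parquetConfig,
                         String recordKey, IndexedRecord record) throws IOException {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    HoodieParquetStreamWriter writer =
        new HoodieParquetStreamWriter(new FSDataOutputStream(buffer, null), parquetConfig);
    try {
      writer.writeAvro(recordKey, record);
    } finally {
      writer.close(); // flushes the Parquet footer into the buffer
    }
    return buffer.toByteArray();
  }
}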
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieAvroFileReaderFactory.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieAvroFileReaderFactory.java
similarity index 83%
rename from hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieAvroFileReaderFactory.java
rename to hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieAvroFileReaderFactory.java
index 96b8ea9e6b3c5..7faf84a1ee53f 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieAvroFileReaderFactory.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieAvroFileReaderFactory.java
@@ -7,19 +7,22 @@
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;
 
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieBaseParquetWriter.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieBaseParquetWriter.java
similarity index 86%
rename from hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieBaseParquetWriter.java
rename to hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieBaseParquetWriter.java
index f9909b0f5f24e..82a80b1ce2624 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieBaseParquetWriter.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieBaseParquetWriter.java
@@ -7,28 +7,31 @@
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;
 
 import org.apache.hudi.avro.HoodieAvroWriteSupport;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.bloom.BloomFilterFactory;
 import org.apache.hudi.common.bloom.BloomFilterTypeCode;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.io.storage.HoodieParquetConfig;
+import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -83,7 +86,7 @@ public void setCurrentDataSize(long currentDataSize) {
   public void testCanWrite() throws IOException {
     BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.0001, 10000,
         BloomFilterTypeCode.DYNAMIC_V0.name());
-    Configuration hadoopConf = new Configuration();
+    StorageConfiguration<?> conf = HoodieTestUtils.getDefaultStorageConfWithDefaults();
     Schema schema = new Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
     HoodieAvroWriteSupport writeSupport =
         new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema),
@@ -92,7 +95,7 @@ public void testCanWrite() throws IOException {
     long maxFileSize = 2 * 1024 * 1024;
     HoodieParquetConfig<HoodieAvroWriteSupport> parquetConfig = new HoodieParquetConfig<>(writeSupport,
         CompressionCodecName.GZIP, ParquetWriter.DEFAULT_BLOCK_SIZE,
-        ParquetWriter.DEFAULT_PAGE_SIZE, maxFileSize, hadoopConf, 0, true);
+        ParquetWriter.DEFAULT_PAGE_SIZE, maxFileSize, conf, 0, true);
 
     StoragePath filePath = new StoragePath(
         new StoragePath(tempDir.toUri()), "test_fileSize.parquet");
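The test migrations above all follow one recipe: new Configuration() becomes a StorageConfiguration obtained from HoodieTestUtils, passed straight through to HoodieParquetConfig. Condensed into a sketch that reuses only calls visible in the hunks (the write-support argument is left to the caller, and the sizes mirror testCanWrite):

import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.hudi.storage.StorageConfiguration;

import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class TestConfigSketch {
  static HoodieParquetConfig<HoodieAvroWriteSupport> parquetConfig(HoodieAvroWriteSupport writeSupport) {
    // Replaces "new Configuration()" in the pre-migration tests.
    StorageConfiguration<?> conf = HoodieTestUtils.getDefaultStorageConfWithDefaults();
    long maxFileSize = 2 * 1024 * 1024; // 2 MB, as in testCanWrite above
    return new HoodieParquetConfig<>(writeSupport, CompressionCodecName.GZIP,
        ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE,
        maxFileSize, conf, 0, true);
  }
}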
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHBaseHFileReaderWriter.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHBaseHFileReaderWriter.java
similarity index 93%
rename from hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHBaseHFileReaderWriter.java
rename to hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHBaseHFileReaderWriter.java
index 48fa1ddc5015b..8c227b88e0f96 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHBaseHFileReaderWriter.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHBaseHFileReaderWriter.java
@@ -7,19 +7,24 @@
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
 *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;
 
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
+import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
+import org.apache.hudi.io.storage.HoodieHBaseAvroHFileReader;
+import org.apache.hudi.io.storage.HoodieHFileUtils;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.HoodieStorageUtils;
 import org.apache.hudi.storage.StorageConfiguration;
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java
similarity index 85%
rename from hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java
rename to hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java
index 6fe0e2ffea54c..b87af2c8371c1 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java
@@ -7,19 +7,23 @@
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
 *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;
 
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
+import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
+import org.apache.hudi.io.storage.HoodieNativeAvroHFileReader;
 import org.apache.hudi.storage.StorageConfiguration;
 
 import org.apache.avro.Schema;
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriterBase.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriterBase.java
similarity index 98%
rename from hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriterBase.java
rename to hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriterBase.java
index dcd791956c5c1..3bdd561a28236 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriterBase.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriterBase.java
@@ -17,7 +17,7 @@
  * under the License.
  */
-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;
 
 import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex;
 import org.apache.hudi.common.config.HoodieStorageConfig;
@@ -29,6 +29,9 @@
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
+import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
+import org.apache.hudi.io.storage.HoodieFileWriterFactory;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.HoodieStorageUtils;
 import org.apache.hudi.storage.StorageConfiguration;
@@ -75,7 +78,7 @@
 import static org.apache.hudi.io.hfile.TestHFileReader.COMPLEX_SCHEMA_HFILE_SUFFIX;
 import static org.apache.hudi.io.hfile.TestHFileReader.SIMPLE_SCHEMA_HFILE_SUFFIX;
 import static org.apache.hudi.io.hfile.TestHFileReader.readHFileFromResources;
-import static org.apache.hudi.io.storage.HoodieHFileConfig.HFILE_COMPARATOR;
+import static org.apache.hudi.io.hadoop.HoodieHFileConfig.HFILE_COMPARATOR;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieOrcReaderWriter.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieOrcReaderWriter.java
similarity index 87%
rename from hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieOrcReaderWriter.java
rename to hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieOrcReaderWriter.java
index bc719be8bc836..6a94a32ed3c59 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieOrcReaderWriter.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieOrcReaderWriter.java
@@ -7,16 +7,17 @@
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
 *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;
 
 import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
 import org.apache.hudi.common.bloom.BloomFilter;
@@ -25,6 +26,10 @@
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieOrcConfig;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
@@ -57,7 +62,7 @@ protected StoragePath getFilePath() {
   protected HoodieAvroOrcWriter createWriter(
       Schema avroSchema, boolean populateMetaFields) throws Exception {
     BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.00001, -1, BloomFilterTypeCode.SIMPLE.name());
-    Configuration conf = new Configuration();
+    StorageConfiguration<?> conf = HoodieTestUtils.getDefaultStorageConfWithDefaults();
     int orcStripSize = Integer.parseInt(HoodieStorageConfig.ORC_STRIPE_SIZE.defaultValue());
     int orcBlockSize = Integer.parseInt(HoodieStorageConfig.ORC_BLOCK_SIZE.defaultValue());
     int maxFileSize = Integer.parseInt(HoodieStorageConfig.ORC_FILE_MAX_SIZE.defaultValue());
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterBase.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieReaderWriterBase.java
similarity index 97%
rename from hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterBase.java
rename to hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieReaderWriterBase.java
index 226cf10f97e22..3fd0ad803199d 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterBase.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieReaderWriterBase.java
@@ -17,7 +17,7 @@
  * under the License.
  */
-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;
 
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
@@ -26,6 +26,11 @@
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
+import org.apache.hudi.io.storage.HoodieAvroFileWriter;
+import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileWriter;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala
index 4d925d3d4ed0d..791435f4bb7f9 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala
@@ -24,12 +24,13 @@ import org.apache.hudi.common.config.HoodieStorageConfig
 import org.apache.hudi.common.config.HoodieStorageConfig.{BLOOM_FILTER_DYNAMIC_MAX_ENTRIES, BLOOM_FILTER_FPP_VALUE, BLOOM_FILTER_NUM_ENTRIES_VALUE, BLOOM_FILTER_TYPE}
 import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
 import org.apache.hudi.common.util.{BaseFileUtils, Option}
-import org.apache.hudi.io.storage.{HoodieAvroParquetWriter, HoodieParquetConfig}
+import org.apache.hudi.io.storage.HoodieParquetConfig
 import org.apache.hudi.storage.{HoodieStorage, StorageConfiguration, StoragePath}
 
 import org.apache.avro.Schema
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileSystem
+import org.apache.hudi.io.hadoop.HoodieAvroParquetWriter
 import org.apache.parquet.avro.AvroSchemaConverter
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.apache.spark.sql.{DataFrame, SQLContext}
@@ -61,12 +62,12 @@
       HoodieStorageConfig.PARQUET_BLOCK_SIZE.defaultValue.toInt,
       HoodieStorageConfig.PARQUET_PAGE_SIZE.defaultValue.toInt,
       HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.defaultValue.toInt,
-      conf.unwrap(),
+      conf,
       HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue.toDouble,
       HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED.defaultValue)
 
     // Add current classLoad for config, if not will throw classNotFound of 'HoodieWrapperFileSystem'.
-    parquetConfig.getHadoopConf().setClassLoader(Thread.currentThread.getContextClassLoader)
+    conf.unwrap().setClassLoader(Thread.currentThread.getContextClassLoader)
     val writer = new HoodieAvroParquetWriter(destinationFile, parquetConfig, instantTime,
       new SparkTaskContextSupplier(), true)
     for (rec <- sourceRecords) {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
index d39be52dd22f7..225cab39286c1 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
@@ -55,7 +55,7 @@
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
 import org.apache.hudi.index.HoodieIndex.IndexType;
-import org.apache.hudi.io.storage.HoodieAvroParquetReader;
+import org.apache.hudi.io.hadoop.HoodieAvroParquetReader;
 import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
 import org.apache.hudi.keygen.SimpleKeyGenerator;
 import org.apache.hudi.storage.StoragePath;
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java
index 65d140da8b375..95f151336c74c 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java
@@ -29,6 +29,7 @@
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
 import org.apache.hudi.testutils.SparkDatasetTestUtils;
@@ -89,7 +90,7 @@ public void testProperWriting(boolean parquetWriteLegacyFormatEnabled) throws Ex
     HoodieWriteConfig cfg = writeConfigBuilder.build();
     HoodieParquetConfig<HoodieRowParquetWriteSupport> parquetConfig = new HoodieParquetConfig<>(writeSupport,
         CompressionCodecName.SNAPPY, cfg.getParquetBlockSize(), cfg.getParquetPageSize(), cfg.getParquetMaxFileSize(),
-        writeSupport.getHadoopConf(), cfg.getParquetCompressionRatio(), cfg.parquetDictionaryEnabled());
+        new HadoopStorageConfiguration(writeSupport.getHadoopConf()), cfg.getParquetCompressionRatio(), cfg.parquetDictionaryEnabled());
 
     StoragePath filePath = new StoragePath(basePath + "/internal_row_writer.parquet");
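The Spark-side hunks show the adapter running in both directions: HadoopStorageConfiguration wraps an existing Hadoop Configuration where a StorageConfiguration is now required (TestHoodieInternalRowParquetWriter), and unwrap() recovers the underlying Configuration where a Hadoop-only API such as setClassLoader is still needed (SparkHelpers). A round-trip sketch, assuming unwrap() returns the live underlying Configuration, as the SparkHelpers change implies:

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;

public class ConfRoundTripSketch {
  public static void main(String[] args) {
    Configuration hadoopConf = new Configuration();
    // Hadoop -> Hudi: what the internal-row test now passes to HoodieParquetConfig.
    StorageConfiguration<Configuration> storageConf = new HadoopStorageConfiguration(hadoopConf);
    // Hudi -> Hadoop: what SparkHelpers does before building the writer, so the
    // class-loader mutation is visible to the config the writer was built from.
    storageConf.unwrap().setClassLoader(Thread.currentThread().getContextClassLoader());
  }
}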