diff --git a/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala b/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala
index fbfc1d8ec902e..b9f8df5fc2337 100644
--- a/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala
+++ b/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala
@@ -23,12 +23,11 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hudi.avro.HoodieAvroWriteSupport
 import org.apache.hudi.client.SparkTaskContextSupplier
-import org.apache.hudi.common.HoodieJsonPayload
 import org.apache.hudi.common.bloom.{BloomFilter, BloomFilterFactory}
 import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
 import org.apache.hudi.common.util.BaseFileUtils
 import org.apache.hudi.config.{HoodieIndexConfig, HoodieStorageConfig}
-import org.apache.hudi.io.storage.{HoodieAvroParquetConfig, HoodieAvroParquetWriter}
+import org.apache.hudi.io.storage.{HoodieAvroParquetWriter, HoodieParquetConfig}
 import org.apache.parquet.avro.AvroSchemaConverter
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.apache.spark.sql.{DataFrame, SQLContext}
@@ -45,7 +44,7 @@ object SparkHelpers {
     val filter: BloomFilter = BloomFilterFactory.createBloomFilter(HoodieIndexConfig.BLOOM_FILTER_NUM_ENTRIES_VALUE.defaultValue.toInt, HoodieIndexConfig.BLOOM_FILTER_FPP_VALUE.defaultValue.toDouble,
       HoodieIndexConfig.BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES.defaultValue.toInt, HoodieIndexConfig.BLOOM_FILTER_TYPE.defaultValue);
     val writeSupport: HoodieAvroWriteSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter(fs.getConf).convert(schema), schema, org.apache.hudi.common.util.Option.of(filter))
-    val parquetConfig: HoodieAvroParquetConfig = new HoodieAvroParquetConfig(writeSupport, CompressionCodecName.GZIP, HoodieStorageConfig.PARQUET_BLOCK_SIZE.defaultValue.toInt, HoodieStorageConfig.PARQUET_PAGE_SIZE.defaultValue.toInt, HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.defaultValue.toInt, fs.getConf, HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue.toDouble)
+    val parquetConfig: HoodieParquetConfig[HoodieAvroWriteSupport] = new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, HoodieStorageConfig.PARQUET_BLOCK_SIZE.defaultValue.toInt, HoodieStorageConfig.PARQUET_PAGE_SIZE.defaultValue.toInt, HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.defaultValue.toInt, fs.getConf, HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue.toDouble)
     // Add current classLoad for config, if not will throw classNotFound of 'HoodieWrapperFileSystem'.
     parquetConfig.getHadoopConf().setClassLoader(Thread.currentThread.getContextClassLoader)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetWriter.java
index 6f7940d04d0f2..06631dc53fb1c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetWriter.java
@@ -47,11 +47,11 @@ public class HoodieAvroParquetWriter
 
   @SuppressWarnings({"unchecked", "rawtypes"})
   public HoodieAvroParquetWriter(Path file,
-                                 HoodieAvroParquetConfig parquetConfig,
+                                 HoodieParquetConfig parquetConfig,
                                  String instantTime,
                                  TaskContextSupplier taskContextSupplier,
                                  boolean populateMetaFields) throws IOException {
-    super(file, (HoodieBaseParquetConfig) parquetConfig);
+    super(file, (HoodieParquetConfig) parquetConfig);
     this.fileName = file.getName();
     this.writeSupport = parquetConfig.getWriteSupport();
     this.instantTime = instantTime;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java
index b4aa6de1bd577..e38b41d422a74 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java
@@ -43,7 +43,7 @@ public abstract class HoodieBaseParquetWriter extends ParquetWriter {
   private long lastCachedDataSize = -1;
 
   public HoodieBaseParquetWriter(Path file,
-                                 HoodieBaseParquetConfig<? extends WriteSupport<R>> parquetConfig) throws IOException {
+                                 HoodieParquetConfig<? extends WriteSupport<R>> parquetConfig) throws IOException {
     super(HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf()),
         ParquetFileWriter.Mode.CREATE,
         parquetConfig.getWriteSupport(),
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
index ffdff25738ed7..9ee8571ebd066 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
@@ -77,7 +77,7 @@ private static HoodieFi
     Option<BloomFilter> filter = enableBloomFilter ?
        Option.of(createBloomFilter(config)) : Option.empty();
     HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter(conf).convert(schema), schema, filter);
 
-    HoodieAvroParquetConfig parquetConfig = new HoodieAvroParquetConfig(writeSupport, config.getParquetCompressionCodec(),
+    HoodieParquetConfig<HoodieAvroWriteSupport> parquetConfig = new HoodieParquetConfig<>(writeSupport, config.getParquetCompressionCodec(),
         config.getParquetBlockSize(), config.getParquetPageSize(), config.getParquetMaxFileSize(),
         conf, config.getParquetCompressionRatio(), config.parquetDictionaryEnabled());
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
index 6b847d4960fb6..2f00b82772395 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
@@ -19,6 +19,12 @@
 
 package org.apache.hudi.testutils;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.HoodieAvroWriteSupport;
 import org.apache.hudi.common.bloom.BloomFilter;
@@ -38,18 +44,11 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieStorageConfig;
-import org.apache.hudi.io.storage.HoodieAvroParquetConfig;
+import org.apache.hudi.io.storage.HoodieAvroParquetWriter;
+import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.io.storage.HoodieOrcConfig;
 import org.apache.hudi.io.storage.HoodieOrcWriter;
-import org.apache.hudi.io.storage.HoodieAvroParquetWriter;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.orc.CompressionKind;
@@ -110,7 +109,7 @@ public Path withInserts(String partition, String fileId, List reco
     if (HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().equals(HoodieFileFormat.PARQUET)) {
       HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(
           new AvroSchemaConverter().convert(schema), schema, Option.of(filter));
-      HoodieAvroParquetConfig config = new HoodieAvroParquetConfig(writeSupport, CompressionCodecName.GZIP,
+      HoodieParquetConfig<HoodieAvroWriteSupport> config = new HoodieParquetConfig<>(writeSupport, CompressionCodecName.GZIP,
           ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 120 * 1024 * 1024,
           new Configuration(), Double.parseDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue()));
       try (HoodieAvroParquetWriter writer = new HoodieAvroParquetWriter<>(
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java
index 8cd8bc89a7ccb..98d4a866e0ee9 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java
@@ -22,6 +22,7 @@
 import org.apache.hudi.common.bloom.BloomFilterFactory;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.flink.table.types.logical.RowType;
@@ -67,7 +68,7 @@ private static HoodieRowDataFileWriter newParquetInternalRowFileWriter(
     HoodieRowDataParquetWriteSupport writeSupport = new HoodieRowDataParquetWriteSupport(table.getHadoopConf(), rowType, filter);
     return new HoodieRowDataParquetWriter(
-        path, new HoodieRowDataParquetConfig(
+        path, new HoodieParquetConfig<>(
             writeSupport,
             writeConfig.getParquetCompressionCodec(),
             writeConfig.getParquetBlockSize(),
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetConfig.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetConfig.java
deleted file mode 100644
index 99b72da2218b8..0000000000000
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetConfig.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.io.storage.row;
-
-import org.apache.hudi.io.storage.HoodieBaseParquetConfig;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.parquet.hadoop.metadata.CompressionCodecName;
-
-/**
- * ParquetConfig for datasource implementation with {@link org.apache.flink.table.data.RowData}.
- */
-public class HoodieRowDataParquetConfig extends HoodieBaseParquetConfig {
-
-  public HoodieRowDataParquetConfig(HoodieRowDataParquetWriteSupport writeSupport, CompressionCodecName compressionCodecName,
-                                    int blockSize, int pageSize, long maxFileSize, Configuration hadoopConf,
-                                    double compressionRatio) {
-    super(writeSupport, compressionCodecName, blockSize, pageSize, maxFileSize, hadoopConf, compressionRatio);
-  }
-}
\ No newline at end of file
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriter.java
index 373e6b1f5c60d..7b2a87512d285 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriter.java
@@ -23,6 +23,7 @@
 
 import org.apache.flink.table.data.RowData;
 import org.apache.hadoop.fs.Path;
+import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.parquet.hadoop.ParquetFileWriter;
 import org.apache.parquet.hadoop.ParquetWriter;
 
@@ -39,7 +40,7 @@ public class HoodieRowDataParquetWriter extends ParquetWriter
   private final long maxFileSize;
   private final HoodieRowDataParquetWriteSupport writeSupport;
 
-  public HoodieRowDataParquetWriter(Path file, HoodieRowDataParquetConfig parquetConfig)
+  public HoodieRowDataParquetWriter(Path file, HoodieParquetConfig<HoodieRowDataParquetWriteSupport> parquetConfig)
       throws IOException {
     super(HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf()),
         ParquetFileWriter.Mode.CREATE, parquetConfig.getWriteSupport(), parquetConfig.getCompressionCodecName(),
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java
index 8dd19d8883235..eb408f81c12d2 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java
@@ -23,6 +23,7 @@
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.hadoop.fs.Path;
@@ -68,14 +69,17 @@ private static HoodieInternalRowFileWriter newParquetInternalRowFileWriter(
     HoodieRowParquetWriteSupport writeSupport = new HoodieRowParquetWriteSupport(table.getHadoopConf(), structType, filter, writeConfig);
     return new HoodieInternalRowParquetWriter(
-        path, new HoodieRowParquetConfig(
+        path,
+        new HoodieParquetConfig<>(
             writeSupport,
             writeConfig.getParquetCompressionCodec(),
             writeConfig.getParquetBlockSize(),
             writeConfig.getParquetPageSize(),
             writeConfig.getParquetMaxFileSize(),
             writeSupport.getHadoopConf(),
-            writeConfig.getParquetCompressionRatio()));
+            writeConfig.getParquetCompressionRatio(),
+            writeConfig.parquetDictionaryEnabled()
+        ));
   }
 
   public static HoodieInternalRowFileWriter getInternalRowFileWriterWithoutMetaFields(
@@ -93,13 +97,15 @@ private static HoodieInternalRowFileWriter newParquetInternalRowFileWriterWithou
     HoodieRowParquetWriteSupport writeSupport = new HoodieRowParquetWriteSupport(table.getHadoopConf(),
        structType, null, writeConfig);
     return new HoodieInternalRowParquetWriter(
-        path, new HoodieRowParquetConfig(
+        path, new HoodieParquetConfig<>(
            writeSupport,
            writeConfig.getParquetCompressionCodec(),
            writeConfig.getParquetBlockSize(),
            writeConfig.getParquetPageSize(),
            writeConfig.getParquetMaxFileSize(),
            writeSupport.getHadoopConf(),
-           writeConfig.getParquetCompressionRatio()));
+           writeConfig.getParquetCompressionRatio(),
+           writeConfig.parquetDictionaryEnabled())
+    );
   }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowParquetWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowParquetWriter.java
index 5a0a60ea07500..1d11529352c4d 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowParquetWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowParquetWriter.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.io.storage.row;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.io.storage.HoodieBaseParquetWriter;
 
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -32,7 +33,7 @@ public class HoodieInternalRowParquetWriter extends HoodieBaseParquetWriter<InternalRow>
 
-  public HoodieInternalRowParquetWriter(Path file, HoodieRowParquetConfig parquetConfig)
+  public HoodieInternalRowParquetWriter(Path file, HoodieParquetConfig<HoodieRowParquetWriteSupport> parquetConfig)
       throws IOException {
     super(file, parquetConfig);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetConfig.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetConfig.java
deleted file mode 100644
index ac187dcdd9434..0000000000000
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetConfig.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.io.storage.row;
-
-import org.apache.hudi.io.storage.HoodieBaseParquetConfig;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.parquet.hadoop.metadata.CompressionCodecName;
-
-/**
- * ParquetConfig for datasource implementation with {@link org.apache.hudi.client.model.HoodieInternalRow}.
- */
-public class HoodieRowParquetConfig extends HoodieBaseParquetConfig {
-
-  public HoodieRowParquetConfig(HoodieRowParquetWriteSupport writeSupport, CompressionCodecName compressionCodecName,
-                                int blockSize, int pageSize, long maxFileSize, Configuration hadoopConf,
-                                double compressionRatio) {
-    super(writeSupport, compressionCodecName, blockSize, pageSize, maxFileSize, hadoopConf, compressionRatio);
-  }
-}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java
index dd509a86b4e92..d6c060c6bd3d3 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java
@@ -23,6 +23,7 @@
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.testutils.HoodieClientTestHarness;
 import org.apache.hudi.testutils.SparkDatasetTestUtils;
 
@@ -73,9 +74,9 @@ public void endToEndTest(boolean parquetWriteLegacyFormatEnabled) throws Excepti
     // init write support and parquet config
     HoodieRowParquetWriteSupport writeSupport = getWriteSupport(writeConfigBuilder, hadoopConf, parquetWriteLegacyFormatEnabled);
     HoodieWriteConfig cfg = writeConfigBuilder.build();
-    HoodieRowParquetConfig parquetConfig = new HoodieRowParquetConfig(writeSupport,
+    HoodieParquetConfig<HoodieRowParquetWriteSupport> parquetConfig = new HoodieParquetConfig<>(writeSupport,
         CompressionCodecName.SNAPPY, cfg.getParquetBlockSize(), cfg.getParquetPageSize(), cfg.getParquetMaxFileSize(),
-        writeSupport.getHadoopConf(), cfg.getParquetCompressionRatio());
+        writeSupport.getHadoopConf(), cfg.getParquetCompressionRatio(), cfg.parquetDictionaryEnabled());
 
     // prepare path
     String fileId = UUID.randomUUID().toString();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
index 5e7bef90a08ba..afb448f844891 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
@@ -18,21 +18,20 @@
 
 package org.apache.hudi.common.table.log.block;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroWriteSupport;
 import org.apache.hudi.common.fs.inline.InLineFSUtils;
 import org.apache.hudi.common.fs.inline.InLineFileSystem;
 import org.apache.hudi.common.util.ClosableIterator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ParquetReaderIterator;
-import org.apache.hudi.io.storage.HoodieAvroParquetConfig;
+import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.io.storage.HoodieParquetStreamWriter;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.Path;
 import org.apache.parquet.avro.AvroParquetReader;
 import org.apache.parquet.avro.AvroReadSupport;
 import org.apache.parquet.avro.AvroSchemaConverter;
@@ -43,7 +42,6 @@
 import org.apache.parquet.io.InputFile;
 
 import javax.annotation.Nonnull;
-
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.HashMap;
@@ -97,8 +95,8 @@ protected byte[] serializeRecords(List records) throws IOExceptio
     HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(
         new AvroSchemaConverter().convert(writerSchema), writerSchema, Option.empty());
 
-    HoodieAvroParquetConfig avroParquetConfig =
-        new HoodieAvroParquetConfig(
+    HoodieParquetConfig<HoodieAvroWriteSupport> avroParquetConfig =
+        new HoodieParquetConfig<>(
             writeSupport,
             compressionCodecName.get(),
             ParquetWriter.DEFAULT_BLOCK_SIZE,
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetConfig.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetConfig.java
deleted file mode 100644
index 1a10e6a716cd6..0000000000000
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetConfig.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.io.storage;
-
-import org.apache.hudi.avro.HoodieAvroWriteSupport;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.parquet.hadoop.metadata.CompressionCodecName;
-
-/**
- * ParquetConfig for writing avro records in Parquet files.
- */
-public class HoodieAvroParquetConfig extends HoodieBaseParquetConfig {
-
-  public HoodieAvroParquetConfig(HoodieAvroWriteSupport writeSupport, CompressionCodecName compressionCodecName,
-                                 int blockSize, int pageSize, long maxFileSize, Configuration hadoopConf,
-                                 double compressionRatio) {
-    super(writeSupport, compressionCodecName, blockSize, pageSize, maxFileSize, hadoopConf, compressionRatio);
-  }
-
-  public HoodieAvroParquetConfig(HoodieAvroWriteSupport writeSupport, CompressionCodecName compressionCodecName,
-                                 int blockSize, int pageSize, long maxFileSize, Configuration hadoopConf,
-                                 double compressionRatio, boolean directoryEnabled) {
-    super(writeSupport, compressionCodecName, blockSize, pageSize, maxFileSize, hadoopConf, compressionRatio, directoryEnabled);
-  }
-}
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetConfig.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetConfig.java
similarity index 82%
rename from hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetConfig.java
rename to hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetConfig.java
index 6db1de012c240..77fea6beee520 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetConfig.java
@@ -25,7 +25,7 @@
  * Base ParquetConfig to hold config params for writing to Parquet.
  * @param <T>
  */
-public class HoodieBaseParquetConfig<T> {
+public class HoodieParquetConfig<T> {
   private final T writeSupport;
   private final CompressionCodecName compressionCodecName;
   private final int blockSize;
@@ -35,13 +35,13 @@ public class HoodieBaseParquetConfig<T> {
   private final double compressionRatio;
   private final boolean dictionaryEnabled;
 
-  public HoodieBaseParquetConfig(T writeSupport, CompressionCodecName compressionCodecName, int blockSize,
-                                 int pageSize, long maxFileSize, Configuration hadoopConf, double compressionRatio) {
+  public HoodieParquetConfig(T writeSupport, CompressionCodecName compressionCodecName, int blockSize,
+                             int pageSize, long maxFileSize, Configuration hadoopConf, double compressionRatio) {
     this(writeSupport, compressionCodecName, blockSize, pageSize, maxFileSize, hadoopConf, compressionRatio, false);
   }
 
-  public HoodieBaseParquetConfig(T writeSupport, CompressionCodecName compressionCodecName, int blockSize,
-                                 int pageSize, long maxFileSize, Configuration hadoopConf, double compressionRatio, boolean dictionaryEnabled) {
+  public HoodieParquetConfig(T writeSupport, CompressionCodecName compressionCodecName, int blockSize,
+                             int pageSize, long maxFileSize, Configuration hadoopConf, double compressionRatio, boolean dictionaryEnabled) {
     this.writeSupport = writeSupport;
     this.compressionCodecName = compressionCodecName;
     this.blockSize = blockSize;
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetStreamWriter.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetStreamWriter.java
index a2736018242b6..c8f78c3501158 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetStreamWriter.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetStreamWriter.java
@@ -38,7 +38,7 @@ public class HoodieParquetStreamWriter implements AutoC
   private final HoodieAvroWriteSupport writeSupport;
 
   public HoodieParquetStreamWriter(FSDataOutputStream outputStream,
-                                   HoodieAvroParquetConfig parquetConfig) throws IOException {
+                                   HoodieParquetConfig<HoodieAvroWriteSupport> parquetConfig) throws IOException {
     this.writeSupport = parquetConfig.getWriteSupport();
     this.writer = new Builder(new OutputStreamBackedOutputFile(outputStream), writeSupport)
         .withWriteMode(ParquetFileWriter.Mode.CREATE)
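
Net effect of the patch: the three write-support-specific config classes (`HoodieAvroParquetConfig`, `HoodieRowParquetConfig`, `HoodieRowDataParquetConfig`) collapse into the single generic `HoodieParquetConfig<T>`, parameterized by the write-support implementation, so every writer now accepts the same config type. Below is a minimal caller-side sketch of the unified API after this change; the toy Avro schema and the size/ratio literals are illustrative placeholders, not values taken from the patch:

```java
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class HoodieParquetConfigExample {
  public static void main(String[] args) {
    // Toy record schema for illustration only; any Avro record schema works here.
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Rec\","
            + "\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");
    Configuration conf = new Configuration();

    // Same write support the patch wires in; bloom filter omitted for brevity.
    HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(
        new AvroSchemaConverter(conf).convert(schema), schema, Option.empty());

    // One generic config type now serves the Avro, Spark-row, and Flink-row
    // writers; the write-support type parameter replaces the old subclasses.
    HoodieParquetConfig<HoodieAvroWriteSupport> parquetConfig = new HoodieParquetConfig<>(
        writeSupport,
        CompressionCodecName.GZIP,
        ParquetWriter.DEFAULT_BLOCK_SIZE,
        ParquetWriter.DEFAULT_PAGE_SIZE,
        120 * 1024 * 1024,  // max file size in bytes (illustrative)
        conf,
        0.1,                // estimated compression ratio (illustrative)
        true);              // enable dictionary encoding

    System.out.println(parquetConfig.getCompressionCodecName());
  }
}
```

Swapping the type argument to `HoodieRowParquetWriteSupport` (Spark) or `HoodieRowDataParquetWriteSupport` (Flink) yields the configs that previously required their own subclasses, which is why the patch can delete those files outright.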