diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index a33383a05c02d..65ecc4d6f842c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -360,7 +360,7 @@ protected void appendDataAndDeleteBlocks(Map<HeaderMetadataType, String> header)
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writeSchemaWithMetaFields.toString());
     List<HoodieLogBlock> blocks = new ArrayList<>(2);
     if (recordList.size() > 0) {
-      blocks.add(HoodieDataBlock.getBlock(hoodieTable.getLogDataBlockFormat(), recordList, header));
+      blocks.add(HoodieDataBlock.getBlock(hoodieTable.getLogDataBlockType(), recordList, header));
     }
     if (keysToDelete.size() > 0) {
       blocks.add(new HoodieDeleteBlock(keysToDelete.toArray(new HoodieKey[keysToDelete.size()]), header));
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 4b582b1d53674..dd6ed26eeb991 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -44,6 +44,7 @@
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -702,16 +703,21 @@ public HoodieFileFormat getLogFileFormat() {
     return metaClient.getTableConfig().getLogFileFormat();
   }
 
-  public HoodieLogBlockType getLogDataBlockFormat() {
-    switch (getBaseFileFormat()) {
-      case PARQUET:
-      case ORC:
-        return HoodieLogBlockType.AVRO_DATA_BLOCK;
-      case HFILE:
-        return HoodieLogBlockType.HFILE_DATA_BLOCK;
-      default:
-        throw new HoodieException("Base file format " + getBaseFileFormat()
-            + " does not have associated log block format");
+  public HoodieLogBlockType getLogDataBlockType() {
+    HoodieLogBlock.HoodieLogBlockType logBlockType = metaClient.getTableConfig().getLogBlockFormat();
+    if (logBlockType != null) {
+      return logBlockType;
+    } else {
+      switch (getBaseFileFormat()) {
+        case PARQUET:
+        case ORC:
+          return HoodieLogBlockType.AVRO_DATA_BLOCK;
+        case HFILE:
+          return HoodieLogBlockType.HFILE_DATA_BLOCK;
+        default:
+          throw new HoodieException("Base file format " + getBaseFileFormat()
+              + " does not have associated log block type");
+      }
     }
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index dc57fd1c6ff8b..1b04295cd7439 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -27,6 +27,7 @@
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
@@ -120,6 +121,11 @@ public class HoodieTableConfig extends HoodieConfig {
       .withAlternatives("hoodie.table.rt.file.format")
       .withDocumentation("Log format used for the delta logs.");
 
+  public static final ConfigProperty<String> LOG_BLOCK_TYPE = ConfigProperty
+      .key("hoodie.table.log.block.type")
+      .noDefaultValue()
+      .withDocumentation("Log block type used for the delta logs.");
+
   public static final ConfigProperty<Integer> TIMELINE_LAYOUT_VERSION = ConfigProperty
       .key("hoodie.timeline.layout.version")
       .noDefaultValue()
@@ -346,6 +352,20 @@ public HoodieFileFormat getLogFileFormat() {
     return HoodieFileFormat.valueOf(getStringOrDefault(LOG_FILE_FORMAT));
   }
 
+  /**
+   * Get the log block type configured for this table.
+   *
+   * @return HoodieLogBlockType for the log block, or null if not configured
+   */
+  public HoodieLogBlock.HoodieLogBlockType getLogBlockFormat() {
+    String logBlockTypeConfig = getString(LOG_BLOCK_TYPE);
+    if (logBlockTypeConfig != null) {
+      return HoodieLogBlock.HoodieLogBlockType.valueOf(logBlockTypeConfig);
+    } else {
+      return null;
+    }
+  }
+
   /**
    * Get the relative path of archive log folder under metafolder, for this table.
    */
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index e2e76ad7d6503..8fcf62346a2ec 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -30,6 +30,7 @@
 import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
 import org.apache.hudi.common.table.log.block.HoodieHFileDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.table.log.block.HoodieParquetDataBlock;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.SpillableMapUtils;
@@ -204,6 +205,7 @@ public void scan(Option<List<String>> keys) {
           switch (logBlock.getBlockType()) {
             case HFILE_DATA_BLOCK:
             case AVRO_DATA_BLOCK:
+            case PARQUET_DATA_BLOCK:
               LOG.info("Reading a data block from file " + logFile.getPath() + " at instant "
                   + logBlock.getLogBlockHeader().get(INSTANT_TIME));
               if (isNewInstantBlock(logBlock) && !readBlocksLazily) {
@@ -381,6 +383,9 @@ private void processQueuedBlocksForInstant(Deque<HoodieLogBlock> logBlocks, int
         case HFILE_DATA_BLOCK:
           processDataBlock((HoodieHFileDataBlock) lastBlock, keys);
           break;
+        case PARQUET_DATA_BLOCK:
+          processDataBlock((HoodieParquetDataBlock) lastBlock, keys);
+          break;
         case DELETE_BLOCK:
           Arrays.stream(((HoodieDeleteBlock) lastBlock).getKeysToDelete()).forEach(this::processNextDeletedKey);
           break;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index cdf3065587d13..0fee6b65fe802 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -30,6 +30,7 @@
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
+import org.apache.hudi.common.table.log.block.HoodieParquetDataBlock;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.exception.CorruptedLogFileException;
@@ -256,6 +257,9 @@ private HoodieLogBlock readBlock() throws IOException {
       case HFILE_DATA_BLOCK:
         return new HoodieHFileDataBlock(logFile, inputStream, Option.ofNullable(content), readBlockLazily,
             contentPosition, contentLength, blockEndPos, readerSchema, header, footer, enableInlineReading);
+      case PARQUET_DATA_BLOCK:
+        return new HoodieParquetDataBlock(logFile, inputStream, Option.ofNullable(content), readBlockLazily,
+            contentPosition, contentLength, blockEndPos, readerSchema, header, footer);
       case DELETE_BLOCK:
         return HoodieDeleteBlock.getBlock(logFile, inputStream, Option.ofNullable(content), readBlockLazily,
             contentPosition, contentLength, blockEndPos, header, footer);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
index 2e4338ef785d0..404bdee5c37b3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
@@ -79,6 +79,8 @@ public static HoodieLogBlock getBlock(HoodieLogBlockType logDataBlockFormat, List<IndexedRecord> recordList,
         return new HoodieAvroDataBlock(recordList, header);
       case HFILE_DATA_BLOCK:
         return new HoodieHFileDataBlock(recordList, header);
+      case PARQUET_DATA_BLOCK:
+        return new HoodieParquetDataBlock(recordList, header);
       default:
         throw new HoodieException("Data block format " + logDataBlockFormat + " not implemented");
     }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
index 2fbcd992087e2..577240093e770 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
@@ -109,7 +109,7 @@ public Option<byte[]> getContent() {
    * Type of the log block WARNING: This enum is serialized as the ordinal. Only add new enums at the end.
    */
   public enum HoodieLogBlockType {
-    COMMAND_BLOCK, DELETE_BLOCK, CORRUPT_BLOCK, AVRO_DATA_BLOCK, HFILE_DATA_BLOCK
+    COMMAND_BLOCK, DELETE_BLOCK, CORRUPT_BLOCK, AVRO_DATA_BLOCK, HFILE_DATA_BLOCK, PARQUET_DATA_BLOCK
   }
 
   /**
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
new file mode 100644
index 0000000000000..f88ae5927d367
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.log.block;
+
+import org.apache.hudi.avro.HoodieAvroWriteSupport;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterFactory;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.fs.inline.InLineFSUtils;
+import org.apache.hudi.common.fs.inline.InLineFileSystem;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieAvroParquetConfig;
+import org.apache.hudi.io.storage.HoodieParquetReader;
+import org.apache.hudi.io.storage.HoodieParquetStreamWriter;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+import javax.annotation.Nonnull;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * HoodieParquetDataBlock contains a list of records serialized using Parquet.
+ */
+public class HoodieParquetDataBlock extends HoodieDataBlock {
+
+  public HoodieParquetDataBlock(@Nonnull Map<HeaderMetadataType, String> logBlockHeader,
+                                @Nonnull Map<HeaderMetadataType, String> logBlockFooter,
+                                @Nonnull Option<HoodieLogBlockContentLocation> blockContentLocation, @Nonnull Option<byte[]> content,
+                                FSDataInputStream inputStream, boolean readBlockLazily) {
+    super(logBlockHeader, logBlockFooter, blockContentLocation, content, inputStream, readBlockLazily);
+  }
+
+  public HoodieParquetDataBlock(HoodieLogFile logFile, FSDataInputStream inputStream, Option<byte[]> content,
+                                boolean readBlockLazily, long position, long blockSize, long blockEndpos, Schema readerSchema,
+                                Map<HeaderMetadataType, String> header, Map<HeaderMetadataType, String> footer) {
+    super(content, inputStream, readBlockLazily,
+        Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndpos)), readerSchema, header,
+        footer);
+  }
+
+  public HoodieParquetDataBlock(@Nonnull List<IndexedRecord> records, @Nonnull Map<HeaderMetadataType, String> header) {
+    super(records, header, new HashMap<>());
+  }
+
+  @Override
+  public HoodieLogBlockType getBlockType() {
+    return HoodieLogBlockType.PARQUET_DATA_BLOCK;
+  }
+
+  @Override
+  protected byte[] serializeRecords() throws IOException {
+    // TODO: Decide where to fetch the config values used below. The index configs cannot be reused since their
+    // purpose is different, and these settings are very specific to data blocks. Once there is consensus, they may
+    // need to be routed through the log block constructors (which, as of now, do not take any configs).
+    BloomFilter filter = BloomFilterFactory.createBloomFilter(
+        Integer.parseInt("60000"), // HoodieIndexConfig.BLOOM_FILTER_NUM_ENTRIES.defaultValue()
+        Double.parseDouble("0.000000001"), // HoodieIndexConfig.BLOOM_FILTER_FPP.defaultValue()
+        Integer.parseInt("100000"), // HoodieIndexConfig.HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES.defaultValue()
+        BloomFilterTypeCode.SIMPLE.name()); // HoodieIndexConfig.BLOOM_INDEX_FILTER_TYPE.defaultValue()
+
+    HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(
+        new AvroSchemaConverter().convert(schema), schema, Option.of(filter));
+
+    HoodieAvroParquetConfig avroParquetConfig = new HoodieAvroParquetConfig(writeSupport, CompressionCodecName.GZIP,
+        ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 1024 * 1024 * 1024,
+        new Configuration(), Double.parseDouble(String.valueOf(0.1))); // HoodieStorageConfig.PARQUET_COMPRESSION_RATIO.defaultValue()
+
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream outputStream = null;
+    HoodieParquetStreamWriter<IndexedRecord> parquetWriter = null;
+    try {
+      outputStream = new DataOutputStream(baos);
+      parquetWriter = new HoodieParquetStreamWriter<>(outputStream, avroParquetConfig);
+      Iterator<IndexedRecord> itr = records.iterator();
+      if (records.size() > 0) {
+        Schema.Field keyField = records.get(0).getSchema().getField(HoodieRecord.RECORD_KEY_METADATA_FIELD);
+        if (keyField == null) {
+          throw new HoodieIOException("Record key field missing from schema for records to be written to Parquet data block");
+        }
+        while (itr.hasNext()) {
+          IndexedRecord record = itr.next();
+          String recordKey = record.get(keyField.pos()).toString();
+          parquetWriter.writeAvro(recordKey, record);
+        }
+        outputStream.flush();
+      }
+    } finally {
+      if (outputStream != null) {
+        outputStream.close();
+      }
+      if (parquetWriter != null) {
+        parquetWriter.close();
+      }
+    }
+
+    return baos.toByteArray();
+  }
+
+  @Override
+  public List<IndexedRecord> getRecords() {
+    try {
+      records = new ArrayList<>();
+      // Get schema from the header
+      Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
+      // If readerSchema was not present, use writerSchema
+      if (schema == null) {
+        schema = writerSchema;
+      }
+      Configuration conf = new Configuration();
+      Configuration inlineConf = new Configuration();
+      inlineConf.set("fs." + InLineFileSystem.SCHEME + ".impl", InLineFileSystem.class.getName());
+
+      Path inlinePath = InLineFSUtils.getInlineFilePath(
+          getBlockContentLocation().get().getLogFile().getPath(),
+          getBlockContentLocation().get().getLogFile().getPath().getFileSystem(conf).getScheme(),
+          getBlockContentLocation().get().getContentPositionInLogFile(),
+          getBlockContentLocation().get().getBlockSize());
+
+      HoodieParquetReader<IndexedRecord> parquetReader = new HoodieParquetReader<>(inlineConf, inlinePath);
+      Iterator<IndexedRecord> recordIterator = parquetReader.getRecordIterator(schema);
+      while (recordIterator.hasNext()) {
+        records.add(recordIterator.next());
+      }
+      return records;
+    } catch (IOException e) {
+      throw new HoodieIOException("Reading parquet inlining failed", e);
+    }
+  }
+
+  @Override
+  protected void deserializeRecords() throws IOException {
+    throw new IOException("Not implemented");
+  }
+}
\ No newline at end of file
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetConfig.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetConfig.java
similarity index 100%
rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetConfig.java
rename to hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetConfig.java
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetConfig.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetConfig.java
similarity index 100%
rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetConfig.java
rename to hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetConfig.java
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetStreamWriter.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetStreamWriter.java
new file mode 100644
index 0000000000000..5de1fe34c7763
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetStreamWriter.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.avro.HoodieAvroWriteSupport;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.io.OutputFile;
+import org.apache.parquet.io.PositionOutputStream;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * HoodieParquetStreamWriter wraps a ParquetWriter to assist in writing to an OutputStream.
+ */
+public class HoodieParquetStreamWriter<R extends IndexedRecord> {
+
+  private final ParquetWriter<R> writer;
+  private final HoodieAvroWriteSupport writeSupport;
+
+  public HoodieParquetStreamWriter(OutputStream bufferedOutputStream,
+                                   HoodieAvroParquetConfig parquetConfig) throws IOException {
+    writer = new Builder(new ParquetBufferedWriter(bufferedOutputStream), parquetConfig.getWriteSupport())
+        .withWriteMode(ParquetFileWriter.Mode.CREATE)
+        .withCompressionCodec(parquetConfig.getCompressionCodecName())
+        .withRowGroupSize(parquetConfig.getBlockSize())
+        .withPageSize(parquetConfig.getPageSize())
+        .withDictionaryPageSize(parquetConfig.getPageSize())
+        .withDictionaryEncoding(parquetConfig.dictionaryEnabled())
+        .withWriterVersion(ParquetWriter.DEFAULT_WRITER_VERSION)
+        .withConf(parquetConfig.getHadoopConf()).build();
+
+    this.writeSupport = parquetConfig.getWriteSupport();
+  }
+
+  public void writeAvro(String key, R object) throws IOException {
+    writer.write(object);
+    writeSupport.add(key);
+  }
+
+  public void close() throws IOException {
+    writer.close();
+  }
+
+  public long getDataSize() {
+    return writer.getDataSize();
+  }
+
+  // TODO: Need to understand if this is the right way to write data directly to an output stream using a Parquet writer
+  public static class ParquetBufferedWriter implements OutputFile {
+
+    private final OutputStream out;
+
+    public ParquetBufferedWriter(OutputStream out) {
+      this.out = out;
+    }
+
+    @Override
+    public PositionOutputStream create(long blockSizeHint) throws IOException {
+      return createPositionOutputStream();
+    }
+
+    private PositionOutputStream createPositionOutputStream() {
+      return new PositionOutputStream() {
+
+        int pos = 0;
+
+        @Override
+        public long getPos() throws IOException {
+          return pos;
+        }
+
+        @Override
+        public void flush() throws IOException {
+          out.flush();
+        }
+
+        @Override
+        public void close() throws IOException {
+          out.close();
+        }
+
+        @Override
+        public void write(int b) throws IOException {
+          out.write(b);
+          pos++;
+        }
+
+        @Override
+        public void write(byte[] b, int off, int len) throws IOException {
+          out.write(b, off, len);
+          pos += len;
+        }
+      };
+    }
+
+    @Override
+    public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException {
+      return createPositionOutputStream();
+    }
+
+    @Override
+    public boolean supportsBlockSize() {
+      return false;
+    }
+
+    @Override
+    public long defaultBlockSize() {
+      return 0;
+    }
+  }
+
+  private static class Builder<T extends IndexedRecord> extends ParquetWriter.Builder<T, Builder<T>> {
+
+    private final WriteSupport<T> writeSupport;
+
+    private Builder(Path file, WriteSupport<T> writeSupport) {
+      super(file);
+      this.writeSupport = writeSupport;
+    }
+
+    private Builder(OutputFile file, WriteSupport<T> writeSupport) {
+      super(file);
+      this.writeSupport = writeSupport;
+    }
+
+    @Override
+    protected Builder<T> self() {
+      return this;
+    }
+
+    @Override
+    protected WriteSupport<T> getWriteSupport(Configuration conf) {
+      return writeSupport;
+    }
+  }
+}
\ No newline at end of file
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
index 1771db056cfa2..4065685a61d0d 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
@@ -147,6 +147,12 @@ public void testHFileInlineReader() throws Exception {
         HoodieLogBlock.HoodieLogBlockType.HFILE_DATA_BLOCK);
   }
 
+  @Test
+  public void testParquetInlineReader() throws Exception {
+    testReaderInternal(ExternalSpillableMap.DiskMapType.BITCASK, false, false,
+        HoodieLogBlock.HoodieLogBlockType.PARQUET_DATA_BLOCK);
+  }
+
   private void testReaderInternal(ExternalSpillableMap.DiskMapType diskMapType,
                                   boolean isCompressionEnabled,
                                   boolean partitioned) throws Exception {
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
index 13d921979c70a..e41058614b7eb 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
@@ -30,6 +30,7 @@
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieHFileDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.table.log.block.HoodieParquetDataBlock;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.testutils.SchemaTestUtil;
 import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
@@ -323,8 +324,14 @@ public static HoodieLogFormat.Writer writeDataBlockToLogFile(File partitionDir,
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, newCommit);
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writeSchema.toString());
-    HoodieDataBlock dataBlock = (logBlockType == HoodieLogBlock.HoodieLogBlockType.HFILE_DATA_BLOCK) ? new HoodieHFileDataBlock(records, header) :
-        new HoodieAvroDataBlock(records, header);
+    HoodieDataBlock dataBlock = null;
+    if (logBlockType == HoodieLogBlock.HoodieLogBlockType.HFILE_DATA_BLOCK) {
+      dataBlock = new HoodieHFileDataBlock(records, header);
+    } else if (logBlockType == HoodieLogBlock.HoodieLogBlockType.PARQUET_DATA_BLOCK) {
+      dataBlock = new HoodieParquetDataBlock(records, header);
+    } else {
+      dataBlock = new HoodieAvroDataBlock(records, header);
+    }
     writer.appendBlock(dataBlock);
     return writer;
   }
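
Reviewer note (illustrative, not part of the patch): with this change a table can opt into Parquet log blocks by setting hoodie.table.log.block.type=PARQUET_DATA_BLOCK in its table config; when the property is absent, HoodieTable#getLogDataBlockType falls back to the block type derived from the base file format, as before. The sketch below shows how a caller might append such a block, modeled on InputFormatTestUtil#writeDataBlockToLogFile above. The class name ParquetLogBlockExample, the file id, commit time, schema, and records are placeholders, not code from the patch.

import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.log.block.HoodieParquetDataBlock;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ParquetLogBlockExample { // hypothetical helper, for illustration only
  public static void appendParquetBlock(FileSystem fs, Path partitionPath, Schema schema,
                                        List<IndexedRecord> records, String commitTime) throws Exception {
    // Same header the test utility builds: the instant time plus the writer schema.
    Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, commitTime);
    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
    HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
        .onParentPath(partitionPath)
        .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
        .withFileId("fileid-1")            // placeholder file id
        .overBaseCommit(commitTime)
        .withFs(fs)
        .build();
    // Records are serialized through HoodieParquetDataBlock#serializeRecords into inlined parquet bytes.
    writer.appendBlock(new HoodieParquetDataBlock(records, header));
    writer.close();
  }
}

On the read path, HoodieLogFileReader materializes the same bytes as a HoodieParquetDataBlock (via the InLineFileSystem), and AbstractHoodieLogRecordReader processes it exactly like the existing Avro and HFile data blocks.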