diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index 1315c99406ab6..aebb689b093ad 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -360,7 +360,7 @@ protected void appendDataAndDeleteBlocks(Map header) header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writeSchemaWithMetaFields.toString()); List blocks = new ArrayList<>(2); if (recordList.size() > 0) { - blocks.add(HoodieDataBlock.getBlock(hoodieTable.getLogDataBlockFormat(), recordList, header)); + blocks.add(HoodieDataBlock.getBlock(hoodieTable.getLogDataBlockType(), recordList, header)); } if (keysToDelete.size() > 0) { blocks.add(new HoodieDeleteBlock(keysToDelete.toArray(new HoodieKey[keysToDelete.size()]), header)); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index a82a8bccf9bb1..d916d30acff2e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -43,6 +43,7 @@ import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -679,16 +680,21 @@ public HoodieFileFormat getLogFileFormat() { return metaClient.getTableConfig().getLogFileFormat(); } - public HoodieLogBlockType getLogDataBlockFormat() { - switch (getBaseFileFormat()) { - case PARQUET: - case ORC: - return HoodieLogBlockType.AVRO_DATA_BLOCK; - case HFILE: - return HoodieLogBlockType.HFILE_DATA_BLOCK; - default: - throw new HoodieException("Base file format " + getBaseFileFormat() - + " does not have associated log block format"); + public HoodieLogBlockType getLogDataBlockType() { + HoodieLogBlock.HoodieLogBlockType logBlockType = metaClient.getTableConfig().getLogBlockFormat(); + if (logBlockType != null) { + return logBlockType; + } else { + switch (getBaseFileFormat()) { + case PARQUET: + case ORC: + return HoodieLogBlockType.AVRO_DATA_BLOCK; + case HFILE: + return HoodieLogBlockType.HFILE_DATA_BLOCK; + default: + throw new HoodieException("Base file format " + getBaseFileFormat() + + " does not have associated log block format"); + } } } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java new file mode 100644 index 0000000000000..11c61bd139c85 --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
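A note on the selection logic changed above: HoodieAppendHandle now asks the table for a log block type, and HoodieTable#getLogDataBlockType prefers an explicit table-level override before falling back to the base-file-format mapping. A minimal standalone sketch of that precedence, assuming only the enums shown in this diff (the class and helper names here are illustrative, not part of the patch):

```java
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;

public class LogBlockTypeSelectionSketch {

  // Mirrors HoodieTable#getLogDataBlockType: an explicit hoodie.table.log.block.type
  // override wins; otherwise the base file format decides, as before this patch.
  static HoodieLogBlockType chooseLogBlockType(HoodieLogBlockType configuredType,
      HoodieFileFormat baseFileFormat) {
    if (configuredType != null) {
      return configuredType; // e.g. PARQUET_DATA_BLOCK configured at table level
    }
    switch (baseFileFormat) {
      case PARQUET:
      case ORC:
        return HoodieLogBlockType.AVRO_DATA_BLOCK;
      case HFILE:
        return HoodieLogBlockType.HFILE_DATA_BLOCK;
      default:
        throw new IllegalArgumentException(
            "Base file format " + baseFileFormat + " has no associated log block type");
    }
  }
}
```

With hoodie.table.log.block.type set to PARQUET_DATA_BLOCK, appends emit the Parquet data blocks introduced later in this diff; when the property is unset, behavior is unchanged.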
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io.storage; + +import org.apache.hudi.common.bloom.BloomFilter; +import org.apache.hudi.common.bloom.BloomFilterFactory; +import org.apache.hudi.common.bloom.BloomFilterTypeCode; +import org.apache.hudi.common.engine.TaskContextSupplier; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.util.Pair; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.hudi.common.testutils.FileSystemTestUtils.RANDOM; +import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class TestHoodieHFileReaderWriter { + private final Path filePath = new Path(System.getProperty("java.io.tmpdir") + "/f1_1-0-1_000.hfile"); + + @BeforeEach + @AfterEach + public void clearTempFile() { + File file = new File(filePath.toString()); + if (file.exists()) { + file.delete(); + } + } + + private HoodieHFileWriter createHFileWriter(Schema avroSchema) throws Exception { + BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.00001, -1, BloomFilterTypeCode.SIMPLE.name()); + Configuration conf = new Configuration(); + TaskContextSupplier mockTaskContextSupplier = Mockito.mock(TaskContextSupplier.class); + String instantTime = "000"; + + HoodieHFileConfig hoodieHFileConfig = new HoodieHFileConfig(conf, Compression.Algorithm.GZ, 1024 * 1024, 120 * 1024 * 1024, + filter); + return new HoodieHFileWriter(instantTime, filePath, hoodieHFileConfig, avroSchema, mockTaskContextSupplier); + } + + @Test + public void testWriteReadHFile() throws Exception { + Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleSchema.avsc"); + HoodieHFileWriter writer = createHFileWriter(avroSchema); + List keys = new ArrayList<>(); + Map recordMap = new HashMap<>(); + for (int i = 0; i < 100; i++) { + GenericRecord record = new GenericData.Record(avroSchema); + String key = String.format("%s%04d", "key", i); + record.put("_row_key", key); + keys.add(key); + record.put("time", Integer.toString(RANDOM.nextInt())); + record.put("number", i); + writer.writeAvro(key, record); + recordMap.put(key, record); + } + writer.close(); + + Configuration conf = new Configuration(); + CacheConfig cacheConfig = new CacheConfig(conf); + HoodieHFileReader 
hoodieHFileReader = new HoodieHFileReader(conf, filePath, cacheConfig, filePath.getFileSystem(conf)); + List> records = hoodieHFileReader.readAllRecords(); + records.forEach(entry -> assertEquals(entry.getSecond(), recordMap.get(entry.getFirst()))); + hoodieHFileReader.close(); + + for (int i = 0; i < 20; i++) { + int randomRowstoFetch = 5 + RANDOM.nextInt(50); + Set rowsToFetch = getRandomKeys(randomRowstoFetch, keys); + List rowsList = new ArrayList<>(rowsToFetch); + Collections.sort(rowsList); + hoodieHFileReader = new HoodieHFileReader(conf, filePath, cacheConfig, filePath.getFileSystem(conf)); + List> result = hoodieHFileReader.readRecordsByKey(rowsList); + assertEquals(result.size(), randomRowstoFetch); + result.forEach(entry -> { + assertEquals(entry.getSecond(), recordMap.get(entry.getFirst())); + }); + hoodieHFileReader.close(); + } + } + + private Set getRandomKeys(int count, List keys) { + Set rowKeys = new HashSet<>(); + int totalKeys = keys.size(); + while (rowKeys.size() < count) { + int index = RANDOM.nextInt(totalKeys); + if (!rowKeys.contains(index)) { + rowKeys.add(keys.get(index)); + } + } + return rowKeys; + } +} \ No newline at end of file diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index 468444bec8a45..9ccda382bc496 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -135,6 +135,15 @@ public static List bootstrapAndTableOperationTestArgs() { ); } + public static List tableOperationTestArgs() { + return asList( + Arguments.of(COPY_ON_WRITE, true, false), + Arguments.of(COPY_ON_WRITE, true, true), + Arguments.of(COPY_ON_WRITE, false, true), + Arguments.of(MERGE_ON_READ, true, false) + ); + } + /** * Metadata Table bootstrap scenarios. */ @@ -221,9 +230,9 @@ public void testOnlyValidPartitionsAdded(HoodieTableType tableType) throws Excep * Test various table operations sync to Metadata Table correctly. 
*/ @ParameterizedTest - @EnumSource(HoodieTableType.class) - public void testTableOperations(HoodieTableType tableType) throws Exception { - init(tableType); + @MethodSource("tableOperationTestArgs") + public void testTableOperations(HoodieTableType tableType, boolean enableFullScan, boolean enableInlineReading) throws Exception { + init(tableType, true, enableFullScan, enableInlineReading); doWriteInsertAndUpsert(testTable); // trigger an upsert diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java index 85f869f7835b3..34a6e60b96b16 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java @@ -72,6 +72,10 @@ public void init(HoodieTableType tableType) throws IOException { } public void init(HoodieTableType tableType, boolean enableMetadataTable) throws IOException { + init(tableType, enableMetadataTable, true, false); + } + + public void init(HoodieTableType tableType, boolean enableMetadataTable, boolean enableFullScan, boolean enableInlineReading) throws IOException { this.tableType = tableType; initPath(); initSparkContexts("TestHoodieMetadata"); @@ -80,7 +84,8 @@ public void init(HoodieTableType tableType, boolean enableMetadataTable) throws initMetaClient(tableType); initTestDataGenerator(); metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath); - writeConfig = getWriteConfig(true, enableMetadataTable); + writeConfig = getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, enableMetadataTable, false, + enableFullScan, enableInlineReading).build(); initWriteConfigAndMetatableWriter(writeConfig, enableMetadataTable); } @@ -256,7 +261,13 @@ protected HoodieWriteConfig.Builder getWriteConfigBuilder(boolean autoCommit, bo return getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, autoCommit, useFileListingMetadata, enableMetrics); } - protected HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy policy, boolean autoCommit, boolean useFileListingMetadata, boolean enableMetrics) { + protected HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy policy, boolean autoCommit, boolean useFileListingMetadata, + boolean enableMetrics) { + return getWriteConfigBuilder(policy, autoCommit, useFileListingMetadata, enableMetrics, true, false); + } + + protected HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy policy, boolean autoCommit, boolean useFileListingMetadata, + boolean enableMetrics, boolean enableFullScan, boolean enableInlineReading) { return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA) .withParallelism(2, 2).withDeleteParallelism(2).withRollbackParallelism(2).withFinalizeWriteParallelism(2) .withAutoCommit(autoCommit) @@ -271,6 +282,8 @@ protected HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesClea .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) .withMetadataConfig(HoodieMetadataConfig.newBuilder() .enable(useFileListingMetadata) + .enableFullScan(enableFullScan) + .enableInlineReading(enableInlineReading) .enableMetrics(enableMetrics).build()) .withMetricsConfig(HoodieMetricsConfig.newBuilder().on(enableMetrics)
.withExecutorMetrics(true).build()) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java index d085f2c92e763..b8e7b52641ff5 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java @@ -115,6 +115,18 @@ public final class HoodieMetadataConfig extends HoodieConfig { .sinceVersion("0.7.0") .withDocumentation("Parallelism to use, when listing the table on lake storage."); + public static final ConfigProperty ENABLE_INLINE_READING_LOG_FILES = ConfigProperty + .key(METADATA_PREFIX + ".enable.inline.reading.log.files") + .defaultValue(true) + .sinceVersion("0.10.0") + .withDocumentation("Enable inline reading of Log files"); + + public static final ConfigProperty ENABLE_FULL_SCAN_LOG_FILES = ConfigProperty + .key(METADATA_PREFIX + ".enable.full.scan.log.files") + .defaultValue(true) + .sinceVersion("0.10.0") + .withDocumentation("Enable full scanning of log files while reading log records"); + private HoodieMetadataConfig() { super(); } @@ -143,6 +155,14 @@ public String getDirectoryFilterRegex() { return getString(DIR_FILTER_REGEX); } + public boolean enableFullScan() { + return getBoolean(ENABLE_FULL_SCAN_LOG_FILES); + } + + public boolean enableInlineReading() { + return getBoolean(ENABLE_INLINE_READING_LOG_FILES); + } + public static class Builder { private final HoodieMetadataConfig metadataConfig = new HoodieMetadataConfig(); @@ -210,6 +230,16 @@ public Builder withDirectoryFilterRegex(String regex) { return this; } + public Builder enableFullScan(boolean enableFullScan) { + metadataConfig.setValue(ENABLE_FULL_SCAN_LOG_FILES, String.valueOf(enableFullScan)); + return this; + } + + public Builder enableInlineReading(boolean enableInlineReading) { + metadataConfig.setValue(ENABLE_INLINE_READING_LOG_FILES, String.valueOf(enableInlineReading)); + return this; + } + public HoodieMetadataConfig build() { metadataConfig.setDefaults(HoodieMetadataConfig.class.getName()); return metadataConfig; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java index 129bccefa81e1..c04fc9a17644d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; +import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; @@ -119,6 +120,11 @@ public class HoodieTableConfig extends HoodieConfig { .withAlternatives("hoodie.table.rt.file.format") .withDocumentation("Log format used for the delta logs."); + public static final ConfigProperty LOG_BLOCK_TYPE = ConfigProperty + .key("hoodie.table.log.block.type") + .noDefaultValue() + .withDocumentation("Log block type used for the delta logs."); + public static final ConfigProperty TIMELINE_LAYOUT_VERSION = ConfigProperty .key("hoodie.timeline.layout.version") .noDefaultValue() @@ -187,7 +193,6 @@ public HoodieTableConfig(FileSystem fs, 
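The two metadata-table read knobs defined just above can be toggled through the existing HoodieMetadataConfig builder. A small usage sketch, assuming the keys resolve under the usual hoodie.metadata prefix (the flag values are illustrative, not recommendations):

```java
import org.apache.hudi.common.config.HoodieMetadataConfig;

public class MetadataConfigSketch {

  // Enable the metadata table, but switch its log read path from the default full
  // scan to seek-based inline reads; both new flags default to true in this patch.
  public static HoodieMetadataConfig pointLookupMetadataConfig() {
    return HoodieMetadataConfig.newBuilder()
        .enable(true)
        .enableFullScan(false)     // hoodie.metadata.enable.full.scan.log.files
        .enableInlineReading(true) // hoodie.metadata.enable.inline.reading.log.files
        .build();
  }
}
```

Later in this diff, HoodieBackedTableMetadata consults enableFullScan() to choose between the merged full-scan path and the new key-based getRecordsByKeys lookup.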
String metaPath, String payloadClassName /** * For serializing and de-serializing. - * */ public HoodieTableConfig() { super(); @@ -342,6 +347,20 @@ public HoodieFileFormat getLogFileFormat() { return HoodieFileFormat.valueOf(getStringOrDefault(LOG_FILE_FORMAT)); } + /** + * Get the log block Format. + * + * @return HoodieBlockFormat for the log block + */ + public HoodieLogBlock.HoodieLogBlockType getLogBlockFormat() { + String logBlockTypeConfig = getString(LOG_BLOCK_TYPE); + if (logBlockTypeConfig != null) { + return HoodieLogBlock.HoodieLogBlockType.valueOf(getStringOrDefault(LOG_BLOCK_TYPE)); + } else { + return null; + } + } + /** * Get the relative path of archive log folder under metafolder, for this table. */ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java similarity index 81% rename from hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java rename to hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java index 868c7cb895c76..2f5972f2edbf4 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java @@ -30,9 +30,11 @@ import org.apache.hudi.common.table.log.block.HoodieDeleteBlock; import org.apache.hudi.common.table.log.block.HoodieHFileDataBlock; import org.apache.hudi.common.table.log.block.HoodieLogBlock; +import org.apache.hudi.common.table.log.block.HoodieParquetDataBlock; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.SpillableMapUtils; +import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; @@ -47,7 +49,9 @@ import java.io.IOException; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Deque; import java.util.HashSet; import java.util.List; @@ -71,9 +75,9 @@ *

* This results in two I/O passes over the log file. */ -public abstract class AbstractHoodieLogRecordScanner { +public abstract class AbstractHoodieLogRecordReader { - private static final Logger LOG = LogManager.getLogger(AbstractHoodieLogRecordScanner.class); + private static final Logger LOG = LogManager.getLogger(AbstractHoodieLogRecordReader.class); // Reader schema for the records protected final Schema readerSchema; @@ -114,12 +118,26 @@ public abstract class AbstractHoodieLogRecordScanner { private AtomicLong totalCorruptBlocks = new AtomicLong(0); // Store the last instant log blocks (needed to implement rollback) private Deque currentInstantLogBlocks = new ArrayDeque<>(); + // Enables inline reading for Hfile data blocks + protected final boolean enableInlineReading; + // Enables full scan of log records + protected final boolean enableFullScan; + private int totalScannedLogFiles; // Progress private float progress = 0.0f; + private AtomicLong repeatedCall = new AtomicLong(0L); - protected AbstractHoodieLogRecordScanner(FileSystem fs, String basePath, List logFilePaths, Schema readerSchema, - String latestInstantTime, boolean readBlocksLazily, boolean reverseReader, - int bufferSize, Option instantRange, boolean withOperationField) { + protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List logFilePaths, Schema readerSchema, + String latestInstantTime, boolean readBlocksLazily, boolean reverseReader, + int bufferSize, Option instantRange, boolean withOperationField) { + this(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange, withOperationField, + false, true); + } + + protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List logFilePaths, Schema readerSchema, + String latestInstantTime, boolean readBlocksLazily, boolean reverseReader, + int bufferSize, Option instantRange, boolean withOperationField, + boolean enableInlineReading, boolean enableFullScan) { this.readerSchema = readerSchema; this.latestInstantTime = latestInstantTime; this.hoodieTableMetaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).build(); @@ -132,18 +150,31 @@ protected AbstractHoodieLogRecordScanner(FileSystem fs, String basePath, List keys) { + currentInstantLogBlocks = new ArrayDeque<>(); + progress = 0.0f; + totalLogFiles = new AtomicLong(0); + totalRollbacks = new AtomicLong(0); + totalCorruptBlocks = new AtomicLong(0); + totalLogBlocks = new AtomicLong(0); + totalLogRecords = new AtomicLong(0); HoodieLogFormatReader logFormatReaderWrapper = null; HoodieTimeline commitsTimeline = this.hoodieTableMetaClient.getCommitsTimeline(); HoodieTimeline completedInstantsTimeline = commitsTimeline.filterCompletedInstants(); @@ -152,7 +183,7 @@ public void scan() { // iterate over the paths logFormatReaderWrapper = new HoodieLogFormatReader(fs, logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))).collect(Collectors.toList()), - readerSchema, readBlocksLazily, reverseReader, bufferSize); + readerSchema, readBlocksLazily, reverseReader, bufferSize, enableInlineReading); Set scannedLogFiles = new HashSet<>(); while (logFormatReaderWrapper.hasNext()) { HoodieLogFile logFile = logFormatReaderWrapper.getLogFile(); @@ -160,16 +191,16 @@ public void scan() { scannedLogFiles.add(logFile); totalLogFiles.set(scannedLogFiles.size()); // Use the HoodieLogFileReader to iterate through the blocks in the log file - HoodieLogBlock r = logFormatReaderWrapper.next(); - 
final String instantTime = r.getLogBlockHeader().get(INSTANT_TIME); + HoodieLogBlock logBlock = logFormatReaderWrapper.next(); + final String instantTime = logBlock.getLogBlockHeader().get(INSTANT_TIME); totalLogBlocks.incrementAndGet(); - if (r.getBlockType() != CORRUPT_BLOCK - && !HoodieTimeline.compareTimestamps(r.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime + if (logBlock.getBlockType() != CORRUPT_BLOCK + && !HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime )) { // hit a block with instant time greater than should be processed, stop processing further break; } - if (r.getBlockType() != CORRUPT_BLOCK && r.getBlockType() != COMMAND_BLOCK) { + if (logBlock.getBlockType() != CORRUPT_BLOCK && logBlock.getBlockType() != COMMAND_BLOCK) { if (!completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime) || inflightInstantsTimeline.containsInstant(instantTime)) { // hit an uncommitted block possibly from a failed write, move to the next one and skip processing this one @@ -180,28 +211,29 @@ public void scan() { continue; } } - switch (r.getBlockType()) { + switch (logBlock.getBlockType()) { case HFILE_DATA_BLOCK: case AVRO_DATA_BLOCK: + case PARQUET_DATA_BLOCK: LOG.info("Reading a data block from file " + logFile.getPath() + " at instant " - + r.getLogBlockHeader().get(INSTANT_TIME)); - if (isNewInstantBlock(r) && !readBlocksLazily) { + + logBlock.getLogBlockHeader().get(INSTANT_TIME)); + if (isNewInstantBlock(logBlock) && !readBlocksLazily) { // If this is an avro data block belonging to a different commit/instant, // then merge the last blocks and records into the main result - processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size()); + processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keys); } // store the current block - currentInstantLogBlocks.push(r); + currentInstantLogBlocks.push(logBlock); break; case DELETE_BLOCK: LOG.info("Reading a delete block from file " + logFile.getPath()); - if (isNewInstantBlock(r) && !readBlocksLazily) { + if (isNewInstantBlock(logBlock) && !readBlocksLazily) { // If this is a delete data block belonging to a different commit/instant, // then merge the last blocks and records into the main result - processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size()); + processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keys); } // store deletes so can be rolled back - currentInstantLogBlocks.push(r); + currentInstantLogBlocks.push(logBlock); break; case COMMAND_BLOCK: // Consider the following scenario @@ -218,9 +250,9 @@ public void scan() { // both B1 & B2 LOG.info("Reading a command block from file " + logFile.getPath()); // This is a command block - take appropriate action based on the command - HoodieCommandBlock commandBlock = (HoodieCommandBlock) r; + HoodieCommandBlock commandBlock = (HoodieCommandBlock) logBlock; String targetInstantForCommandBlock = - r.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME); + logBlock.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME); switch (commandBlock.getType()) { // there can be different types of command blocks case ROLLBACK_PREVIOUS_BLOCK: // Rollback the last read log block @@ -264,7 +296,7 @@ public void scan() { LOG.info("Found a corrupt block in " + logFile.getPath()); totalCorruptBlocks.incrementAndGet(); // 
If there is a corrupt block - we will assume that this was the next data block - currentInstantLogBlocks.push(r); + currentInstantLogBlocks.push(logBlock); break; default: throw new UnsupportedOperationException("Block type not supported yet"); @@ -273,7 +305,7 @@ public void scan() { // merge the last read block when all the blocks are done reading if (!currentInstantLogBlocks.isEmpty()) { LOG.info("Merging the final data blocks"); - processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size()); + processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keys); } // Done progress = 1.0f; @@ -308,9 +340,14 @@ private boolean isNewInstantBlock(HoodieLogBlock logBlock) { * Iterate over the GenericRecord in the block, read the hoodie key and partition path and call subclass processors to * handle it. */ - private void processDataBlock(HoodieDataBlock dataBlock) throws Exception { + private void processDataBlock(HoodieDataBlock dataBlock, List keys) throws Exception { // TODO (NA) - Implement getRecordItr() in HoodieAvroDataBlock and use that here - List recs = dataBlock.getRecords(); + List recs = new ArrayList<>(); + if (keys.isEmpty()) { + recs = dataBlock.getRecords(); + } else { + recs = dataBlock.getRecords(keys); + } totalLogRecords.addAndGet(recs.size()); for (IndexedRecord rec : recs) { processNextRecord(createHoodieRecord(rec)); @@ -342,17 +379,20 @@ protected HoodieRecord createHoodieRecord(IndexedRecord rec) { /** * Process the set of log blocks belonging to the last instant which is read fully. */ - private void processQueuedBlocksForInstant(Deque lastBlocks, int numLogFilesSeen) throws Exception { - while (!lastBlocks.isEmpty()) { - LOG.info("Number of remaining logblocks to merge " + lastBlocks.size()); + private void processQueuedBlocksForInstant(Deque logBlocks, int numLogFilesSeen, List keys) throws Exception { + while (!logBlocks.isEmpty()) { + LOG.warn("Number of remaining logblocks to merge " + logBlocks.size()); // poll the element at the bottom of the stack since that's the order it was inserted - HoodieLogBlock lastBlock = lastBlocks.pollLast(); + HoodieLogBlock lastBlock = logBlocks.pollLast(); switch (lastBlock.getBlockType()) { case AVRO_DATA_BLOCK: - processDataBlock((HoodieAvroDataBlock) lastBlock); + processDataBlock((HoodieAvroDataBlock) lastBlock, keys); break; case HFILE_DATA_BLOCK: - processDataBlock((HoodieHFileDataBlock) lastBlock); + processDataBlock((HoodieHFileDataBlock) lastBlock, keys); + break; + case PARQUET_DATA_BLOCK: + processDataBlock((HoodieParquetDataBlock) lastBlock, keys); break; case DELETE_BLOCK: Arrays.stream(((HoodieDeleteBlock) lastBlock).getKeysToDelete()).forEach(this::processNextDeletedKey); @@ -432,6 +472,6 @@ public Builder withOperationField(boolean withOperationField) { throw new UnsupportedOperationException(); } - public abstract AbstractHoodieLogRecordScanner build(); + public abstract AbstractHoodieLogRecordReader build(); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java index f0f3842e97b36..0fee6b65fe802 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java @@ -30,6 +30,7 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType; import 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType; +import org.apache.hudi.common.table.log.block.HoodieParquetDataBlock; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.exception.CorruptedLogFileException; @@ -70,17 +71,24 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader { private long reverseLogFilePosition; private long lastReverseLogFilePosition; private boolean reverseReader; + private boolean enableInlineReading; private boolean closed = false; private transient Thread shutdownThread = null; public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize, boolean readBlockLazily, boolean reverseReader) throws IOException { + this(fs, logFile, readerSchema, bufferSize, readBlockLazily, reverseReader, false); + } + + public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize, + boolean readBlockLazily, boolean reverseReader, boolean enableInlineReading) throws IOException { FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize); this.logFile = logFile; this.inputStream = getFSDataInputStream(fsDataInputStream, fs, bufferSize); this.readerSchema = readerSchema; this.readBlockLazily = readBlockLazily; this.reverseReader = reverseReader; + this.enableInlineReading = enableInlineReading; if (this.reverseReader) { this.reverseLogFilePosition = this.lastReverseLogFilePosition = fs.getFileStatus(logFile.getPath()).getLen(); } @@ -248,7 +256,10 @@ private HoodieLogBlock readBlock() throws IOException { } case HFILE_DATA_BLOCK: return new HoodieHFileDataBlock(logFile, inputStream, Option.ofNullable(content), readBlockLazily, - contentPosition, contentLength, blockEndPos, readerSchema, header, footer); + contentPosition, contentLength, blockEndPos, readerSchema, header, footer, enableInlineReading); + case PARQUET_DATA_BLOCK: + return new HoodieParquetDataBlock(logFile, inputStream, Option.ofNullable(content), readBlockLazily, + contentPosition, contentLength, blockEndPos, readerSchema, header, footer); case DELETE_BLOCK: return HoodieDeleteBlock.getBlock(logFile, inputStream, Option.ofNullable(content), readBlockLazily, contentPosition, contentLength, blockEndPos, header, footer); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java index 72672278b6b65..36fa187aa4111 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java @@ -49,7 +49,12 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader { private static final Logger LOG = LogManager.getLogger(HoodieLogFormatReader.class); HoodieLogFormatReader(FileSystem fs, List logFiles, Schema readerSchema, boolean readBlocksLazily, - boolean reverseLogReader, int bufferSize) throws IOException { + boolean reverseLogReader, int bufferSize) throws IOException { + this(fs, logFiles, readerSchema, readBlocksLazily, reverseLogReader, bufferSize, false); + } + + HoodieLogFormatReader(FileSystem fs, List logFiles, Schema readerSchema, boolean readBlocksLazily, + boolean reverseLogReader, int bufferSize, boolean enableInlineReading) throws IOException { this.logFiles = logFiles; this.fs = fs; this.readerSchema = readerSchema; @@ -59,7 +64,7 @@ public class 
HoodieLogFormatReader implements HoodieLogFormat.Reader { this.prevReadersInOpenState = new ArrayList<>(); if (logFiles.size() > 0) { HoodieLogFile nextLogFile = logFiles.remove(0); - this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false); + this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false, enableInlineReading); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java index 18b267294aa4a..0c3fb0c02552c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java @@ -54,7 +54,7 @@ * This results in two I/O passes over the log file. */ -public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner +public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader implements Iterable> { private static final Logger LOG = LogManager.getLogger(HoodieMergedLogRecordScanner.class); @@ -77,8 +77,9 @@ protected HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List instantRange, boolean autoScan, ExternalSpillableMap.DiskMapType diskMapType, boolean isBitCaskDiskMapCompressionEnabled, - boolean withOperationField) { - super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange, withOperationField); + boolean withOperationField, boolean enableInlineReading, boolean enableFullScan) { + super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange, withOperationField, + enableInlineReading, enableFullScan); try { // Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(), @@ -166,7 +167,7 @@ public void close() { /** * Builder used to build {@code HoodieUnMergedLogRecordScanner}. */ - public static class Builder extends AbstractHoodieLogRecordScanner.Builder { + public static class Builder extends AbstractHoodieLogRecordReader.Builder { protected FileSystem fs; protected String basePath; protected List logFilePaths; @@ -276,7 +277,7 @@ public HoodieMergedLogRecordScanner build() { return new HoodieMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema, latestInstantTime, maxMemorySizeInBytes, readBlocksLazily, reverseReader, bufferSize, spillableMapBasePath, instantRange, autoScan, - diskMapType, isBitCaskDiskMapCompressionEnabled, withOperationField); + diskMapType, isBitCaskDiskMapCompressionEnabled, withOperationField, false, true); } } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java index 8b26f72579c80..f781a148a3938 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java @@ -31,7 +31,7 @@ /** * A scanner used to scan hoodie unmerged log records. 
*/ -public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScanner { +public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordReader { private final LogRecordScannerCallback callback; @@ -72,7 +72,7 @@ public static interface LogRecordScannerCallback { /** * Builder used to build {@code HoodieUnMergedLogRecordScanner}. */ - public static class Builder extends AbstractHoodieLogRecordScanner.Builder { + public static class Builder extends AbstractHoodieLogRecordReader.Builder { private FileSystem fs; private String basePath; private List logFilePaths; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java index 8f5b741f37909..95047a21f7e70 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java @@ -79,6 +79,8 @@ public static HoodieLogBlock getBlock(HoodieLogBlockType logDataBlockFormat, Lis return new HoodieAvroDataBlock(recordList, header); case HFILE_DATA_BLOCK: return new HoodieHFileDataBlock(recordList, header); + case PARQUET_DATA_BLOCK: + return new HoodieParquetDataBlock(recordList, header); default: throw new HoodieException("Data block format " + logDataBlockFormat + " not implemented"); } @@ -111,6 +113,17 @@ public List getRecords() { return records; } + /** + * Batch get of keys of interest. Implementation can choose to either do full scan and return matched entries or + * do a seek based parsing and return matched entries. + * @param keys keys of interest. + * @return List of IndexedRecords for the keys of interest. + * @throws IOException + */ + public List getRecords(List keys) throws IOException { + throw new UnsupportedOperationException("On demand batch get based on keys not supported"); + } + public Schema getSchema() { // if getSchema was invoked before converting byte [] to records if (records == null) { @@ -119,7 +132,7 @@ public Schema getSchema() { return schema; } - private void createRecordsFromContentBytes() throws IOException { + protected void createRecordsFromContentBytes() throws IOException { if (readBlockLazily && !getContent().isPresent()) { // read log block contents from disk inflate(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java index 6d2682a4ffa09..d40d8e8e1f614 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java @@ -19,12 +19,16 @@ package org.apache.hudi.common.table.log.block; import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.fs.inline.InLineFSUtils; +import org.apache.hudi.common.fs.inline.InLineFileSystem; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.io.storage.HoodieHFileReader; + +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -44,6 +48,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.util.Collections; import 
java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -61,6 +66,7 @@ public class HoodieHFileDataBlock extends HoodieDataBlock { private static final Logger LOG = LogManager.getLogger(HoodieHFileDataBlock.class); private static Compression.Algorithm compressionAlgorithm = Compression.Algorithm.GZ; private static int blockSize = 1 * 1024 * 1024; + private boolean enableInlineReading = false; public HoodieHFileDataBlock(@Nonnull Map logBlockHeader, @Nonnull Map logBlockFooter, @@ -71,10 +77,11 @@ public HoodieHFileDataBlock(@Nonnull Map logBlockHea public HoodieHFileDataBlock(HoodieLogFile logFile, FSDataInputStream inputStream, Option content, boolean readBlockLazily, long position, long blockSize, long blockEndpos, Schema readerSchema, - Map header, Map footer) { + Map header, Map footer, boolean enableInlineReading) { super(content, inputStream, readBlockLazily, Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndpos)), readerSchema, header, footer); + this.enableInlineReading = enableInlineReading; } public HoodieHFileDataBlock(@Nonnull List records, @Nonnull Map header) { @@ -141,6 +148,50 @@ protected byte[] serializeRecords() throws IOException { return baos.toByteArray(); } + @Override + protected void createRecordsFromContentBytes() throws IOException { + if (enableInlineReading) { + getRecords(Collections.emptyList()); + } else { + super.createRecordsFromContentBytes(); + } + } + + @Override + public List getRecords(List keys) throws IOException { + readWithInlineFS(keys); + return records; + } + + private void readWithInlineFS(List keys) throws IOException { + boolean enableFullScan = keys.isEmpty(); + // Get schema from the header + Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA)); + // If readerSchema was not present, use writerSchema + if (schema == null) { + schema = writerSchema; + } + Configuration conf = new Configuration(); + CacheConfig cacheConf = new CacheConfig(conf); + Configuration inlineConf = new Configuration(); + inlineConf.set("fs." + InLineFileSystem.SCHEME + ".impl", InLineFileSystem.class.getName()); + + Path inlinePath = InLineFSUtils.getInlineFilePath( + getBlockContentLocation().get().getLogFile().getPath(), + getBlockContentLocation().get().getLogFile().getPath().getFileSystem(conf).getScheme(), + getBlockContentLocation().get().getContentPositionInLogFile(), + getBlockContentLocation().get().getBlockSize()); + if (!enableFullScan) { + // HFile read will be efficient if keys are sorted, since on storage, records are sorted by key. This will avoid unnecessary seeks. + Collections.sort(keys); + } + HoodieHFileReader reader = new HoodieHFileReader(inlineConf, inlinePath, cacheConf, inlinePath.getFileSystem(inlineConf)); + List> logRecords = enableFullScan ? 
reader.readAllRecords(writerSchema, schema) : + reader.readRecordsByKey(keys, schema); + reader.close(); + this.records = logRecords.stream().map(t -> t.getSecond()).collect(Collectors.toList()); + } + @Override protected void deserializeRecords() throws IOException { // Get schema from the header diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java index 2fbcd992087e2..577240093e770 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java @@ -109,7 +109,7 @@ public Option getContent() { * Type of the log block WARNING: This enum is serialized as the ordinal. Only add new enums at the end. */ public enum HoodieLogBlockType { - COMMAND_BLOCK, DELETE_BLOCK, CORRUPT_BLOCK, AVRO_DATA_BLOCK, HFILE_DATA_BLOCK + COMMAND_BLOCK, DELETE_BLOCK, CORRUPT_BLOCK, AVRO_DATA_BLOCK, HFILE_DATA_BLOCK, PARQUET_DATA_BLOCK } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java new file mode 100644 index 0000000000000..c647deadb9c2d --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.common.table.log.block; + +import org.apache.hudi.avro.HoodieAvroWriteSupport; +import org.apache.hudi.common.bloom.BloomFilter; +import org.apache.hudi.common.bloom.BloomFilterFactory; +import org.apache.hudi.common.bloom.BloomFilterTypeCode; +import org.apache.hudi.common.fs.inline.InLineFSUtils; +import org.apache.hudi.common.fs.inline.InLineFileSystem; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.io.storage.HoodieAvroParquetConfig; +import org.apache.hudi.io.storage.HoodieParquetReader; +import org.apache.hudi.io.storage.HoodieParquetStreamWriter; + +import org.apache.avro.Schema; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.avro.AvroSchemaConverter; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; + +import javax.annotation.Nonnull; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * HoodieParquetDataBlock contains a list of records serialized using Parquet. + */ +public class HoodieParquetDataBlock extends HoodieDataBlock { + + public HoodieParquetDataBlock(@Nonnull Map logBlockHeader, + @Nonnull Map logBlockFooter, + @Nonnull Option blockContentLocation, @Nonnull Option content, + FSDataInputStream inputStream, boolean readBlockLazily) { + super(logBlockHeader, logBlockFooter, blockContentLocation, content, inputStream, readBlockLazily); + } + + public HoodieParquetDataBlock(HoodieLogFile logFile, FSDataInputStream inputStream, Option content, + boolean readBlockLazily, long position, long blockSize, long blockEndpos, Schema readerSchema, + Map header, Map footer) { + super(content, inputStream, readBlockLazily, + Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndpos)), readerSchema, header, + footer); + } + + public HoodieParquetDataBlock(@Nonnull List records, @Nonnull Map header) { + super(records, header, new HashMap<>()); + } + + @Override + public HoodieLogBlockType getBlockType() { + return HoodieLogBlockType.PARQUET_DATA_BLOCK; + } + + @Override + protected byte[] serializeRecords() throws IOException { + // TODO: Need to decide from where to fetch all config values required below. We can't re-use index config as the purpose is different. + // And these are very specific to data blocks. Once we have consensus, we might need to route them to log block constructors ( + // as of now, log block constructors does not take in any configs in general). 
+ BloomFilter filter = BloomFilterFactory.createBloomFilter( + Integer.parseInt("60000"),//HoodieIndexConfig.BLOOM_FILTER_NUM_ENTRIES.defaultValue()), + Double.parseDouble("0.000000001"),//HoodieIndexConfig.BLOOM_FILTER_FPP.defaultValue()), + Integer.parseInt("100000"),//HoodieIndexConfig.HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES.defaultValue()), + BloomFilterTypeCode.SIMPLE.name());//HoodieIndexConfig.BLOOM_INDEX_FILTER_TYPE.defaultValue()); + + HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport( + new AvroSchemaConverter().convert(schema), schema, Option.of(filter)); + + HoodieAvroParquetConfig avroParquetConfig = new HoodieAvroParquetConfig(writeSupport, CompressionCodecName.GZIP, + ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 1024 * 1024 * 1024, + new Configuration(), Double.parseDouble(String.valueOf(0.1)));//HoodieStorageConfig.PARQUET_COMPRESSION_RATIO.defaultValue())); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream outputStream = null; + HoodieParquetStreamWriter parquetWriter = null; + try { + outputStream = new DataOutputStream(baos); + parquetWriter = new HoodieParquetStreamWriter<>(outputStream, avroParquetConfig); + Iterator itr = records.iterator(); + if (records.size() > 0) { + Schema.Field keyField = records.get(0).getSchema().getField(HoodieRecord.RECORD_KEY_METADATA_FIELD); + if (keyField == null) { + throw new HoodieIOException("Record key field missing from schema for records to be written to Parquet data block"); + } + while (itr.hasNext()) { + IndexedRecord record = itr.next(); + String recordKey = record.get(keyField.pos()).toString(); + parquetWriter.writeAvro(recordKey, record); + } + outputStream.flush(); + } + } finally { + if (outputStream != null) { + outputStream.close(); + } + if (parquetWriter != null) { + parquetWriter.close(); + } + } + + return baos.toByteArray(); + } + + @Override + public List getRecords() { + try { + records = new ArrayList<>(); + // Get schema from the header + Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA)); + // If readerSchema was not present, use writerSchema + if (schema == null) { + schema = writerSchema; + } + Configuration conf = new Configuration(); + Configuration inlineConf = new Configuration(); + inlineConf.set("fs." 
+ InLineFileSystem.SCHEME + ".impl", InLineFileSystem.class.getName()); + + Path inlinePath = InLineFSUtils.getInlineFilePath( + getBlockContentLocation().get().getLogFile().getPath(), + getBlockContentLocation().get().getLogFile().getPath().getFileSystem(conf).getScheme(), + getBlockContentLocation().get().getContentPositionInLogFile(), + getBlockContentLocation().get().getBlockSize()); + + HoodieParquetReader parquetReader = new HoodieParquetReader<>(inlineConf, inlinePath); + Iterator recordIterator = parquetReader.getRecordIterator(schema); + while (recordIterator.hasNext()) { + records.add(recordIterator.next()); + } + return records; + } catch (IOException e) { + throw new HoodieIOException("Reading parquet inlining failed ", e); + } + } + + @Override + protected void deserializeRecords() throws IOException { + throw new IOException("Not implemented"); + } +} \ No newline at end of file diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetConfig.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetConfig.java similarity index 100% rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetConfig.java rename to hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetConfig.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetConfig.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetConfig.java similarity index 100% rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetConfig.java rename to hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetConfig.java diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java index b954e57e77c7e..1bfa4ec6c3e24 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java @@ -21,6 +21,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; import java.util.Iterator; @@ -33,6 +34,7 @@ import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PositionedReadable; import org.apache.hadoop.fs.Seekable; @@ -55,6 +57,7 @@ public class HoodieHFileReader implements HoodieFileRea private Path path; private Configuration conf; private HFile.Reader reader; + private FSDataInputStream fsDataInputStream; private Schema schema; // Scanner used to read individual keys. This is cached to prevent the overhead of opening the scanner for each // key retrieval. 
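To make the inline read path concrete: an HFile data block's bytes sit inside a log file, and the reader addresses that byte range through InLineFileSystem before doing seek-based key lookups with the HoodieHFileReader additions in this file. A sketch under those assumptions; the offset and length would normally come from HoodieLogBlockContentLocation, as in HoodieHFileDataBlock#readWithInlineFS, and the class and method names here are illustrative:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hudi.common.fs.inline.InLineFSUtils;
import org.apache.hudi.common.fs.inline.InLineFileSystem;
import org.apache.hudi.io.storage.HoodieHFileReader;

public class InlineHFileLookupSketch {

  // Reads selected keys out of an HFile data block embedded in a log file.
  // blockStart/blockLength are the block's content position and size in the log file.
  public static List<Pair<String, GenericRecord>> lookup(Path logFilePath, long blockStart,
      long blockLength, List<String> keys) throws Exception {
    Configuration conf = new Configuration();
    Configuration inlineConf = new Configuration();
    inlineConf.set("fs." + InLineFileSystem.SCHEME + ".impl", InLineFileSystem.class.getName());

    // Address the block's byte range inside the log file through the inline file system.
    Path inlinePath = InLineFSUtils.getInlineFilePath(
        logFilePath, logFilePath.getFileSystem(conf).getScheme(), blockStart, blockLength);

    // Sorted keys keep the HFile scanner seeking forward only.
    List<String> sortedKeys = new ArrayList<>(keys);
    Collections.sort(sortedKeys);

    HoodieHFileReader<GenericRecord> reader = new HoodieHFileReader<>(
        inlineConf, inlinePath, new CacheConfig(inlineConf), inlinePath.getFileSystem(inlineConf));
    try {
      return reader.readRecordsByKey(sortedKeys); // schema is picked up from the HFile's file info
    } finally {
      reader.close();
    }
  }
}
```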
@@ -72,6 +75,13 @@ public HoodieHFileReader(Configuration configuration, Path path, CacheConfig cac this.reader = HFile.createReader(FSUtils.getFs(path.toString(), configuration), path, cacheConfig, conf); } + public HoodieHFileReader(Configuration configuration, Path path, CacheConfig cacheConfig, FileSystem inlineFs) throws IOException { + this.conf = configuration; + this.path = path; + this.fsDataInputStream = inlineFs.open(path); + this.reader = HFile.createReader(inlineFs, path, cacheConfig, configuration); + } + public HoodieHFileReader(byte[] content) throws IOException { Configuration conf = new Configuration(); Path path = new Path("hoodie"); @@ -164,6 +174,25 @@ public List> readAllRecords() throws IOException { return readAllRecords(schema, schema); } + public List> readRecordsByKey(List keys) throws IOException { + reader.loadFileInfo(); + Schema schema = new Schema.Parser().parse(new String(reader.loadFileInfo().get(KEY_SCHEMA.getBytes()))); + return readRecordsByKey(keys, schema); + } + + public List> readRecordsByKey(List keys, Schema schema) throws IOException { + this.schema = schema; + reader.loadFileInfo(); + List> records = new ArrayList<>(); + for (String key: keys) { + Option value = getRecordByKey(key, schema); + if (value.isPresent()) { + records.add(new Pair(key, value.get())); + } + } + return records; + } + @Override public Iterator getRecordIterator(Schema readerSchema) throws IOException { final HFileScanner scanner = reader.getScanner(false, false); @@ -217,7 +246,7 @@ public Option getRecordByKey(String key, Schema readerSchema) throws IOException synchronized (this) { if (keyScanner == null) { - keyScanner = reader.getScanner(true, true); + keyScanner = reader.getScanner(false, true); } if (keyScanner.seekTo(kv) == 0) { @@ -250,6 +279,9 @@ public synchronized void close() { try { reader.close(); reader = null; + if (fsDataInputStream != null) { + fsDataInputStream.close(); + } keyScanner = null; } catch (IOException e) { throw new HoodieIOException("Error closing the hfile reader", e); diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetStreamWriter.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetStreamWriter.java new file mode 100644 index 0000000000000..5de1fe34c7763 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetStreamWriter.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.io.storage; + +import org.apache.hudi.avro.HoodieAvroWriteSupport; + +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.api.WriteSupport; +import org.apache.parquet.io.OutputFile; +import org.apache.parquet.io.PositionOutputStream; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * HoodieParquetStreamWriter wraps the ParquetWriter to assist in writing to OutputStream. + */ +public class HoodieParquetStreamWriter { + + private final ParquetWriter writer; + private final HoodieAvroWriteSupport writeSupport; + + public HoodieParquetStreamWriter(OutputStream bufferedOutputStream, + HoodieAvroParquetConfig parquetConfig) throws IOException { + writer = new Builder(new ParquetBufferedWriter(bufferedOutputStream), parquetConfig.getWriteSupport()) + .withWriteMode(ParquetFileWriter.Mode.CREATE) + .withCompressionCodec(parquetConfig.getCompressionCodecName()) + .withRowGroupSize(parquetConfig.getBlockSize()) + .withPageSize(parquetConfig.getPageSize()) + .withDictionaryPageSize(parquetConfig.getPageSize()) + .withDictionaryEncoding(parquetConfig.dictionaryEnabled()) + .withWriterVersion(ParquetWriter.DEFAULT_WRITER_VERSION) + .withConf(parquetConfig.getHadoopConf()).build(); + + this.writeSupport = parquetConfig.getWriteSupport(); + } + + public void writeAvro(String key, R object) throws IOException { + writer.write(object); + writeSupport.add(key); + } + + public void close() throws IOException { + writer.close(); + } + + public long getDataSize() { + return writer.getDataSize(); + } + + // TODO: Need to understand if this is the right way to directly write data to output stream using Parquet writer + public static class ParquetBufferedWriter implements OutputFile { + + private final OutputStream out; + + public ParquetBufferedWriter(OutputStream out) { + this.out = out; + } + + @Override + public PositionOutputStream create(long blockSizeHint) throws IOException { + return createPositionOutputStream(); + } + + private PositionOutputStream createPositionOutputStream() { + return new PositionOutputStream() { + + int pos = 0; + + @Override + public long getPos() throws IOException { + return pos; + } + + @Override + public void flush() throws IOException { + out.flush(); + } + + @Override + public void close() throws IOException { + out.close(); + } + + @Override + public void write(int b) throws IOException { + out.write(b); + pos++; + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + out.write(b, off, len); + pos += len; + } + }; + } + + @Override + public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException { + return createPositionOutputStream(); + } + + @Override + public boolean supportsBlockSize() { + return false; + } + + @Override + public long defaultBlockSize() { + return 0; + } + } + + private static class Builder extends ParquetWriter.Builder> { + + private final WriteSupport writeSupport; + + private Builder(Path file, WriteSupport writeSupport) { + super(file); + this.writeSupport = writeSupport; + } + + private Builder(OutputFile file, WriteSupport writeSupport) { + super(file); + this.writeSupport = writeSupport; + } + + @Override + protected Builder self() { + return this; + } + + @Override + protected WriteSupport getWriteSupport(Configuration conf) { + 
return writeSupport; + } + } +} \ No newline at end of file diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index b0940a7f3469c..abbea03f23df1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -81,7 +81,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { private final boolean reuse; // Readers for latest file slice corresponding to file groups in the metadata partition of interest - private Map> partitionReaders = new ConcurrentHashMap<>(); + private Map> partitionReaders = new ConcurrentHashMap<>(); public HoodieBackedTableMetadata(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig, String datasetBasePath, String spillableMapDirectory) { @@ -121,12 +121,12 @@ private void initIfNeeded() { @Override protected Option> getRecordByKeyFromMetadata(String key, String partitionName) { - Pair readers = openReadersIfNeeded(key, partitionName); + Pair readers = openReadersIfNeeded(key, partitionName); try { List timings = new ArrayList<>(); HoodieTimer timer = new HoodieTimer().startTimer(); HoodieFileReader baseFileReader = readers.getKey(); - HoodieMetadataMergedLogRecordScanner logRecordScanner = readers.getRight(); + HoodieMetadataMergedLogRecordReader logRecordScanner = readers.getRight(); // Retrieve record from base file HoodieRecord hoodieRecord = null; @@ -146,7 +146,19 @@ protected Option> getRecordByKeyFromMetadata // Retrieve record from log file timer.startTimer(); if (logRecordScanner != null) { - Option> logHoodieRecord = logRecordScanner.getRecordByKey(key); + Option> logHoodieRecord = Option.empty(); + if (metadataConfig.enableFullScan()) { + // path which does full scan of log files + logHoodieRecord = logRecordScanner.getRecordByKey(key); + } else { + // this path will do seeks pertaining to the keys passed in + List>>> keyHoodieRecordPair = + logRecordScanner.getRecordsByKeys(Collections.singletonList(key)); + if (keyHoodieRecordPair.size() == 1 && keyHoodieRecordPair.get(0).getKey().equals(key)) { + logHoodieRecord = keyHoodieRecordPair.get(0).getValue(); + } + } + if (logHoodieRecord.isPresent()) { if (hoodieRecord != null) { // Merge the payloads @@ -172,13 +184,13 @@ protected Option> getRecordByKeyFromMetadata /** * Returns a new pair of readers to the base and log files. 
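The getRecordByKeyFromMetadata change above branches between the existing full-scan path and the new seek-based path. A condensed, self-contained view of that branch, assuming the HoodieMetadataPayload type parameter used by the metadata table (generics are elided in the diff); this helper is illustrative and not part of the patch:

```java
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader;
import org.apache.hudi.metadata.HoodieMetadataPayload;

import java.util.Collections;
import java.util.List;

final class MetadataLookupSketch {
  static Option<HoodieRecord<HoodieMetadataPayload>> lookup(
      HoodieMetadataMergedLogRecordReader logRecordReader, String key, boolean fullScan) {
    if (fullScan) {
      // Full scan: all log records were already merged into memory at open time.
      return logRecordReader.getRecordByKey(key);
    }
    // Point lookup: seek only the requested key in the log blocks.
    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> hits =
        logRecordReader.getRecordsByKeys(Collections.singletonList(key));
    return hits.size() == 1 && hits.get(0).getKey().equals(key)
        ? hits.get(0).getValue() : Option.empty();
  }
}
```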
*/ - private Pair openReadersIfNeeded(String key, String partitionName) { + private Pair openReadersIfNeeded(String key, String partitionName) { return partitionReaders.computeIfAbsent(partitionName, k -> { try { final long baseFileOpenMs; final long logScannerOpenMs; HoodieFileReader baseFileReader = null; - HoodieMetadataMergedLogRecordScanner logRecordScanner = null; + HoodieMetadataMergedLogRecordReader logRecordScanner = null; // Metadata is in sync till the latest completed instant on the dataset HoodieTimer timer = new HoodieTimer().startTimer(); @@ -192,7 +204,7 @@ private Pair openReaders baseFileOpenMs = baseFileReaderOpenTimePair.getValue(); // Open the log record scanner using the log files from the latest file slice - Pair logRecordScannerOpenTimePair = getLogRecordScanner(slice); + Pair logRecordScannerOpenTimePair = getLogRecordScanner(slice); logRecordScanner = logRecordScannerOpenTimePair.getKey(); logScannerOpenMs = logRecordScannerOpenTimePair.getValue(); @@ -244,7 +256,7 @@ private Set getValidInstantTimestamps() { return validInstantTimestamps; } - private Pair getLogRecordScanner(FileSlice slice) { + private Pair getLogRecordScanner(FileSlice slice) { HoodieTimer timer = new HoodieTimer().startTimer(); List logFilePaths = slice.getLogFiles() .sorted(HoodieLogFile.getLogFileComparator()) @@ -261,7 +273,7 @@ private Pair getLogRecordScanner(Fil // Load the schema Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema()); HoodieCommonConfig commonConfig = HoodieCommonConfig.newBuilder().fromProperties(metadataConfig.getProps()).build(); - HoodieMetadataMergedLogRecordScanner logRecordScanner = HoodieMetadataMergedLogRecordScanner.newBuilder() + HoodieMetadataMergedLogRecordReader logRecordScanner = HoodieMetadataMergedLogRecordReader.newBuilder() .withFileSystem(metadataMetaClient.getFs()) .withBasePath(metadataBasePath) .withLogFilePaths(logFilePaths) @@ -273,6 +285,8 @@ private Pair getLogRecordScanner(Fil .withDiskMapType(commonConfig.getSpillableDiskMapType()) .withBitCaskDiskMapCompressionEnabled(commonConfig.isBitCaskDiskMapCompressionEnabled()) .withLogBlockTimestamps(validInstantTimestamps) + .enableFullScan(metadataConfig.enableFullScan()) + .enableInlineReading(metadataConfig.enableInlineReading()) .build(); Long logScannerOpenMs = timer.endTimer(); @@ -319,7 +333,7 @@ public void close() { } private synchronized void close(String partitionName) { - Pair readers = partitionReaders.remove(partitionName); + Pair readers = partitionReaders.remove(partitionName); if (readers != null) { try { if (readers.getKey() != null) { diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java similarity index 76% rename from hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordScanner.java rename to hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java index 3132ea6346f3b..75df8bfa736c4 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java @@ -19,12 +19,16 @@ package org.apache.hudi.metadata; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Set; import org.apache.avro.Schema; import org.apache.hadoop.fs.FileSystem; 
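openReadersIfNeeded(...) and close(...) above implement an open-once-per-partition cache of reader pairs keyed by partition name. A stripped-down, self-contained sketch of that pattern, using a hypothetical Reader stand-in type rather than the Hudi reader pair:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class PartitionReaderCache<R extends AutoCloseable> {
  private final Map<String, R> readers = new ConcurrentHashMap<>();

  // computeIfAbsent guarantees a single reader is opened per partition even when
  // multiple lookups race, matching the ConcurrentHashMap usage in the diff.
  R get(String partition, Function<String, R> opener) {
    return readers.computeIfAbsent(partition, opener);
  }

  // Mirrors close(partitionName): remove from the cache first, then release resources.
  void close(String partition) throws Exception {
    R reader = readers.remove(partition);
    if (reader != null) {
      reader.close();
    }
  }
}
```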
+import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; @@ -32,26 +36,31 @@ import org.apache.hudi.common.table.log.InstantRange; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.ExternalSpillableMap; +import org.apache.hudi.common.util.collection.Pair; /** * A {@code HoodieMergedLogRecordScanner} implementation which only merged records matching providing keys. This is * useful in limiting memory usage when only a small subset of updates records are to be read. */ -public class HoodieMetadataMergedLogRecordScanner extends HoodieMergedLogRecordScanner { +public class HoodieMetadataMergedLogRecordReader extends HoodieMergedLogRecordScanner { + + private static final Logger LOG = LogManager.getLogger(HoodieMetadataMergedLogRecordReader.class); // Set of all record keys that are to be read in memory private Set mergeKeyFilter; - private HoodieMetadataMergedLogRecordScanner(FileSystem fs, String basePath, List logFilePaths, + private HoodieMetadataMergedLogRecordReader(FileSystem fs, String basePath, List logFilePaths, Schema readerSchema, String latestInstantTime, Long maxMemorySizeInBytes, int bufferSize, String spillableMapBasePath, Set mergeKeyFilter, ExternalSpillableMap.DiskMapType diskMapType, boolean isBitCaskDiskMapCompressionEnabled, - Option instantRange) { + Option instantRange, boolean enableInlineReading, boolean enableFullScan) { super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, maxMemorySizeInBytes, false, false, bufferSize, - spillableMapBasePath, instantRange, false, diskMapType, isBitCaskDiskMapCompressionEnabled, false); + spillableMapBasePath, instantRange, false, diskMapType, isBitCaskDiskMapCompressionEnabled, false, + enableInlineReading, enableFullScan); this.mergeKeyFilter = mergeKeyFilter; - - performScan(); + if (enableFullScan) { + performScan(); + } } @Override @@ -71,8 +80,8 @@ protected void processNextDeletedKey(HoodieKey hoodieKey) { /** * Returns the builder for {@code HoodieMetadataMergedLogRecordScanner}. */ - public static HoodieMetadataMergedLogRecordScanner.Builder newBuilder() { - return new HoodieMetadataMergedLogRecordScanner.Builder(); + public static HoodieMetadataMergedLogRecordReader.Builder newBuilder() { + return new HoodieMetadataMergedLogRecordReader.Builder(); } /** @@ -85,11 +94,27 @@ public Option> getRecordByKey(String key) { return Option.ofNullable((HoodieRecord) records.get(key)); } + public List>>> getRecordsByKeys(List keys) { + records.close(); + scan(keys); + List>>> metadataRecords = new ArrayList<>(); + keys.forEach(entry -> { + if (records.containsKey(entry)) { + metadataRecords.add(Pair.of(entry, Option.ofNullable((HoodieRecord) records.get(entry)))); + } else { + metadataRecords.add(Pair.of(entry, Option.empty())); + } + }); + return metadataRecords; + } + /** * Builder used to build {@code HoodieMetadataMergedLogRecordScanner}. 
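getRecordsByKeys(...) above returns one Pair per requested key, with an empty Option for misses, after re-scanning the log blocks for just those keys. A small usage sketch from the caller's side, assuming a reader built as in getLogRecordScanner(...) and the HoodieMetadataPayload type parameter; the probe class is hypothetical:

```java
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader;
import org.apache.hudi.metadata.HoodieMetadataPayload;

import java.util.List;

final class BatchedLookupSketch {
  // Prints which of the requested keys exist in the log files backing this reader.
  static void probe(HoodieMetadataMergedLogRecordReader reader, List<String> keys) {
    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> results =
        reader.getRecordsByKeys(keys);
    results.forEach(p ->
        System.out.println(p.getKey() + (p.getValue().isPresent() ? " -> found" : " -> missing")));
  }
}
```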
*/ public static class Builder extends HoodieMergedLogRecordScanner.Builder { private Set mergeKeyFilter = Collections.emptySet(); + private boolean enableFullScan; + private boolean enableInlineReading; @Override public Builder withFileSystem(FileSystem fs) { @@ -171,11 +196,21 @@ public Builder withLogBlockTimestamps(Set validLogBlockTimestamps) { return this; } + public Builder enableFullScan(boolean enableFullScan) { + this.enableFullScan = enableFullScan; + return this; + } + + public Builder enableInlineReading(boolean enableInlineReading) { + this.enableInlineReading = enableInlineReading; + return this; + } + @Override - public HoodieMetadataMergedLogRecordScanner build() { - return new HoodieMetadataMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema, + public HoodieMetadataMergedLogRecordReader build() { + return new HoodieMetadataMergedLogRecordReader(fs, basePath, logFilePaths, readerSchema, latestInstantTime, maxMemorySizeInBytes, bufferSize, spillableMapBasePath, mergeKeyFilter, - diskMapType, isBitCaskDiskMapCompressionEnabled, instantRange); + diskMapType, isBitCaskDiskMapCompressionEnabled, instantRange, enableInlineReading, enableFullScan); } } diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java index a647da9b9b99f..4065685a61d0d 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.HoodieLogFormat.Writer; +import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.testutils.FileCreateUtils; import org.apache.hudi.common.testutils.HoodieTestUtils; @@ -137,6 +138,30 @@ protected Properties getPropertiesForKeyGen() { public void testReader(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled, boolean partitioned) throws Exception { + testReaderInternal(diskMapType, isCompressionEnabled, partitioned); + } + + @Test + public void testHFileInlineReader() throws Exception { + testReaderInternal(ExternalSpillableMap.DiskMapType.BITCASK, false, false, + HoodieLogBlock.HoodieLogBlockType.HFILE_DATA_BLOCK); + } + + @Test + public void testParquetInlineReader() throws Exception { + testReaderInternal(ExternalSpillableMap.DiskMapType.BITCASK, false, false, + HoodieLogBlock.HoodieLogBlockType.PARQUET_DATA_BLOCK); + } + + private void testReaderInternal(ExternalSpillableMap.DiskMapType diskMapType, + boolean isCompressionEnabled, + boolean partitioned) throws Exception { + testReaderInternal(diskMapType, isCompressionEnabled, partitioned, HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK); + } + + private void testReaderInternal(ExternalSpillableMap.DiskMapType diskMapType, + boolean isCompressionEnabled, + boolean partitioned, HoodieLogBlock.HoodieLogBlockType logBlockType) throws Exception { // initial commit Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema()); HoodieTestUtils.init(hadoopConf, basePath.toString(), HoodieTableType.MERGE_ON_READ); @@ -175,7 +200,7 @@ public void testReader(ExternalSpillableMap.DiskMapType diskMapType, } else { writer 
= InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid0", baseInstant, - instantTime, 120, 0, logVersion); + instantTime, 120, 0, logVersion, logBlockType); } long size = writer.getCurrentSize(); writer.close(); diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java index d10ccfca91594..e41058614b7eb 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java @@ -27,7 +27,10 @@ import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock; import org.apache.hudi.common.table.log.block.HoodieCommandBlock; +import org.apache.hudi.common.table.log.block.HoodieDataBlock; +import org.apache.hudi.common.table.log.block.HoodieHFileDataBlock; import org.apache.hudi.common.table.log.block.HoodieLogBlock; +import org.apache.hudi.common.table.log.block.HoodieParquetDataBlock; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.testutils.SchemaTestUtil; import org.apache.hudi.hadoop.utils.HoodieHiveUtils; @@ -301,7 +304,14 @@ public static HoodieLogFormat.Writer writeRollback(File partitionDir, FileSystem public static HoodieLogFormat.Writer writeDataBlockToLogFile(File partitionDir, FileSystem fs, Schema schema, String fileId, - String baseCommit, String newCommit, int numberOfRecords, int offset, int logVersion) + String baseCommit, String newCommit, int numberOfRecords, int offset, int logVersion) throws IOException, InterruptedException { + return writeDataBlockToLogFile(partitionDir, fs, schema, fileId, baseCommit, newCommit, numberOfRecords, offset, logVersion, HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK); + } + + public static HoodieLogFormat.Writer writeDataBlockToLogFile(File partitionDir, FileSystem fs, Schema schema, String + fileId, + String baseCommit, String newCommit, int numberOfRecords, int offset, int logVersion, + HoodieLogBlock.HoodieLogBlockType logBlockType) throws InterruptedException, IOException { HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(new Path(partitionDir.getPath())) .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(fileId).withLogVersion(logVersion) @@ -314,7 +324,14 @@ public static HoodieLogFormat.Writer writeDataBlockToLogFile(File partitionDir, Map header = new HashMap<>(); header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, newCommit); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writeSchema.toString()); - HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header); + HoodieDataBlock dataBlock = null; + if (logBlockType == HoodieLogBlock.HoodieLogBlockType.HFILE_DATA_BLOCK) { + dataBlock = new HoodieHFileDataBlock(records, header); + } else if (logBlockType == HoodieLogBlock.HoodieLogBlockType.PARQUET_DATA_BLOCK) { + dataBlock = new HoodieParquetDataBlock(records, header); + } else { + dataBlock = new HoodieAvroDataBlock(records, header); + } writer.appendBlock(dataBlock); return writer; }
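With the new overload, tests can emit avro-, HFile-, or parquet-backed data blocks from the same helper. A usage sketch mirroring testParquetInlineReader above; the wrapper class and concrete argument values are illustrative, while the helper signature follows the diff:

```java
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;

import java.io.File;

final class ParquetLogBlockSketch {
  // Writes 120 records as a single parquet data block and returns the log file size,
  // mirroring the call site in TestHoodieRealtimeRecordReader.
  static long writeParquetBlock(File partitionDir, FileSystem fs, Schema schema,
                                String baseInstant, String instantTime) throws Exception {
    HoodieLogFormat.Writer writer = InputFormatTestUtil.writeDataBlockToLogFile(
        partitionDir, fs, schema, "fileid0", baseInstant, instantTime,
        120 /* numberOfRecords */, 0 /* offset */, 1 /* logVersion */,
        HoodieLogBlock.HoodieLogBlockType.PARQUET_DATA_BLOCK);
    long size = writer.getCurrentSize();
    writer.close();
    return size;
  }
}
```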