@@ -360,7 +360,7 @@ protected void appendDataAndDeleteBlocks(Map<HeaderMetadataType, String> header)
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writeSchemaWithMetaFields.toString());
List<HoodieLogBlock> blocks = new ArrayList<>(2);
if (recordList.size() > 0) {
blocks.add(HoodieDataBlock.getBlock(hoodieTable.getLogDataBlockFormat(), recordList, header));
blocks.add(HoodieDataBlock.getBlock(hoodieTable.getLogDataBlockType(), recordList, header));
}
if (keysToDelete.size() > 0) {
blocks.add(new HoodieDeleteBlock(keysToDelete.toArray(new HoodieKey[keysToDelete.size()]), header));
@@ -43,6 +43,7 @@
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -679,16 +680,21 @@ public HoodieFileFormat getLogFileFormat() {
return metaClient.getTableConfig().getLogFileFormat();
}

public HoodieLogBlockType getLogDataBlockFormat() {
switch (getBaseFileFormat()) {
case PARQUET:
case ORC:
return HoodieLogBlockType.AVRO_DATA_BLOCK;
case HFILE:
return HoodieLogBlockType.HFILE_DATA_BLOCK;
default:
throw new HoodieException("Base file format " + getBaseFileFormat()
+ " does not have associated log block format");
public HoodieLogBlockType getLogDataBlockType() {
HoodieLogBlock.HoodieLogBlockType logBlockType = metaClient.getTableConfig().getLogBlockFormat();
if (logBlockType != null) {
return logBlockType;
} else {
switch (getBaseFileFormat()) {
case PARQUET:
case ORC:
return HoodieLogBlockType.AVRO_DATA_BLOCK;
case HFILE:
return HoodieLogBlockType.HFILE_DATA_BLOCK;
default:
throw new HoodieException("Base file format " + getBaseFileFormat()
+ " does not have associated log block format");
}
}
}

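Taken together with the table-config change later in this diff, the renamed getLogDataBlockType() resolves the log data block type in two steps: an explicitly configured log block type wins, otherwise the type is derived from the base file format. Below is a minimal standalone sketch of that selection order; the helper class is hypothetical, while the enum constants and exception type are the ones used in the diff.

import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
import org.apache.hudi.exception.HoodieException;

final class LogBlockTypeResolver {
  // Explicit table-level override first, then the base-file-format fallback.
  static HoodieLogBlockType resolve(HoodieLogBlockType configuredType, HoodieFileFormat baseFileFormat) {
    if (configuredType != null) {
      return configuredType; // hoodie.table.log.block.type was set on the table
    }
    switch (baseFileFormat) {
      case PARQUET:
      case ORC:
        return HoodieLogBlockType.AVRO_DATA_BLOCK;
      case HFILE:
        return HoodieLogBlockType.HFILE_DATA_BLOCK;
      default:
        throw new HoodieException("Base file format " + baseFileFormat
            + " does not have associated log block format");
    }
  }
}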
@@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.io.storage;

import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
import org.apache.hudi.common.engine.TaskContextSupplier;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.apache.hudi.common.testutils.FileSystemTestUtils.RANDOM;
import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
import static org.junit.jupiter.api.Assertions.assertEquals;

public class TestHoodieHFileReaderWriter {
private final Path filePath = new Path(System.getProperty("java.io.tmpdir") + "/f1_1-0-1_000.hfile");

@BeforeEach
@AfterEach
public void clearTempFile() {
File file = new File(filePath.toString());
if (file.exists()) {
file.delete();
}
}

private HoodieHFileWriter createHFileWriter(Schema avroSchema) throws Exception {
BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.00001, -1, BloomFilterTypeCode.SIMPLE.name());
Configuration conf = new Configuration();
TaskContextSupplier mockTaskContextSupplier = Mockito.mock(TaskContextSupplier.class);
String instantTime = "000";

HoodieHFileConfig hoodieHFileConfig = new HoodieHFileConfig(conf, Compression.Algorithm.GZ, 1024 * 1024, 120 * 1024 * 1024,
filter);
return new HoodieHFileWriter(instantTime, filePath, hoodieHFileConfig, avroSchema, mockTaskContextSupplier);
}

@Test
public void testWriteReadHFile() throws Exception {
Schema avroSchema = getSchemaFromResource(TestHoodieHFileReaderWriter.class, "/exampleSchema.avsc");
HoodieHFileWriter writer = createHFileWriter(avroSchema);
List<String> keys = new ArrayList<>();
Map<String, GenericRecord> recordMap = new HashMap<>();
for (int i = 0; i < 100; i++) {
GenericRecord record = new GenericData.Record(avroSchema);
String key = String.format("%s%04d", "key", i);
record.put("_row_key", key);
keys.add(key);
record.put("time", Integer.toString(RANDOM.nextInt()));
record.put("number", i);
writer.writeAvro(key, record);
recordMap.put(key, record);
}
writer.close();

Configuration conf = new Configuration();
CacheConfig cacheConfig = new CacheConfig(conf);
HoodieHFileReader hoodieHFileReader = new HoodieHFileReader(conf, filePath, cacheConfig, filePath.getFileSystem(conf));
List<Pair<String, IndexedRecord>> records = hoodieHFileReader.readAllRecords();
records.forEach(entry -> assertEquals(entry.getSecond(), recordMap.get(entry.getFirst())));
hoodieHFileReader.close();

for (int i = 0; i < 20; i++) {
int randomRowsToFetch = 5 + RANDOM.nextInt(50);
Set<String> rowsToFetch = getRandomKeys(randomRowsToFetch, keys);
List<String> rowsList = new ArrayList<>(rowsToFetch);
Collections.sort(rowsList);
hoodieHFileReader = new HoodieHFileReader(conf, filePath, cacheConfig, filePath.getFileSystem(conf));
List<Pair<String, GenericRecord>> result = hoodieHFileReader.readRecordsByKey(rowsList);
assertEquals(result.size(), randomRowsToFetch);
result.forEach(entry -> {
assertEquals(entry.getSecond(), recordMap.get(entry.getFirst()));
});
hoodieHFileReader.close();
}
}

private Set<String> getRandomKeys(int count, List<String> keys) {
Set<String> rowKeys = new HashSet<>();
int totalKeys = keys.size();
while (rowKeys.size() < count) {
int index = RANDOM.nextInt(totalKeys);
// guard on the key itself; Set semantics keep the result free of duplicates
String key = keys.get(index);
if (!rowKeys.contains(key)) {
rowKeys.add(key);
}
}
return rowKeys;
}
}
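For quick reference, a hedged usage sketch of the reader API the test above exercises; the file path and lookup keys are illustrative, and, as in the test, the key list passed to readRecordsByKey is kept sorted.

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hudi.io.storage.HoodieHFileReader;

import java.util.Arrays;
import java.util.List;

public class HFileReadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path path = new Path("/tmp/f1_1-0-1_000.hfile");             // illustrative file
    HoodieHFileReader reader =
        new HoodieHFileReader(conf, path, new CacheConfig(conf), path.getFileSystem(conf));
    List<String> keys = Arrays.asList("key0003", "key0042");      // sorted lookup keys
    List<Pair<String, GenericRecord>> hits = reader.readRecordsByKey(keys);
    hits.forEach(p -> System.out.println(p.getFirst() + " -> " + p.getSecond()));
    reader.close();
  }
}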
@@ -135,6 +135,15 @@ public static List<Arguments> bootstrapAndTableOperationTestArgs() {
);
}

public static List<Arguments> tableOperationTestArgs() {
return asList(
Arguments.of(COPY_ON_WRITE, true, false),
Arguments.of(COPY_ON_WRITE, true, true),
Arguments.of(COPY_ON_WRITE, false, true),
Arguments.of(MERGE_ON_READ, true, false)
);
}

/**
* Metadata Table bootstrap scenarios.
*/
@@ -221,9 +230,9 @@ public void testOnlyValidPartitionsAdded(HoodieTableType tableType) throws Excep
* Test various table operations sync to Metadata Table correctly.
*/
@ParameterizedTest
@EnumSource(HoodieTableType.class)
public void testTableOperations(HoodieTableType tableType) throws Exception {
init(tableType);
@MethodSource("tableOperationTestArgs")
public void testTableOperations(HoodieTableType tableType, boolean enableFullScan, boolean enableInlineReading) throws Exception {
init(tableType, true, enableFullScan, enableInlineReading);
doWriteInsertAndUpsert(testTable);

// trigger an upsert
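The switch to @MethodSource above feeds each Arguments row from tableOperationTestArgs() into the test's three parameters positionally. A minimal, self-contained illustration of the JUnit 5 pattern follows; the class and method names here are hypothetical.

import static java.util.Arrays.asList;

import java.util.List;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

class ParameterizedSketch {
  static List<Arguments> args() {
    // one test invocation per row; values map positionally onto the test method's parameters
    return asList(Arguments.of("COPY_ON_WRITE", true, false),
        Arguments.of("MERGE_ON_READ", true, false));
  }

  @ParameterizedTest
  @MethodSource("args")
  void sketch(String tableType, boolean enableFullScan, boolean enableInlineReading) {
    // a real test body would exercise the table with these flags
  }
}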
@@ -72,6 +72,10 @@ public void init(HoodieTableType tableType) throws IOException {
}

public void init(HoodieTableType tableType, boolean enableMetadataTable) throws IOException {
init(tableType, enableMetadataTable, true, false);
}

public void init(HoodieTableType tableType, boolean enableMetadataTable, boolean enableFullScan, boolean enableInlineReading) throws IOException {
this.tableType = tableType;
initPath();
initSparkContexts("TestHoodieMetadata");
@@ -80,7 +84,8 @@ public void init(HoodieTableType tableType, boolean enableMetadataTable) throws
initMetaClient(tableType);
initTestDataGenerator();
metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath);
writeConfig = getWriteConfig(true, enableMetadataTable);
writeConfig = getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, enableMetadataTable, false,
enableFullScan, enableInlineReading).build();
initWriteConfigAndMetatableWriter(writeConfig, enableMetadataTable);
}

@@ -256,7 +261,13 @@ protected HoodieWriteConfig.Builder getWriteConfigBuilder(boolean autoCommit, bo
return getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, autoCommit, useFileListingMetadata, enableMetrics);
}

protected HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy policy, boolean autoCommit, boolean useFileListingMetadata, boolean enableMetrics) {
protected HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy policy, boolean autoCommit, boolean useFileListingMetadata,
boolean enableMetrics) {
return getWriteConfigBuilder(policy, autoCommit, useFileListingMetadata, enableMetrics, true, false);
}

protected HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy policy, boolean autoCommit, boolean useFileListingMetadata,
boolean enableMetrics, boolean enableFullScan, boolean enableInlineReading) {
return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2).withDeleteParallelism(2).withRollbackParallelism(2).withFinalizeWriteParallelism(2)
.withAutoCommit(autoCommit)
@@ -271,6 +282,8 @@ protected HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesClea
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
.enable(useFileListingMetadata)
.enableFullScan(enableFullScan)
.enableInlineReading(enableInlineReading)
.enableMetrics(enableMetrics).build())
.withMetricsConfig(HoodieMetricsConfig.newBuilder().on(enableMetrics)
.withExecutorMetrics(true).build())
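As a hedged illustration (the argument values are made up), a metadata-table test built on this harness could drive the new knobs through the widened overloads shown above:

// inside a test class extending the harness:
// enableMetadataTable = true, enableFullScan = false, enableInlineReading = true
init(HoodieTableType.MERGE_ON_READ, true, false, true);

// ...or build the write config directly through the six-argument builder overload
HoodieWriteConfig writeConfig = getWriteConfigBuilder(
    HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, false, true).build();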
@@ -115,6 +115,18 @@ public final class HoodieMetadataConfig extends HoodieConfig {
.sinceVersion("0.7.0")
.withDocumentation("Parallelism to use, when listing the table on lake storage.");

public static final ConfigProperty<Boolean> ENABLE_INLINE_READING_LOG_FILES = ConfigProperty
.key(METADATA_PREFIX + ".enable.inline.reading.log.files")
.defaultValue(true)
.sinceVersion("0.10.0")
.withDocumentation("Enable inline reading of Log files");

public static final ConfigProperty<Boolean> ENABLE_FULL_SCAN_LOG_FILES = ConfigProperty
.key(METADATA_PREFIX + ".enable.full.scan.log.files")
.defaultValue(true)
.sinceVersion("0.10.0")
.withDocumentation("Enable full scanning of log files while reading log records");

private HoodieMetadataConfig() {
super();
}
@@ -143,6 +155,14 @@ public String getDirectoryFilterRegex() {
return getString(DIR_FILTER_REGEX);
}

public boolean enableFullScan() {
return getBoolean(ENABLE_FULL_SCAN_LOG_FILES);
}

public boolean enableInlineReading() {
return getBoolean(ENABLE_INLINE_READING_LOG_FILES);
}

public static class Builder {

private final HoodieMetadataConfig metadataConfig = new HoodieMetadataConfig();
@@ -210,6 +230,16 @@ public Builder withDirectoryFilterRegex(String regex) {
return this;
}

public Builder enableFullScan(boolean enableFullScan) {
metadataConfig.setValue(ENABLE_FULL_SCAN_LOG_FILES, String.valueOf(enableFullScan));
return this;
}

public Builder enableInlineReading(boolean enableInlineReading) {
metadataConfig.setValue(ENABLE_INLINE_READING_LOG_FILES, String.valueOf(enableInlineReading));
return this;
}

public HoodieMetadataConfig build() {
metadataConfig.setDefaults(HoodieMetadataConfig.class.getName());
return metadataConfig;
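A hedged usage sketch of the two new builder methods; the import path is the usual location of HoodieMetadataConfig, and the flag values are illustrative.

import org.apache.hudi.common.config.HoodieMetadataConfig;

// Metadata table enabled, full log scan disabled, inline reading of log blocks enabled.
HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
    .enable(true)
    .enableFullScan(false)
    .enableInlineReading(true)
    .build();

// The resulting config exposes the flags through the new accessors.
boolean fullScan = metadataConfig.enableFullScan();            // false
boolean inlineReading = metadataConfig.enableInlineReading();  // true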
@@ -27,6 +27,7 @@
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
@@ -119,6 +120,11 @@ public class HoodieTableConfig extends HoodieConfig {
.withAlternatives("hoodie.table.rt.file.format")
.withDocumentation("Log format used for the delta logs.");

public static final ConfigProperty<String> LOG_BLOCK_TYPE = ConfigProperty
.key("hoodie.table.log.block.type")
.noDefaultValue()
.withDocumentation("Log block type used for the delta logs.");

public static final ConfigProperty<String> TIMELINE_LAYOUT_VERSION = ConfigProperty
.key("hoodie.timeline.layout.version")
.noDefaultValue()
@@ -187,7 +193,6 @@ public HoodieTableConfig(FileSystem fs, String metaPath, String payloadClassName

/**
* For serializing and de-serializing.
*
*/
public HoodieTableConfig() {
super();
@@ -342,6 +347,20 @@ public HoodieFileFormat getLogFileFormat() {
return HoodieFileFormat.valueOf(getStringOrDefault(LOG_FILE_FORMAT));
}

/**
* Get the log block type.
*
* @return the HoodieLogBlockType configured for delta log blocks, or null if none is explicitly set
*/
public HoodieLogBlock.HoodieLogBlockType getLogBlockFormat() {
String logBlockTypeConfig = getString(LOG_BLOCK_TYPE);
if (logBlockTypeConfig != null) {
return HoodieLogBlock.HoodieLogBlockType.valueOf(logBlockTypeConfig);
} else {
return null;
}
}

/**
* Get the relative path of archive log folder under metafolder, for this table.
*/
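A hedged sketch of how the new table property interacts with the HoodieTable change earlier in this diff: when hoodie.table.log.block.type is present it is parsed into a HoodieLogBlockType and takes precedence, while a null result sends callers to the base-file-format fallback. The Properties handling below is illustrative and stands in for the real hoodie.properties plumbing.

import java.util.Properties;

import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;

public class LogBlockTypeConfigSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("hoodie.table.log.block.type", "HFILE_DATA_BLOCK"); // illustrative override

    String configured = props.getProperty("hoodie.table.log.block.type");
    // Mirrors getLogBlockFormat(): parse when set, otherwise return null and let the
    // base-file-format mapping in HoodieTable#getLogDataBlockType decide.
    HoodieLogBlockType blockType = configured != null ? HoodieLogBlockType.valueOf(configured) : null;
    System.out.println(blockType);
  }
}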