Merged
36 commits
5bbecae
[HUDI-2783] Upgrade HBase
codope Nov 17, 2021
25896f3
Fix metablock buffered read in hfile reader
codope Dec 8, 2021
ca393d2
Replace hbase-shaded-server and shade some more deps
codope Jan 3, 2022
1295af0
Rebase master
yihua Feb 17, 2022
d2f94af
Fix HoodieHFileReader
yihua Feb 18, 2022
9dfe1cf
Revert hadoop upgrade
yihua Feb 18, 2022
b0b0e16
Fix hudi-hadoop-mr-bundle to include more dependencies and address CI…
yihua Feb 19, 2022
615325f
Fix hudi-utilities-bundle
yihua Feb 25, 2022
ff7635d
Fix hudi-common
yihua Feb 25, 2022
079d58b
Fix other bundles
yihua Feb 26, 2022
6ae79af
Use hadoop 2.10.1
yihua Feb 28, 2022
8c681be
Shade dependencies in hudi-hadoop-mr-bundle
yihua Feb 28, 2022
324c1ab
Shade dependencies in other bundles
yihua Feb 28, 2022
f9de883
Fix bundle shading
yihua Feb 28, 2022
4751986
Fix hbase shading
yihua Mar 2, 2022
d8a8b4f
Fix hudi-integ-test-bundle
yihua Mar 2, 2022
cf882fb
Fix rebase issue
yihua Mar 2, 2022
a9f41b2
Add hbase-site to skip version check
yihua Mar 4, 2022
feb4cb4
Add hbase-site.xml to hudi-common
yihua Mar 9, 2022
6e0909f
Update presto setup in docker demo
yihua Mar 9, 2022
6f81ee7
Some cleanup in docker setup
yihua Mar 9, 2022
2c8ad78
Simplify dependency changes
yihua Mar 10, 2022
756d26a
Trim bundle dependencies
yihua Mar 10, 2022
8980f84
Make adjustment to bundles
yihua Mar 10, 2022
4ecc97f
Change kafka-connect-bundle and timeline-server-bundle
yihua Mar 10, 2022
ea9eabe
Adjust hudi-integ-test-bundle
yihua Mar 12, 2022
2cf29a5
Improve HFile code logic
yihua Mar 18, 2022
7de5986
Restructure tests for HoodieFileReader and HoodieFileWriter and add m…
yihua Mar 18, 2022
61a7ecf
Add more HFile reader tests
yihua Mar 19, 2022
27ea17a
Improve TestInLineFileSystemHFileInLining
yihua Mar 19, 2022
07a5a46
Address comments
yihua Mar 19, 2022
2956986
Add hfile fixture and compatibility tests
yihua Mar 19, 2022
5327266
Add HFile compatibility tests
yihua Mar 20, 2022
2b2bd08
Exclude org.apache.hadoop.hbase.KeyValue.KeyComparator from shading f…
yihua Mar 20, 2022
c916edb
Improve docs
yihua Mar 21, 2022
0733dcc
Address minor comments
yihua Mar 22, 2022
@@ -548,7 +548,8 @@ private static HoodieLogBlock getBlock(HoodieWriteConfig writeConfig,
case AVRO_DATA_BLOCK:
return new HoodieAvroDataBlock(recordList, header, keyField);
case HFILE_DATA_BLOCK:
return new HoodieHFileDataBlock(recordList, header, writeConfig.getHFileCompressionAlgorithm());
return new HoodieHFileDataBlock(
recordList, header, writeConfig.getHFileCompressionAlgorithm(), new Path(writeConfig.getBasePath()));
case PARQUET_DATA_BLOCK:
return new HoodieParquetDataBlock(recordList, header, keyField, writeConfig.getParquetCompressionCodec());
default:
@@ -30,6 +30,7 @@

import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroSchemaConverter;

@@ -53,39 +54,42 @@ public static <T extends HoodieRecordPayload, R extends IndexedRecord, I, K, O>
return newParquetFileWriter(instantTime, path, config, schema, hoodieTable, taskContextSupplier, config.populateMetaFields());
}
if (HFILE.getFileExtension().equals(extension)) {
return newHFileFileWriter(instantTime, path, config, schema, hoodieTable, taskContextSupplier);
return newHFileFileWriter(
instantTime, path, config, schema, hoodieTable.getHadoopConf(), taskContextSupplier);
}
if (ORC.getFileExtension().equals(extension)) {
return newOrcFileWriter(instantTime, path, config, schema, hoodieTable, taskContextSupplier);
return newOrcFileWriter(
instantTime, path, config, schema, hoodieTable.getHadoopConf(), taskContextSupplier);
}
throw new UnsupportedOperationException(extension + " format not supported yet.");
}

private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileWriter<R> newParquetFileWriter(
String instantTime, Path path, HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable,
TaskContextSupplier taskContextSupplier, boolean populateMetaFields) throws IOException {
return newParquetFileWriter(instantTime, path, config, schema, hoodieTable, taskContextSupplier, populateMetaFields, populateMetaFields);
return newParquetFileWriter(instantTime, path, config, schema, hoodieTable.getHadoopConf(),
taskContextSupplier, populateMetaFields, populateMetaFields);
}

private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileWriter<R> newParquetFileWriter(
String instantTime, Path path, HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable,
String instantTime, Path path, HoodieWriteConfig config, Schema schema, Configuration conf,
TaskContextSupplier taskContextSupplier, boolean populateMetaFields, boolean enableBloomFilter) throws IOException {
Option<BloomFilter> filter = enableBloomFilter ? Option.of(createBloomFilter(config)) : Option.empty();
HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter(hoodieTable.getHadoopConf()).convert(schema), schema, filter);
HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter(conf).convert(schema), schema, filter);

HoodieAvroParquetConfig parquetConfig = new HoodieAvroParquetConfig(writeSupport, config.getParquetCompressionCodec(),
config.getParquetBlockSize(), config.getParquetPageSize(), config.getParquetMaxFileSize(),
hoodieTable.getHadoopConf(), config.getParquetCompressionRatio(), config.parquetDictionaryEnabled());
conf, config.getParquetCompressionRatio(), config.parquetDictionaryEnabled());

return new HoodieParquetWriter<>(instantTime, path, parquetConfig, schema, taskContextSupplier, populateMetaFields);
}

private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileWriter<R> newHFileFileWriter(
String instantTime, Path path, HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable,
static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileWriter<R> newHFileFileWriter(
String instantTime, Path path, HoodieWriteConfig config, Schema schema, Configuration conf,
TaskContextSupplier taskContextSupplier) throws IOException {

BloomFilter filter = createBloomFilter(config);
HoodieHFileConfig hfileConfig = new HoodieHFileConfig(hoodieTable.getHadoopConf(),
HoodieHFileConfig hfileConfig = new HoodieHFileConfig(conf,
config.getHFileCompressionAlgorithm(), config.getHFileBlockSize(), config.getHFileMaxFileSize(),
HoodieHFileReader.KEY_FIELD_NAME, PREFETCH_ON_OPEN, CACHE_DATA_IN_L1, DROP_BEHIND_CACHE_COMPACTION,
filter, HFILE_COMPARATOR);
@@ -94,10 +98,10 @@ private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFi
}

private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileWriter<R> newOrcFileWriter(
String instantTime, Path path, HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable,
String instantTime, Path path, HoodieWriteConfig config, Schema schema, Configuration conf,
TaskContextSupplier taskContextSupplier) throws IOException {
BloomFilter filter = createBloomFilter(config);
HoodieOrcConfig orcConfig = new HoodieOrcConfig(hoodieTable.getHadoopConf(), config.getOrcCompressionCodec(),
HoodieOrcConfig orcConfig = new HoodieOrcConfig(conf, config.getOrcCompressionCodec(),
config.getOrcStripeSize(), config.getOrcBlockSize(), config.getOrcMaxFileSize(), filter);
return new HoodieOrcWriter<>(instantTime, path, orcConfig, schema, taskContextSupplier);
}
@@ -21,14 +21,14 @@
import org.apache.hudi.common.bloom.BloomFilter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;

public class HoodieHFileConfig {

public static final KeyValue.KVComparator HFILE_COMPARATOR = new HoodieHBaseKVComparator();
public static final CellComparator HFILE_COMPARATOR = new HoodieHBaseKVComparator();
public static final boolean PREFETCH_ON_OPEN = CacheConfig.DEFAULT_PREFETCH_ON_OPEN;
public static final boolean CACHE_DATA_IN_L1 = HColumnDescriptor.DEFAULT_CACHE_DATA_IN_L1;
// This is private in CacheConfig so have been copied here.
@@ -42,12 +42,12 @@ public class HoodieHFileConfig {
private final boolean dropBehindCacheCompaction;
private final Configuration hadoopConf;
private final BloomFilter bloomFilter;
private final KeyValue.KVComparator hfileComparator;
private final CellComparator hfileComparator;
Member:
Are these backward and forward compatible, i.e. is the KVComparator written into the HFile footer?

Member:
Per discussion, there is a potential issue of 0.11 queries not being able to read 0.9 tables, because the org.apache.hadoop.hbase.KeyValue.KVComparator class name is written into HFile 1.x (0.9 tables) and we now shade HBase in 0.11, relocating the class to org.apache.hudi.org.apache.hadoop.hbase.KeyValue.KVComparator.

If we ship the KVComparator class along with our jar, then we will be good.
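To illustrate the concern, the comparator class name is persisted in the HFile's fixed trailer, so files written by 0.9.0 reference the unshaded name. A minimal sketch of inspecting it with the HBase client classes (the file path is a placeholder, not from the PR):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.FixedFileTrailer;

public class PrintHFileComparator {
  public static void main(String[] args) throws Exception {
    // Placeholder path to an HFile written by an older Hudi release.
    Path hfilePath = new Path("/tmp/sample.hfile");
    FileSystem fs = hfilePath.getFileSystem(new Configuration());
    long fileSize = fs.getFileStatus(hfilePath).getLen();
    try (FSDataInputStream in = fs.open(hfilePath)) {
      // The fixed trailer at the end of the file records the comparator class name.
      FixedFileTrailer trailer = FixedFileTrailer.readFromStream(in, fileSize);
      System.out.println(trailer.getComparatorClassName());
    }
  }
}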

Member:
It can just extend CellComparatorImpl.
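A minimal sketch of that suggestion, assuming HBase 2.x's default cell ordering is sufficient; the class and package names are illustrative, not necessarily what the PR ships:

package org.apache.hudi.io.storage;

import java.io.Serializable;

import org.apache.hadoop.hbase.CellComparatorImpl;

// Illustrative comparator: inherits the default cell ordering from
// CellComparatorImpl and mainly gives Hudi a stable, Hudi-owned class name
// to pass to the HFile writer.
public class HoodieHBaseKVComparator extends CellComparatorImpl implements Serializable {
}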

Contributor Author:
Actually, I took another approach here. HBase already has backward-compatibility logic that transforms the old comparator. When we package the Hudi bundles, we need to make sure that org.apache.hadoop.hbase.KeyValue$KeyComparator (written to the metadata table HFiles in Hudi 0.9.0) is not shaded, so that this backward-compatibility logic in HBase is not broken. The new relocation rule below is added for this. More details can be found in the PR description.

<relocation>
  <pattern>org.apache.hadoop.hbase.</pattern>
  <shadedPattern>org.apache.hudi.org.apache.hadoop.hbase.</shadedPattern>
  <excludes>
    <exclude>org.apache.hadoop.hbase.KeyValue$KeyComparator</exclude>
  </excludes>
</relocation>

Contributor Author:
In this way, we don't need to ship our own comparator in org.apache.hadoop.hbase and exclude that from shading.
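One way to sanity-check a built bundle against this rule, sketched below with a placeholder jar path: the legacy KeyValue$KeyComparator entry should keep its original name while the rest of HBase is relocated under org.apache.hudi.

import java.io.IOException;
import java.util.jar.JarFile;

public class ShadeCheck {
  public static void main(String[] args) throws IOException {
    // Placeholder path to a built Hudi bundle jar.
    try (JarFile jar = new JarFile("hudi-hadoop-mr-bundle.jar")) {
      boolean legacyKept =
          jar.getJarEntry("org/apache/hadoop/hbase/KeyValue$KeyComparator.class") != null;
      boolean hbaseRelocated = jar.stream()
          .anyMatch(e -> e.getName().startsWith("org/apache/hudi/org/apache/hadoop/hbase/"));
      System.out.println("Legacy KeyValue$KeyComparator kept unshaded: " + legacyKept);
      System.out.println("HBase classes relocated under org.apache.hudi: " + hbaseRelocated);
    }
  }
}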

private final String keyFieldName;

public HoodieHFileConfig(Configuration hadoopConf, Compression.Algorithm compressionAlgorithm, int blockSize,
long maxFileSize, String keyFieldName, boolean prefetchBlocksOnOpen, boolean cacheDataInL1,
boolean dropBehindCacheCompaction, BloomFilter bloomFilter, KeyValue.KVComparator hfileComparator) {
boolean dropBehindCacheCompaction, BloomFilter bloomFilter, CellComparator hfileComparator) {
this.hadoopConf = hadoopConf;
this.compressionAlgorithm = compressionAlgorithm;
this.blockSize = blockSize;
@@ -96,7 +96,7 @@ public BloomFilter getBloomFilter() {
return bloomFilter;
}

public KeyValue.KVComparator getHfileComparator() {
public CellComparator getHFileComparator() {
return hfileComparator;
}

@@ -25,6 +25,8 @@
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
@@ -38,8 +40,6 @@
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.io.Writable;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;

import java.io.DataInput;
import java.io.DataOutput;
@@ -95,6 +95,7 @@ public HoodieHFileWriter(String instantTime, Path file, HoodieHFileConfig hfileC

HFileContext context = new HFileContextBuilder().withBlockSize(hfileConfig.getBlockSize())
.withCompression(hfileConfig.getCompressionAlgorithm())
.withCellComparator(hfileConfig.getHFileComparator())
.build();

conf.set(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, String.valueOf(hfileConfig.shouldPrefetchBlocksOnOpen()));
@@ -104,7 +105,6 @@ public HoodieHFileWriter(String instantTime, Path file, HoodieHFileConfig hfileC
this.writer = HFile.getWriterFactory(conf, cacheConfig)
.withPath(this.fs, this.file)
.withFileContext(context)
.withComparator(hfileConfig.getHfileComparator())
.create();

writer.appendFileInfo(HoodieHFileReader.KEY_SCHEMA.getBytes(), schema.toString().getBytes());