Closed
Commits
23 commits
dac1526
Pull classes from hbase-common
yihua Jan 21, 2022
0bcb7f6
Pull shaded protos used internally and HFile related classes
yihua Jan 23, 2022
c9812ec
Use hudi-io module in hudi-common for HBase file format and remove de…
yihua Jan 24, 2022
30615c4
Fix build for hudi-client-common
yihua Jan 24, 2022
636fd0d
Fix build for hudi-hadoop-mr
yihua Jan 25, 2022
30d7dd1
Fix build for hudi-spark-client
yihua Jan 25, 2022
b5b1a2a
Fix build for hudi-java-client
yihua Jan 25, 2022
5ed9505
Rename all remaining org.apache.hadoop.hbase to org.apache.hudi.hbase…
yihua Jan 26, 2022
bbdb91e
Fix HBase class reference in HoodieClientTestUtils
yihua Jan 26, 2022
2b9ec24
Remove HBase exception usage in HoodieTestHiveBase
yihua Jan 26, 2022
3241f42
Fix API changes in HoodieClientTestUtils
yihua Jan 26, 2022
e3d0d34
Fix tests in hudi-flink
yihua Jan 26, 2022
619f770
Fix backward compatibility logic for HFile comparator
yihua Jan 26, 2022
60ac4f0
Fix bundle deps
yihua Jan 26, 2022
2eb5347
Fix TestHoodieBackedTableMetadata imports
yihua Jan 26, 2022
91c24b6
Trim deps in hudi-io module
yihua Jan 26, 2022
8b7aba0
Fix HoodieHFileReader
yihua Jan 26, 2022
2356490
Address deps conflict
yihua Jan 26, 2022
df3ba6e
Exclude more deps
yihua Jan 26, 2022
59038aa
Add back hbase dependency for experimentation
yihua Jan 27, 2022
7bb321e
Run all tests in CI
yihua Jan 27, 2022
a2ab4f0
Remove exclusion in hudi-spark-client
yihua Jan 27, 2022
e697ccf
Add debug logs in ITTestBase and remove usage of htrace in HFileReade…
yihua Jan 27, 2022
2 changes: 0 additions & 2 deletions hudi-aws/pom.xml
@@ -51,8 +51,6 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
22 changes: 20 additions & 2 deletions hudi-client/hudi-client-common/pom.xml
@@ -120,7 +120,6 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
<!-- Need these exclusions to make sure JavaSparkContext can be setup. https://issues.apache.org/jira/browse/SPARK-1693 -->
<exclusions>
@@ -140,9 +139,28 @@
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<artifactId>hadoop-hdfs</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
<!-- Need these exclusions to make sure JavaSparkContext can be setup. https://issues.apache.org/jira/browse/SPARK-1693 -->
<exclusions>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet.jsp</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<exclusions>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
@@ -60,7 +60,7 @@
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;

import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hudi.hbase.io.compress.Compression;
import org.apache.hudi.table.storage.HoodieStorageLayout;
import org.apache.orc.CompressionKind;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -21,16 +21,15 @@
import org.apache.hudi.common.bloom.BloomFilter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hudi.hbase.CellComparator;
import org.apache.hudi.hbase.io.compress.Compression;
import org.apache.hudi.hbase.io.hfile.CacheConfig;

public class HoodieHFileConfig {

public static final KeyValue.KVComparator HFILE_COMPARATOR = new HoodieHBaseKVComparator();
public static final CellComparator HFILE_COMPARATOR = new HoodieHBaseKVComparator();
public static final boolean PREFETCH_ON_OPEN = CacheConfig.DEFAULT_PREFETCH_ON_OPEN;
public static final boolean CACHE_DATA_IN_L1 = HColumnDescriptor.DEFAULT_CACHE_DATA_IN_L1;
public static final boolean CACHE_DATA_IN_L1 = false;// HColumnDescriptor.DEFAULT_CACHE_DATA_IN_L1;
// This is private in CacheConfig so have been copied here.
public static final boolean DROP_BEHIND_CACHE_COMPACTION = true;

@@ -42,12 +41,12 @@ public class HoodieHFileConfig {
private final boolean dropBehindCacheCompaction;
private final Configuration hadoopConf;
private final BloomFilter bloomFilter;
private final KeyValue.KVComparator hfileComparator;
private final CellComparator hfileComparator;
private final String keyFieldName;

public HoodieHFileConfig(Configuration hadoopConf, Compression.Algorithm compressionAlgorithm, int blockSize,
long maxFileSize, String keyFieldName, boolean prefetchBlocksOnOpen, boolean cacheDataInL1,
boolean dropBehindCacheCompaction, BloomFilter bloomFilter, KeyValue.KVComparator hfileComparator) {
boolean dropBehindCacheCompaction, BloomFilter bloomFilter, CellComparator hfileComparator) {
this.hadoopConf = hadoopConf;
this.compressionAlgorithm = compressionAlgorithm;
this.blockSize = blockSize;
@@ -96,7 +95,7 @@ public BloomFilter getBloomFilter() {
return bloomFilter;
}

public KeyValue.KVComparator getHfileComparator() {
public CellComparator getHFileComparator() {
return hfileComparator;
}

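Aside on the type swap in HoodieHFileConfig above: newer HBase versions remove KeyValue.KVComparator in favor of the CellComparator interface, so both the HFILE_COMPARATOR constant and the constructor parameter change type, and the HColumnDescriptor L1-cache default is inlined as false. A rough, illustrative sketch of wiring the new constructor (assuming it sits alongside HoodieHFileConfig; the values and the BloomFilterFactory call are assumptions, not taken from this PR):

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
import org.apache.hudi.hbase.CellComparator;
import org.apache.hudi.hbase.io.compress.Compression;
import org.apache.hudi.hbase.io.hfile.CacheConfig;

class HFileConfigSketch {
  static HoodieHFileConfig sampleConfig(Configuration conf) {
    BloomFilter bloomFilter = BloomFilterFactory.createBloomFilter(
        60_000, 0.000000001, 100_000, BloomFilterTypeCode.DYNAMIC_V0.name());
    // The comparator is now a CellComparator implementation instead of a KVComparator.
    CellComparator comparator = new HoodieHBaseKVComparator();
    return new HoodieHFileConfig(
        conf,
        Compression.Algorithm.GZ,                // compression algorithm
        1024 * 1024,                             // block size
        120 * 1024 * 1024,                       // max file size
        "key",                                   // key field name (illustrative)
        CacheConfig.DEFAULT_PREFETCH_ON_OPEN,    // prefetch blocks on open
        false,                                   // cache data in L1 (HColumnDescriptor default no longer referenced)
        true,                                    // drop behind cache compaction
        bloomFilter,
        comparator);
  }
}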
@@ -31,12 +31,11 @@
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hudi.hbase.KeyValue;
import org.apache.hudi.hbase.io.hfile.CacheConfig;
import org.apache.hudi.hbase.io.hfile.HFile;
import org.apache.hudi.hbase.io.hfile.HFileContext;
import org.apache.hudi.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.io.Writable;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
@@ -56,6 +55,8 @@
*/
public class HoodieHFileWriter<T extends HoodieRecordPayload, R extends IndexedRecord>
implements HoodieFileWriter<R> {
// TODO(yihua): pulled from HColumnDescriptor
public static final String CACHE_DATA_IN_L1 = "CACHE_DATA_IN_L1";
private static AtomicLong recordIndex = new AtomicLong(1);

private final Path file;
@@ -95,16 +96,17 @@ public HoodieHFileWriter(String instantTime, Path file, HoodieHFileConfig hfileC

HFileContext context = new HFileContextBuilder().withBlockSize(hfileConfig.getBlockSize())
.withCompression(hfileConfig.getCompressionAlgorithm())
.withCellComparator(hfileConfig.getHFileComparator())
.build();

conf.set(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, String.valueOf(hfileConfig.shouldPrefetchBlocksOnOpen()));
conf.set(HColumnDescriptor.CACHE_DATA_IN_L1, String.valueOf(hfileConfig.shouldCacheDataInL1()));
// HColumnDescriptor.CACHE_DATA_IN_L1
conf.set(CACHE_DATA_IN_L1, String.valueOf(hfileConfig.shouldCacheDataInL1()));
conf.set(DROP_BEHIND_CACHE_COMPACTION_KEY, String.valueOf(hfileConfig.shouldDropBehindCacheCompaction()));
CacheConfig cacheConfig = new CacheConfig(conf);
this.writer = HFile.getWriterFactory(conf, cacheConfig)
.withPath(this.fs, this.file)
.withFileContext(context)
.withComparator(hfileConfig.getHfileComparator())
.create();

writer.appendFileInfo(HoodieHFileReader.KEY_SCHEMA.getBytes(), schema.toString().getBytes());
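To summarize the writer-side shift above in one place: the newer HFile writer API drops withComparator on the writer factory, so the comparator is attached to the HFileContext instead, and the HColumnDescriptor.CACHE_DATA_IN_L1 key is carried as a local constant. A condensed before/after sketch (conf, fs, path and hfileConfig assumed to be set up as in the surrounding code):

// Before, against the older HBase API:
//   writer = HFile.getWriterFactory(conf, cacheConfig)
//       .withPath(fs, path)
//       .withFileContext(context)
//       .withComparator(hfileConfig.getHfileComparator())
//       .create();

// After, against the shaded org.apache.hudi.hbase classes:
HFileContext context = new HFileContextBuilder()
    .withBlockSize(hfileConfig.getBlockSize())
    .withCompression(hfileConfig.getCompressionAlgorithm())
    .withCellComparator(hfileConfig.getHFileComparator())  // comparator now lives on the context
    .build();
HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf))
    .withPath(fs, path)
    .withFileContext(context)
    .create();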
@@ -32,9 +32,9 @@
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hudi.hbase.io.compress.Compression;
import org.apache.hudi.hbase.io.hfile.CacheConfig;
import org.apache.hudi.hbase.util.Pair;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.io.TempDir;
20 changes: 20 additions & 0 deletions hudi-client/hudi-java-client/pom.xml
@@ -122,6 +122,26 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<scope>test</scope>
<!-- Need these exclusions to make sure JavaSparkContext can be setup. https://issues.apache.org/jira/browse/SPARK-1693 -->
<exclusions>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet.jsp</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
26 changes: 26 additions & 0 deletions hudi-client/hudi-spark-client/pom.xml
@@ -65,6 +65,32 @@
<artifactId>parquet-avro</artifactId>
</dependency>

<!-- HBase -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- Hoodie - Test -->
<dependency>
<groupId>org.apache.hudi</groupId>
@@ -95,8 +95,8 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hudi.hbase.io.hfile.CacheConfig;
import org.apache.hudi.hbase.util.Pair;
import org.apache.hadoop.util.Time;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -22,8 +22,8 @@
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hudi.hbase.io.hfile.CacheConfig;
import org.apache.hudi.hbase.util.Pair;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.common.config.HoodieMetadataConfig;
@@ -45,10 +45,10 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hudi.hbase.Cell;
import org.apache.hudi.hbase.io.hfile.CacheConfig;
import org.apache.hudi.hbase.io.hfile.HFile;
import org.apache.hudi.hbase.io.hfile.HFileScanner;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
@@ -241,9 +241,9 @@ public static Stream<GenericRecord> readHFile(JavaSparkContext jsc, String[] pat
Schema schema = null;
for (String path : paths) {
try {
HFile.Reader reader = HFile.createReader(fs, new Path(path), cacheConfig, fs.getConf());
HFile.Reader reader = HFile.createReader(fs, new Path(path), cacheConfig, true, fs.getConf());
if (schema == null) {
schema = new Schema.Parser().parse(new String(reader.loadFileInfo().get("schema".getBytes())));
schema = new Schema.Parser().parse(new String(reader.getHFileInfo().get("schema".getBytes())));
}
HFileScanner scanner = reader.getScanner(false, false);
if (!scanner.seekTo()) {
@@ -252,7 +252,7 @@ public static Stream<GenericRecord> readHFile(JavaSparkContext jsc, String[] pat
}

do {
Cell c = scanner.getKeyValue();
Cell c = scanner.getCell();
byte[] value = Arrays.copyOfRange(c.getValueArray(), c.getValueOffset(), c.getValueOffset() + c.getValueLength());
valuesAsList.add(HoodieAvroUtils.bytesToAvro(value, schema));
} while (scanner.next());
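The same reader-side API changes recur throughout the PR: HFile.createReader now takes a primaryReplicaReader flag, file-info access moves from loadFileInfo() to getHFileInfo(), and scanners hand back cells via getCell() instead of getKeyValue(). A trimmed sketch of the new read loop (fs, conf and the "schema" file-info key are taken from the code above; imports and error handling omitted):

HFile.Reader reader = HFile.createReader(
    fs, new Path(path), new CacheConfig(conf), true /* primaryReplicaReader */, conf);
Schema schema = new Schema.Parser().parse(
    new String(reader.getHFileInfo().get("schema".getBytes())));
HFileScanner scanner = reader.getScanner(false, false);
if (scanner.seekTo()) {
  do {
    Cell c = scanner.getCell();
    byte[] value = Arrays.copyOfRange(
        c.getValueArray(), c.getValueOffset(), c.getValueOffset() + c.getValueLength());
    // Decode the Avro payload the same way the helper above does.
    GenericRecord record = HoodieAvroUtils.bytesToAvro(value, schema);
  } while (scanner.next());
}
reader.close();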
8 changes: 7 additions & 1 deletion hudi-common/pom.xml
@@ -156,6 +156,12 @@
</build>

<dependencies>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-io</artifactId>
<version>${project.version}</version>
</dependency>

<!-- Scala -->
<dependency>
<groupId>org.scala-lang</groupId>
@@ -293,7 +299,7 @@
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
<!-- Unfortunately, HFile is packaged ONLY under hbase-server -->
<!-- Unfortunately, HFile is packaged ONLY under hbase-server -->
<scope>compile</scope>
<exclusions>
<exclusion>
@@ -37,15 +37,16 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hudi.hbase.CellComparatorImpl;
import org.apache.hudi.hbase.CellUtil;
import org.apache.hudi.hbase.HConstants;
import org.apache.hudi.hbase.KeyValue;
import org.apache.hudi.hbase.io.hfile.CacheConfig;
import org.apache.hudi.hbase.io.hfile.HFile;
import org.apache.hudi.hbase.io.hfile.HFileContext;
import org.apache.hudi.hbase.io.hfile.HFileContextBuilder;
import org.apache.hudi.hbase.io.hfile.HFileScanner;
import org.apache.hudi.hbase.util.Bytes;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@@ -178,9 +179,7 @@ private static String getUserKeyFromCellKey(String cellKey) {
private static HFile.Reader createReader(String hFilePath, Configuration conf, FileSystem fileSystem) {
try {
LOG.info("Opening HFile for reading :" + hFilePath);
HFile.Reader reader = HFile.createReader(fileSystem, new HFilePathForReader(hFilePath),
new CacheConfig(conf), conf);
return reader;
return HFile.createReader(fileSystem, new HFilePathForReader(hFilePath), new CacheConfig(conf), true, conf);
} catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe);
}
@@ -259,7 +258,7 @@ private void initIndexInfo() {

private HoodieBootstrapIndexInfo fetchBootstrapIndexInfo() throws IOException {
return TimelineMetadataUtils.deserializeAvroMetadata(
partitionIndexReader().loadFileInfo().get(INDEX_INFO_KEY),
partitionIndexReader().getHFileInfo().get(INDEX_INFO_KEY),
HoodieBootstrapIndexInfo.class);
}

@@ -306,7 +305,7 @@ private <T> List<T> getAllKeys(HFileScanner scanner, Function<String, T> convert
try {
boolean available = scanner.seekTo();
while (available) {
keys.add(converter.apply(getUserKeyFromCellKey(CellUtil.getCellKeyAsString(scanner.getKeyValue()))));
keys.add(converter.apply(getUserKeyFromCellKey(CellUtil.getCellKeyAsString(scanner.getCell()))));
available = scanner.next();
}
} catch (IOException ioe) {
@@ -528,13 +527,13 @@ public void close() {
@Override
public void begin() {
try {
HFileContext meta = new HFileContextBuilder().build();
HFileContext meta = new HFileContextBuilder().withCellComparator(new HoodieKVComparator()).build();
this.indexByPartitionWriter = HFile.getWriterFactory(metaClient.getHadoopConf(),
new CacheConfig(metaClient.getHadoopConf())).withPath(metaClient.getFs(), indexByPartitionPath)
.withFileContext(meta).withComparator(new HoodieKVComparator()).create();
.withFileContext(meta).create();
this.indexByFileIdWriter = HFile.getWriterFactory(metaClient.getHadoopConf(),
new CacheConfig(metaClient.getHadoopConf())).withPath(metaClient.getFs(), indexByFileIdPath)
.withFileContext(meta).withComparator(new HoodieKVComparator()).create();
.withFileContext(meta).create();
} catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe);
}
@@ -581,6 +580,6 @@ public String getName() {
* This class is explicitly used as Key Comparator to workaround hard coded
* legacy format class names inside HBase. Otherwise we will face issues with shading.
*/
public static class HoodieKVComparator extends KeyValue.KVComparator {
public static class HoodieKVComparator extends CellComparatorImpl {
}
}
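One more note on the comparator marker class: it still exists purely so that the comparator class name recorded in the HFile trailer is a Hudi-owned name that stays stable under shading, but it now extends CellComparatorImpl and is wired in through the HFileContext rather than the writer factory. In miniature (mirroring begin() above; this is a sketch, not additional code from the PR):

// Empty subclass: its only job is to put a Hudi class name, rather than an HBase one,
// into the HFile metadata, so shaded builds can still resolve the comparator.
public static class HoodieKVComparator extends CellComparatorImpl {
}

HFileContext meta = new HFileContextBuilder()
    .withCellComparator(new HoodieKVComparator())
    .build();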
@@ -50,6 +50,7 @@
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
@@ -424,6 +425,9 @@ private void processQueuedBlocksForInstant(Deque<HoodieLogBlock> logBlocks, int
processDataBlock((HoodieAvroDataBlock) lastBlock, keys);
break;
case HFILE_DATA_BLOCK:
if (!keys.isPresent()) {
keys = Option.of(Collections.emptyList());
}
processDataBlock((HoodieHFileDataBlock) lastBlock, keys);
break;
case DELETE_BLOCK:
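A brief note on the HFILE_DATA_BLOCK guard added above: an absent key Option is normalized to an empty list before the block is processed, presumably so the HFile data-block lookup always receives a concrete key list. The pattern in isolation (Option is org.apache.hudi.common.util.Option; keys and lastBlock come from the surrounding method):

if (!keys.isPresent()) {
  // Substitute an explicit empty list for an absent Option before the HFile block lookup.
  keys = Option.of(Collections.emptyList());
}
processDataBlock((HoodieHFileDataBlock) lastBlock, keys);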