4 changes: 2 additions & 2 deletions docker/compose/docker-compose_hadoop284_hive233_spark244.yml
@@ -86,7 +86,7 @@ services:
container_name: hive-metastore-postgresql

hivemetastore:
image: apachehudi/hudi-hadoop_2.8.4-hive_2.3.3:latest
image: yihua/hudi-hadoop_2.8.4-hive_2.3.3:latest
hostname: hivemetastore
container_name: hivemetastore
links:
@@ -109,7 +109,7 @@ services:
- "namenode"

hiveserver:
image: apachehudi/hudi-hadoop_2.8.4-hive_2.3.3:latest
image: yihua/hudi-hadoop_2.8.4-hive_2.3.3:latest
hostname: hiveserver
container_name: hiveserver
env_file:
3 changes: 3 additions & 0 deletions docker/hoodie/hadoop/hive_base/Dockerfile
@@ -36,6 +36,9 @@ RUN echo "Hive URL is :${HIVE_URL}" && wget ${HIVE_URL} -O hive.tar.gz && \
wget https://jdbc.postgresql.org/download/postgresql-9.4.1212.jar -O $HIVE_HOME/lib/postgresql-jdbc.jar && \
rm hive.tar.gz && mkdir -p /var/hoodie/ws/docker/hoodie/hadoop/hive_base/target/

RUN rm hive/lib/hbase*
RUN rm hive/lib/commons-io-2.4.jar

#Spark should be compiled with Hive to be able to use it
#hive-site.xml should be copied to $SPARK_HOME/conf folder

19 changes: 19 additions & 0 deletions hudi-aws/pom.xml
@@ -48,6 +48,25 @@
</dependency>

<!-- Hadoop -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet.jsp</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
HoodieHFileConfig.java
@@ -21,14 +21,14 @@
import org.apache.hudi.common.bloom.BloomFilter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;

public class HoodieHFileConfig {

public static final KeyValue.KVComparator HFILE_COMPARATOR = new HoodieHBaseKVComparator();
public static final CellComparator HFILE_COMPARATOR = new HoodieHBaseKVComparator();
public static final boolean PREFETCH_ON_OPEN = CacheConfig.DEFAULT_PREFETCH_ON_OPEN;
public static final boolean CACHE_DATA_IN_L1 = HColumnDescriptor.DEFAULT_CACHE_DATA_IN_L1;
// This is private in CacheConfig so have been copied here.
@@ -42,12 +42,12 @@ public class HoodieHFileConfig {
private final boolean dropBehindCacheCompaction;
private final Configuration hadoopConf;
private final BloomFilter bloomFilter;
private final KeyValue.KVComparator hfileComparator;
private final CellComparator hfileComparator;
private final String keyFieldName;

public HoodieHFileConfig(Configuration hadoopConf, Compression.Algorithm compressionAlgorithm, int blockSize,
long maxFileSize, String keyFieldName, boolean prefetchBlocksOnOpen, boolean cacheDataInL1,
boolean dropBehindCacheCompaction, BloomFilter bloomFilter, KeyValue.KVComparator hfileComparator) {
boolean dropBehindCacheCompaction, BloomFilter bloomFilter, CellComparator hfileComparator) {
this.hadoopConf = hadoopConf;
this.compressionAlgorithm = compressionAlgorithm;
this.blockSize = blockSize;
@@ -96,7 +96,7 @@ public BloomFilter getBloomFilter() {
return bloomFilter;
}

public KeyValue.KVComparator getHfileComparator() {
public CellComparator getHFileComparator() {
return hfileComparator;
}

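Note: KeyValue.KVComparator no longer exists in HBase 2.x; CellComparator, with CellComparatorImpl as its base implementation, replaces it, which is what drives the type changes in HoodieHFileConfig above. A minimal sketch of a custom comparator against the 2.x API (the class name is illustrative, not part of this change):

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellComparatorImpl;

    public class ExampleCellComparator extends CellComparatorImpl {
      @Override
      public int compare(Cell a, Cell b) {
        // Inherit the default row/family/qualifier/timestamp ordering.
        return super.compare(a, b);
      }
    }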
HoodieHFileWriter.java
@@ -25,6 +25,8 @@
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
@@ -38,8 +40,6 @@
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.io.Writable;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;

import java.io.DataInput;
import java.io.DataOutput;
@@ -95,6 +95,7 @@ public HoodieHFileWriter(String instantTime, Path file, HoodieHFileConfig hfileC

HFileContext context = new HFileContextBuilder().withBlockSize(hfileConfig.getBlockSize())
.withCompression(hfileConfig.getCompressionAlgorithm())
.withCellComparator(hfileConfig.getHFileComparator())
.build();

conf.set(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, String.valueOf(hfileConfig.shouldPrefetchBlocksOnOpen()));
@@ -104,7 +105,6 @@ public HoodieHFileWriter(String instantTime, Path file, HoodieHFileConfig hfileC
this.writer = HFile.getWriterFactory(conf, cacheConfig)
.withPath(this.fs, this.file)
.withFileContext(context)
.withComparator(hfileConfig.getHfileComparator())
.create();

writer.appendFileInfo(HoodieHFileReader.KEY_SCHEMA.getBytes(), schema.toString().getBytes());
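Note on the writer change: in the HBase line this PR targets, the comparator travels with the HFileContext rather than the writer factory — withComparator(...) is gone from HFile.WriterFactory, hence the move to withCellComparator(...) above. A self-contained sketch of the 2.x write setup (a hedged example, not code from this PR):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.CellComparatorImpl;
    import org.apache.hadoop.hbase.io.compress.Compression;
    import org.apache.hadoop.hbase.io.hfile.CacheConfig;
    import org.apache.hadoop.hbase.io.hfile.HFile;
    import org.apache.hadoop.hbase.io.hfile.HFileContext;
    import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;

    public class HFileWriteSketch {
      public static HFile.Writer openWriter(Configuration conf, FileSystem fs, Path path)
          throws IOException {
        // The comparator is now part of the file context.
        HFileContext context = new HFileContextBuilder()
            .withBlockSize(1024 * 1024)
            .withCompression(Compression.Algorithm.GZ)
            .withCellComparator(CellComparatorImpl.COMPARATOR)
            .build();
        return HFile.getWriterFactory(conf, new CacheConfig(conf))
            .withPath(fs, path)
            .withFileContext(context) // no withComparator(...) on the factory anymore
            .create();
      }
    }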
20 changes: 20 additions & 0 deletions hudi-client/hudi-java-client/pom.xml
@@ -122,6 +122,26 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<scope>test</scope>
<!-- Need these exclusions to make sure JavaSparkContext can be setup. https://issues.apache.org/jira/browse/SPARK-1693 -->
<exclusions>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet.jsp</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
12 changes: 12 additions & 0 deletions hudi-client/hudi-spark-client/pom.xml
@@ -90,6 +90,12 @@
</dependency>

<!-- HBase - Tests -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
<version>${hadoop.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-testing-util</artifactId>
@@ -110,6 +116,12 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${zookeeper.version}</version>
<scope>test</scope>
</dependency>

<!-- Hive - Tests -->
<dependency>
@@ -66,6 +66,8 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hudi.io.storage.HoodieHFileReader.KEY_SCHEMA;

/**
* Utility methods to aid testing inside the HoodieClient module.
*/
@@ -241,9 +243,9 @@ public static Stream<GenericRecord> readHFile(JavaSparkContext jsc, String[] pat
Schema schema = null;
for (String path : paths) {
try {
HFile.Reader reader = HFile.createReader(fs, new Path(path), cacheConfig, fs.getConf());
HFile.Reader reader = HFile.createReader(fs, new Path(path), cacheConfig, true, fs.getConf());
if (schema == null) {
schema = new Schema.Parser().parse(new String(reader.loadFileInfo().get("schema".getBytes())));
schema = new Schema.Parser().parse(new String(reader.getHFileInfo().get(KEY_SCHEMA.getBytes())));
}
HFileScanner scanner = reader.getScanner(false, false);
if (!scanner.seekTo()) {
@@ -252,7 +254,7 @@ public static Stream<GenericRecord> readHFile(JavaSparkContext jsc, String[] pat
}

do {
Cell c = scanner.getKeyValue();
Cell c = scanner.getCell();
byte[] value = Arrays.copyOfRange(c.getValueArray(), c.getValueOffset(), c.getValueOffset() + c.getValueLength());
valuesAsList.add(HoodieAvroUtils.bytesToAvro(value, schema));
} while (scanner.next());
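The three read-side renames in this hunk are all part of the same HBase 1.x → 2.x migration: createReader(...) gained a primaryReplicaReader boolean, loadFileInfo() became getHFileInfo(), and HFileScanner.getKeyValue() became getCell(). A sketch of the full 2.x read loop, assuming an existing FileSystem, Configuration, and Path (the "schema" file-info key mirrors KEY_SCHEMA above):

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.io.hfile.CacheConfig;
    import org.apache.hadoop.hbase.io.hfile.HFile;
    import org.apache.hadoop.hbase.io.hfile.HFileScanner;

    public class HFileReadSketch {
      public static void readAll(FileSystem fs, Path path, Configuration conf) throws IOException {
        // 'true' is the primaryReplicaReader flag added in the 2.x signature.
        HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), true, conf);
        // getHFileInfo() replaces the old loadFileInfo().
        byte[] schemaBytes = reader.getHFileInfo().get("schema".getBytes(StandardCharsets.UTF_8));
        HFileScanner scanner = reader.getScanner(false, false);
        if (scanner.seekTo()) {
          do {
            Cell c = scanner.getCell(); // getCell() replaces getKeyValue()
            byte[] value = Arrays.copyOfRange(
                c.getValueArray(), c.getValueOffset(), c.getValueOffset() + c.getValueLength());
            // decode 'value' (e.g. Avro bytes) as needed
          } while (scanner.next());
        }
        reader.close();
      }
    }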
3 changes: 1 addition & 2 deletions hudi-common/pom.xml
@@ -221,14 +221,13 @@
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
<!-- Unfortunately, HFile is packaged ONLY under hbase-server -->
<!-- Unfortunately, HFile is packaged ONLY under hbase-server -->
<scope>compile</scope>
<exclusions>
<exclusion>
@@ -37,6 +37,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
@@ -178,9 +179,7 @@ private static String getUserKeyFromCellKey(String cellKey) {
private static HFile.Reader createReader(String hFilePath, Configuration conf, FileSystem fileSystem) {
try {
LOG.info("Opening HFile for reading :" + hFilePath);
HFile.Reader reader = HFile.createReader(fileSystem, new HFilePathForReader(hFilePath),
new CacheConfig(conf), conf);
return reader;
return HFile.createReader(fileSystem, new HFilePathForReader(hFilePath), new CacheConfig(conf), true, conf);
} catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe);
}
@@ -259,7 +258,7 @@ private void initIndexInfo() {

private HoodieBootstrapIndexInfo fetchBootstrapIndexInfo() throws IOException {
return TimelineMetadataUtils.deserializeAvroMetadata(
partitionIndexReader().loadFileInfo().get(INDEX_INFO_KEY),
partitionIndexReader().getHFileInfo().get(INDEX_INFO_KEY),
HoodieBootstrapIndexInfo.class);
}

@@ -306,7 +305,7 @@ private <T> List<T> getAllKeys(HFileScanner scanner, Function<String, T> convert
try {
boolean available = scanner.seekTo();
while (available) {
keys.add(converter.apply(getUserKeyFromCellKey(CellUtil.getCellKeyAsString(scanner.getKeyValue()))));
keys.add(converter.apply(getUserKeyFromCellKey(CellUtil.getCellKeyAsString(scanner.getCell()))));
available = scanner.next();
}
} catch (IOException ioe) {
@@ -528,13 +527,13 @@ public void close() {
@Override
public void begin() {
try {
HFileContext meta = new HFileContextBuilder().build();
HFileContext meta = new HFileContextBuilder().withCellComparator(new HoodieKVComparator()).build();
this.indexByPartitionWriter = HFile.getWriterFactory(metaClient.getHadoopConf(),
new CacheConfig(metaClient.getHadoopConf())).withPath(metaClient.getFs(), indexByPartitionPath)
.withFileContext(meta).withComparator(new HoodieKVComparator()).create();
.withFileContext(meta).create();
this.indexByFileIdWriter = HFile.getWriterFactory(metaClient.getHadoopConf(),
new CacheConfig(metaClient.getHadoopConf())).withPath(metaClient.getFs(), indexByFileIdPath)
.withFileContext(meta).withComparator(new HoodieKVComparator()).create();
.withFileContext(meta).create();
} catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe);
}
@@ -581,6 +580,6 @@ public String getName() {
* This class is explicitly used as Key Comparator to workaround hard coded
* legacy format class names inside HBase. Otherwise we will face issues with shading.
*/
public static class HoodieKVComparator extends KeyValue.KVComparator {
public static class HoodieKVComparator extends CellComparatorImpl {
}
}
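Same API migration pattern in this file (getCell() for getKeyValue(), getHFileInfo() for loadFileInfo(), the comparator moving into the HFileContext). The HoodieKVComparator change at the bottom matters because HBase writes the comparator's class name into each HFile it produces; keeping a Hudi-owned subclass (now of CellComparatorImpl) avoids baking a shaded HBase class name into bootstrap index files. For reference, a sketch of the key-listing idiom getAllKeys uses, assuming an already-open scanner:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.io.hfile.HFileScanner;

    public class HFileKeyListingSketch {
      public static List<String> allKeys(HFileScanner scanner) throws IOException {
        List<String> keys = new ArrayList<>();
        boolean available = scanner.seekTo(); // false when the file has no cells
        while (available) {
          // getCellKeyAsString renders row/family/qualifier/timestamp as one string
          keys.add(CellUtil.getCellKeyAsString(scanner.getCell()));
          available = scanner.next();
        }
        return keys;
      }
    }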
@@ -52,6 +52,7 @@
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
@@ -427,6 +428,9 @@ private void processQueuedBlocksForInstant(Deque<HoodieLogBlock> logBlocks, int
processDataBlock((HoodieAvroDataBlock) lastBlock, keys);
break;
case HFILE_DATA_BLOCK:
if (!keys.isPresent()) {
keys = Option.of(Collections.emptyList());
}
processDataBlock((HoodieHFileDataBlock) lastBlock, keys);
break;
case PARQUET_DATA_BLOCK:
HoodieHFileDataBlock.java
@@ -18,6 +18,16 @@

package org.apache.hudi.common.table.log.block;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.fs.inline.InLineFSUtils;
import org.apache.hudi.common.fs.inline.InLineFileSystem;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.HoodieHBaseKVComparator;
import org.apache.hudi.io.storage.HoodieHFileReader;

import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
@@ -31,15 +41,6 @@
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.fs.inline.InLineFSUtils;
import org.apache.hudi.common.fs.inline.InLineFileSystem;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.HoodieHBaseKVComparator;
import org.apache.hudi.io.storage.HoodieHFileReader;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@@ -95,6 +96,7 @@ protected byte[] serializeRecords(List<IndexedRecord> records) throws IOExceptio
HFileContext context = new HFileContextBuilder()
.withBlockSize(DEFAULT_BLOCK_SIZE)
.withCompression(compressionAlgorithm.get())
.withCellComparator(new HoodieHBaseKVComparator())
.build();

Configuration conf = new Configuration();
@@ -128,7 +130,7 @@ protected byte[] serializeRecords(List<IndexedRecord> records) throws IOExceptio
}

HFile.Writer writer = HFile.getWriterFactory(conf, cacheConfig)
.withOutputStream(ostream).withFileContext(context).withComparator(new HoodieHBaseKVComparator()).create();
.withOutputStream(ostream).withFileContext(context).create();

// Write the records
sortedRecordsMap.forEach((recordKey, recordBytes) -> {
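One constraint implicit in sortedRecordsMap above: HFile.Writer#append rejects cells that arrive out of comparator order, so records must be sorted by key before writing. A hedged sketch of such an append loop — the key-to-bytes map is assumed, and the empty family/qualifier arrays follow the minimal-KeyValue construction used for HFile-backed blocks:

    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.util.Map;
    import java.util.TreeMap;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.io.hfile.HFile;

    public class HFileAppendSketch {
      public static void writeSorted(HFile.Writer writer, Map<String, byte[]> records) {
        // TreeMap orders keys lexicographically, matching the default cell
        // ordering for cells that differ only by row key.
        TreeMap<String, byte[]> sorted = new TreeMap<>(records);
        sorted.forEach((recordKey, recordBytes) -> {
          try {
            KeyValue kv = new KeyValue(recordKey.getBytes(), new byte[0], new byte[0], recordBytes);
            writer.append(kv);
          } catch (IOException e) {
            throw new UncheckedIOException(e);
          }
        });
      }
    }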
HoodieHBaseKVComparator.java
@@ -19,11 +19,11 @@

package org.apache.hudi.io.storage;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.CellComparatorImpl;

/**
* This class is explicitly used as Key Comparator to work around the hard coded
* legacy format class names inside HBase. Otherwise, we will face issues with shading.
*/
public class HoodieHBaseKVComparator extends KeyValue.KVComparator {
public class HoodieHBaseKVComparator extends CellComparatorImpl {
}