hudi-client/pom.xml (11 additions & 1 deletion)
@@ -100,6 +100,16 @@
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.orc</groupId>
<artifactId>orc-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.orc</groupId>
<artifactId>orc-mapreduce</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
@@ -256,7 +266,7 @@
<groupId>${hive.groupid}</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
<scope>test</scope>
<!-- <scope>test</scope> -->
<classifier>${hive.exec.classifier}</classifier>
</dependency>
<dependency>
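These exclusions presumably keep the orc-core and orc-mapreduce artifacts that spark-sql pulls in transitively from clashing with the ORC classes that now arrive via hive-exec once its test scope is commented out. A minimal sketch, assuming the standard org.apache.orc.OrcFile entry point, for checking which jar actually serves the ORC API on the resulting classpath; the class below is illustrative and not part of this change:

// Sketch: print the jar that provides org.apache.orc.OrcFile at runtime (illustrative, not from this PR).
public class OrcClasspathCheck {
  public static void main(String[] args) throws ClassNotFoundException {
    Class<?> orcFile = Class.forName("org.apache.orc.OrcFile");
    // getCodeSource() may be null for bootstrap classes; good enough for a quick sanity check.
    System.out.println(orcFile.getProtectionDomain().getCodeSource().getLocation());
  }
}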
HoodieWriteConfig.java
@@ -19,6 +19,7 @@
package org.apache.hudi.config;

import org.apache.hadoop.hbase.io.compress.Compression;

import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.bootstrap.BootstrapMode;
@@ -28,6 +29,7 @@
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metrics.MetricsReporterType;
@@ -116,13 +118,25 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
public static final String MAX_CONSISTENCY_CHECKS_PROP = "hoodie.consistency.check.max_checks";
public static int DEFAULT_MAX_CONSISTENCY_CHECKS = 7;

public static final String ORC_STRIPE_SIZE = "hoodie.hive.exec.orc.default.stripe.size";
public static final String DEFAULT_ORC_STRIPE_SIZE = String.valueOf(67108864);
public static final String ORC_BLOCK_SIZE = "hoodie.hive.exec.orc.default.block.size";
public static final String DEFAULT_ORC_BLOCK_SIZE = String.valueOf(268435456);
public static final String ORC_COLUMNS = "hoodie.orc.columns";
public static final String ORC_COLUMNS_TYPES = "hoodie.orc.columns.types";
public static final String ORC_BLOOM_FILTER_COLUMNS = "hoodie.orc.bloom.filter.columns";
public static final String ORC_BLOOM_FILTER_FPP = "hoodie.orc.bloom.filter.fpp";
public static final String DEFAULT_ORC_BLOOM_FILTER_FPP = String.valueOf(0.05);

/**
* HUDI-858 : There are users who had been directly using RDD APIs and have relied on a behavior in 0.4.x to allow
* multiple write operations (upsert/bulk-insert/...) to be executed within a single commit.
*
* <p>
* Given Hudi's commit protocol, these are generally unsafe operations and users need to handle failure scenarios. It
* only works with COW tables. Hudi 0.5.x stopped this behavior.
*
* <p>
* Given the importance of supporting such cases for the user's migration to 0.5.x, we are proposing a safety flag
* (disabled by default) which will allow this old behavior.
*/
@@ -571,6 +585,35 @@ public Compression.Algorithm getHFileCompressionAlgorithm() {
return Compression.Algorithm.valueOf(props.getProperty(HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM));
}

/**
* ORC configs.
*/
public long getOrcStripeSize() {
return Long.parseLong(props.getProperty(ORC_STRIPE_SIZE));
}

public long getOrcBlockSize() {
return Long.parseLong(props.getProperty(ORC_BLOCK_SIZE));
}

public String getOrcColumns() {
return props.getProperty(ORC_COLUMNS);
}

public String getOrcColumnsTypes() {
return props.getProperty(ORC_COLUMNS_TYPES);
}

public String getOrcBloomFilterColumns() {
return props.getProperty(ORC_BLOOM_FILTER_COLUMNS);
}

public String getOrcBloomFilterFpp() {
return props.getProperty(ORC_BLOOM_FILTER_FPP);
}

/**
* metrics properties.
*/
@@ -672,7 +715,7 @@ public String getPushGatewayJobName() {
public boolean getPushGatewayRandomJobNameSuffix() {
return Boolean.parseBoolean(props.getProperty(HoodieMetricsPrometheusConfig.PUSHGATEWAY_RANDOM_JOB_NAME_SUFFIX));
}

/**
* memory configs.
*/
@@ -965,11 +1008,44 @@ public Builder withExternalSchemaTrasformation(boolean enabled) {
return this;
}

public Builder withProperties(Properties properties) {
this.props.putAll(properties);
return this;
}

/**
* ORC builder methods.
*/
public Builder orcStripeSize(long stripeSize) {
props.setProperty(ORC_STRIPE_SIZE, String.valueOf(stripeSize));
return this;
}

public Builder orcBlockSize(long blockSize) {
props.setProperty(ORC_BLOCK_SIZE, String.valueOf(blockSize));
return this;
}

public Builder orcColumns(String columns) {
props.setProperty(ORC_COLUMNS, columns);
return this;
}

public Builder orcColumnsTypes(String columnsTypes) {
props.setProperty(ORC_COLUMNS_TYPES, columnsTypes);
return this;
}

public Builder orcBloomFilterColumns(String bloomFilterColumns) {
props.setProperty(ORC_BLOOM_FILTER_COLUMNS, bloomFilterColumns);
return this;
}

public Builder orcBloomFilterFpp(String bloomFilterFpp) {
props.setProperty(ORC_BLOOM_FILTER_FPP, bloomFilterFpp);
return this;
}

protected void setDefaults() {
// Check for mandatory properties
setDefaultOnCondition(props, !props.containsKey(INSERT_PARALLELISM), INSERT_PARALLELISM, DEFAULT_PARALLELISM);
@@ -1035,6 +1111,14 @@ protected void setDefaults() {
EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION, DEFAULT_EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION);
setDefaultOnCondition(props, !props.containsKey(TIMELINE_LAYOUT_VERSION), TIMELINE_LAYOUT_VERSION,
String.valueOf(TimelineLayoutVersion.CURR_VERSION));

setDefaultOnCondition(props, !props.containsKey(ORC_STRIPE_SIZE),
ORC_STRIPE_SIZE, DEFAULT_ORC_STRIPE_SIZE);
setDefaultOnCondition(props, !props.containsKey(ORC_BLOCK_SIZE),
ORC_BLOCK_SIZE, DEFAULT_ORC_BLOCK_SIZE);
setDefaultOnCondition(props, !props.containsKey(ORC_BLOOM_FILTER_FPP),
ORC_BLOOM_FILTER_FPP, DEFAULT_ORC_BLOOM_FILTER_FPP);

}

private void validate() {
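Putting the HoodieWriteConfig pieces together, a minimal sketch of how the new ORC options might be set through the builder; the base path, schema, and key column below are illustrative assumptions, while the orc* methods and the default values mirror this diff:

// Sketch: wiring the new ORC knobs into a write config (illustrative values, defaults noted in comments).
import org.apache.hudi.config.HoodieWriteConfig;

public class OrcWriteConfigExample {
  public static void main(String[] args) {
    // Hypothetical single-field Avro schema, only used to illustrate the builder call.
    String schema = "{\"type\":\"record\",\"name\":\"rec\",\"fields\":[{\"name\":\"uuid\",\"type\":\"string\"}]}";
    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie/sample_table")   // hypothetical base path
        .withSchema(schema)
        .orcStripeSize(64L * 1024 * 1024)       // 67108864 = DEFAULT_ORC_STRIPE_SIZE
        .orcBlockSize(256L * 1024 * 1024)       // 268435456 = DEFAULT_ORC_BLOCK_SIZE
        .orcBloomFilterColumns("uuid")          // hypothetical record key column
        .orcBloomFilterFpp("0.05")              // DEFAULT_ORC_BLOOM_FILTER_FPP
        .build();
    System.out.println(config.getOrcStripeSize()); // 67108864
  }
}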
HoodieFileWriterFactory.java
@@ -34,8 +34,7 @@

import java.io.IOException;

import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
import static org.apache.hudi.common.model.HoodieFileFormat.HFILE;
import static org.apache.hudi.common.model.HoodieFileFormat.ORC;

public class HoodieFileWriterFactory {

@@ -49,6 +48,9 @@ public static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFil
if (HFILE.getFileExtension().equals(extension)) {
return newHFileFileWriter(instantTime, path, config, schema, hoodieTable, sparkTaskContextSupplier);
}
if (ORC.getFileExtension().equals(extension)) {
return newOrcFileWriter(instantTime, path, config, schema, hoodieTable, sparkTaskContextSupplier);
}
throw new UnsupportedOperationException(extension + " format not supported yet.");
}

@@ -77,9 +79,16 @@ private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFi
return new HoodieHFileWriter<>(instantTime, path, hfileConfig, schema, sparkTaskContextSupplier);
}

private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileWriter<R> newOrcFileWriter(
String instantTime, Path path, HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable,
SparkTaskContextSupplier sparkTaskContextSupplier) throws IOException {

return new HoodieOrcWriter<>(instantTime, path, config, hoodieTable.getHadoopConf(), sparkTaskContextSupplier);
}

private static BloomFilter createBloomFilter(HoodieWriteConfig config) {
return BloomFilterFactory.createBloomFilter(config.getBloomFilterNumEntries(), config.getBloomFilterFPP(),
config.getDynamicBloomFilterMaxNumEntries(),
config.getBloomFilterType());
}
}
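The new branch above routes any base file whose extension matches ORC to HoodieOrcWriter. A standalone sketch of that extension dispatch, with the Hudi types reduced to plain strings so it runs on its own; only the ".orc" routing idea comes from the diff, the file name and writer labels are illustrative:

// Sketch: choose a writer implementation from the base file extension, mirroring the factory's dispatch.
public class WriterDispatchSketch {

  static String pickWriter(String fileName) {
    String extension = fileName.substring(fileName.lastIndexOf('.')); // e.g. ".orc"
    if (".parquet".equals(extension)) {
      return "HoodieParquetWriter";
    }
    if (".hfile".equals(extension)) {
      return "HoodieHFileWriter";
    }
    if (".orc".equals(extension)) {
      return "HoodieOrcWriter";
    }
    throw new UnsupportedOperationException(extension + " format not supported yet.");
  }

  public static void main(String[] args) {
    System.out.println(pickWriter("some-file-group_20200916.orc")); // prints HoodieOrcWriter
  }
}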