Skip to content

Commit b8fe5b9

Browse files
jintaoguan and TeRS-K authored
[HUDI-764] [HUDI-765] ORC reader writer Implementation (apache#2999)
Co-authored-by: Qingyun (Teresa) Kang <[email protected]>
1 parent cb642ce commit b8fe5b9

File tree

29 files changed

+2268
-91
lines changed

29 files changed

+2268
-91
lines changed

LICENSE

+12
Original file line numberDiff line numberDiff line change
@@ -333,3 +333,15 @@ Copyright (c) 2005, European Commission project OneLab under contract 034819 (ht
333333

334334
Home page: https://commons.apache.org/proper/commons-lang/
335335
License: http://www.apache.org/licenses/LICENSE-2.0
336+
337+
-------------------------------------------------------------------------------
338+
339+
This product includes code from StreamSets Data Collector
340+
341+
* com.streamsets.pipeline.lib.util.avroorc.AvroToOrcRecordConverter copied and modified to org.apache.hudi.common.util.AvroOrcUtils
342+
* com.streamsets.pipeline.lib.util.avroorc.AvroToOrcSchemaConverter copied and modified to org.apache.hudi.common.util.AvroOrcUtils
343+
344+
Copyright 2018 StreamSets Inc.
345+
346+
Home page: https://github.com/streamsets/datacollector-oss
347+
License: http://www.apache.org/licenses/LICENSE-2.0

NOTICE

+12
Original file line numberDiff line numberDiff line change
@@ -147,3 +147,15 @@ its NOTICE file:
147147

148148
This product includes software developed at
149149
The Apache Software Foundation (http://www.apache.org/).
150+
151+
--------------------------------------------------------------------------------
152+
153+
This product includes code from StreamSets Data Collector, which includes the following in
154+
its NOTICE file:
155+
156+
StreamSets datacollector-oss
157+
Copyright 2018 StreamSets Inc.
158+
159+
This product includes software developed at
160+
StreamSets (http://www.streamsets.com/).
161+

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieStorageConfig.java

+42
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,21 @@ public class HoodieStorageConfig extends DefaultHoodieConfig {
3939
public static final String DEFAULT_PARQUET_BLOCK_SIZE_BYTES = DEFAULT_PARQUET_FILE_MAX_BYTES;
4040
public static final String PARQUET_PAGE_SIZE_BYTES = "hoodie.parquet.page.size";
4141
public static final String DEFAULT_PARQUET_PAGE_SIZE_BYTES = String.valueOf(1 * 1024 * 1024);
42+
4243
public static final String HFILE_FILE_MAX_BYTES = "hoodie.hfile.max.file.size";
4344
public static final String HFILE_BLOCK_SIZE_BYTES = "hoodie.hfile.block.size";
4445
public static final String DEFAULT_HFILE_BLOCK_SIZE_BYTES = String.valueOf(1 * 1024 * 1024);
4546
public static final String DEFAULT_HFILE_FILE_MAX_BYTES = String.valueOf(120 * 1024 * 1024);
47+
48+
public static final String ORC_FILE_MAX_BYTES = "hoodie.orc.max.file.size";
49+
public static final String DEFAULT_ORC_FILE_MAX_BYTES = String.valueOf(120 * 1024 * 1024);
50+
// size of the memory buffer in bytes for writing
51+
public static final String ORC_STRIPE_SIZE = "hoodie.orc.stripe.size";
52+
public static final String DEFAULT_ORC_STRIPE_SIZE = String.valueOf(64 * 1024 * 1024);
53+
// file system block size
54+
public static final String ORC_BLOCK_SIZE = "hoodie.orc.block.size";
55+
public static final String DEFAULT_ORC_BLOCK_SIZE = DEFAULT_ORC_FILE_MAX_BYTES;
56+
4657
// used to size log files
4758
public static final String LOGFILE_SIZE_MAX_BYTES = "hoodie.logfile.max.size";
4859
public static final String DEFAULT_LOGFILE_SIZE_MAX_BYTES = String.valueOf(1024 * 1024 * 1024); // 1 GB
@@ -54,9 +65,11 @@ public class HoodieStorageConfig extends DefaultHoodieConfig {
5465
public static final String DEFAULT_STREAM_COMPRESSION_RATIO = String.valueOf(0.1);
5566
public static final String PARQUET_COMPRESSION_CODEC = "hoodie.parquet.compression.codec";
5667
public static final String HFILE_COMPRESSION_ALGORITHM = "hoodie.hfile.compression.algorithm";
68+
public static final String ORC_COMPRESSION_CODEC = "hoodie.orc.compression.codec";
5769
// Default compression codec for parquet
5870
public static final String DEFAULT_PARQUET_COMPRESSION_CODEC = "gzip";
5971
public static final String DEFAULT_HFILE_COMPRESSION_ALGORITHM = "GZ";
72+
public static final String DEFAULT_ORC_COMPRESSION_CODEC = "ZLIB";
6073
public static final String LOGFILE_TO_PARQUET_COMPRESSION_RATIO = "hoodie.logfile.to.parquet.compression.ratio";
6174
// Default compression ratio for log file to parquet, general 3x
6275
public static final String DEFAULT_LOGFILE_TO_PARQUET_COMPRESSION_RATIO = String.valueOf(0.35);
@@ -140,6 +153,26 @@ public Builder logFileToParquetCompressionRatio(double logFileToParquetCompressi
140153
return this;
141154
}
142155

156+
public Builder orcMaxFileSize(long maxFileSize) {
157+
props.setProperty(ORC_FILE_MAX_BYTES, String.valueOf(maxFileSize));
158+
return this;
159+
}
160+
161+
public Builder orcStripeSize(int orcStripeSize) {
162+
props.setProperty(ORC_STRIPE_SIZE, String.valueOf(orcStripeSize));
163+
return this;
164+
}
165+
166+
public Builder orcBlockSize(int orcBlockSize) {
167+
props.setProperty(ORC_BLOCK_SIZE, String.valueOf(orcBlockSize));
168+
return this;
169+
}
170+
171+
public Builder orcCompressionCodec(String orcCompressionCodec) {
172+
props.setProperty(ORC_COMPRESSION_CODEC, orcCompressionCodec);
173+
return this;
174+
}
175+
143176
public HoodieStorageConfig build() {
144177
HoodieStorageConfig config = new HoodieStorageConfig(props);
145178
setDefaultOnCondition(props, !props.containsKey(PARQUET_FILE_MAX_BYTES), PARQUET_FILE_MAX_BYTES,
@@ -166,6 +199,15 @@ public HoodieStorageConfig build() {
166199
setDefaultOnCondition(props, !props.containsKey(HFILE_FILE_MAX_BYTES), HFILE_FILE_MAX_BYTES,
167200
DEFAULT_HFILE_FILE_MAX_BYTES);
168201

202+
setDefaultOnCondition(props, !props.containsKey(ORC_FILE_MAX_BYTES), ORC_FILE_MAX_BYTES,
203+
DEFAULT_ORC_FILE_MAX_BYTES);
204+
setDefaultOnCondition(props, !props.containsKey(ORC_STRIPE_SIZE), ORC_STRIPE_SIZE,
205+
DEFAULT_ORC_STRIPE_SIZE);
206+
setDefaultOnCondition(props, !props.containsKey(ORC_BLOCK_SIZE), ORC_BLOCK_SIZE,
207+
DEFAULT_ORC_BLOCK_SIZE);
208+
setDefaultOnCondition(props, !props.containsKey(ORC_COMPRESSION_CODEC), ORC_COMPRESSION_CODEC,
209+
DEFAULT_ORC_COMPRESSION_CODEC);
210+
169211
return config;
170212
}
171213
}

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java

+17
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite;
4343
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
4444
import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
45+
import org.apache.orc.CompressionKind;
4546
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
4647

4748
import javax.annotation.concurrent.Immutable;
@@ -784,6 +785,22 @@ public Compression.Algorithm getHFileCompressionAlgorithm() {
784785
return Compression.Algorithm.valueOf(props.getProperty(HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM));
785786
}
786787

788+
public long getOrcMaxFileSize() {
789+
return Long.parseLong(props.getProperty(HoodieStorageConfig.ORC_FILE_MAX_BYTES));
790+
}
791+
792+
public int getOrcStripeSize() {
793+
return Integer.parseInt(props.getProperty(HoodieStorageConfig.ORC_STRIPE_SIZE));
794+
}
795+
796+
public int getOrcBlockSize() {
797+
return Integer.parseInt(props.getProperty(HoodieStorageConfig.ORC_BLOCK_SIZE));
798+
}
799+
800+
public CompressionKind getOrcCompressionCodec() {
801+
return CompressionKind.valueOf(props.getProperty(HoodieStorageConfig.ORC_COMPRESSION_CODEC));
802+
}
803+
787804
/**
788805
* metrics properties.
789806
*/

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java

+10
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818

1919
package org.apache.hudi.io.storage;
2020

21+
import java.util.concurrent.atomic.AtomicLong;
22+
import org.apache.avro.generic.GenericRecord;
23+
import org.apache.hudi.avro.HoodieAvroUtils;
2124
import org.apache.hudi.common.model.HoodieRecord;
2225

2326
import org.apache.avro.generic.IndexedRecord;
@@ -35,4 +38,11 @@ public interface HoodieFileWriter<R extends IndexedRecord> {
3538
void writeAvro(String key, R oldRecord) throws IOException;
3639

3740
long getBytesWritten();
41+
42+
default void prepRecordWithMetadata(R avroRecord, HoodieRecord record, String instantTime, Integer partitionId, AtomicLong recordIndex, String fileName) {
43+
String seqId = HoodieRecord.generateSequenceId(instantTime, partitionId, recordIndex.getAndIncrement());
44+
HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord, record.getRecordKey(), record.getPartitionPath(), fileName);
45+
HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord, instantTime, seqId);
46+
return;
47+
}
3848
}

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java

+13
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434

3535
import java.io.IOException;
3636

37+
import static org.apache.hudi.common.model.HoodieFileFormat.ORC;
3738
import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
3839
import static org.apache.hudi.common.model.HoodieFileFormat.HFILE;
3940

@@ -49,6 +50,9 @@ public static <T extends HoodieRecordPayload, R extends IndexedRecord, I, K, O>
4950
if (HFILE.getFileExtension().equals(extension)) {
5051
return newHFileFileWriter(instantTime, path, config, schema, hoodieTable, taskContextSupplier);
5152
}
53+
if (ORC.getFileExtension().equals(extension)) {
54+
return newOrcFileWriter(instantTime, path, config, schema, hoodieTable, taskContextSupplier);
55+
}
5256
throw new UnsupportedOperationException(extension + " format not supported yet.");
5357
}
5458

@@ -77,6 +81,15 @@ private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFi
7781
return new HoodieHFileWriter<>(instantTime, path, hfileConfig, schema, taskContextSupplier);
7882
}
7983

84+
private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileWriter<R> newOrcFileWriter(
85+
String instantTime, Path path, HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable,
86+
TaskContextSupplier taskContextSupplier) throws IOException {
87+
BloomFilter filter = createBloomFilter(config);
88+
HoodieOrcConfig orcConfig = new HoodieOrcConfig(hoodieTable.getHadoopConf(), config.getOrcCompressionCodec(),
89+
config.getOrcStripeSize(), config.getOrcBlockSize(), config.getOrcMaxFileSize(), filter);
90+
return new HoodieOrcWriter<>(instantTime, path, orcConfig, schema, taskContextSupplier);
91+
}
92+
8093
private static BloomFilter createBloomFilter(HoodieWriteConfig config) {
8194
return BloomFilterFactory.createBloomFilter(config.getBloomFilterNumEntries(), config.getBloomFilterFPP(),
8295
config.getDynamicBloomFilterMaxNumEntries(),

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java

+3-7
Original file line numberDiff line numberDiff line change
@@ -99,13 +99,9 @@ public HoodieHFileWriter(String instantTime, Path file, HoodieHFileConfig hfileC
9999

100100
@Override
101101
public void writeAvroWithMetadata(R avroRecord, HoodieRecord record) throws IOException {
102-
String seqId =
103-
HoodieRecord.generateSequenceId(instantTime, taskContextSupplier.getPartitionIdSupplier().get(), recordIndex.getAndIncrement());
104-
HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord, record.getRecordKey(), record.getPartitionPath(),
105-
file.getName());
106-
HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord, instantTime, seqId);
107-
108-
writeAvro(record.getRecordKey(), (IndexedRecord)avroRecord);
102+
prepRecordWithMetadata(avroRecord, record, instantTime,
103+
taskContextSupplier.getPartitionIdSupplier().get(), recordIndex, file.getName());
104+
writeAvro(record.getRecordKey(), (IndexedRecord) avroRecord);
109105
}
110106

111107
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hudi.io.storage;
20+
21+
import org.apache.hadoop.conf.Configuration;
22+
import org.apache.hudi.common.bloom.BloomFilter;
23+
import org.apache.orc.CompressionKind;
24+
25+
public class HoodieOrcConfig {
26+
static final String AVRO_SCHEMA_METADATA_KEY = "orc.avro.schema";
27+
28+
private final CompressionKind compressionKind;
29+
private final int stripeSize;
30+
private final int blockSize;
31+
private final long maxFileSize;
32+
private final Configuration hadoopConf;
33+
private final BloomFilter bloomFilter;
34+
35+
public HoodieOrcConfig(Configuration hadoopConf, CompressionKind compressionKind, int stripeSize,
36+
int blockSize, long maxFileSize, BloomFilter bloomFilter) {
37+
this.hadoopConf = hadoopConf;
38+
this.compressionKind = compressionKind;
39+
this.stripeSize = stripeSize;
40+
this.blockSize = blockSize;
41+
this.maxFileSize = maxFileSize;
42+
this.bloomFilter = bloomFilter;
43+
}
44+
45+
public Configuration getHadoopConf() {
46+
return hadoopConf;
47+
}
48+
49+
public CompressionKind getCompressionKind() {
50+
return compressionKind;
51+
}
52+
53+
public int getStripeSize() {
54+
return stripeSize;
55+
}
56+
57+
public int getBlockSize() {
58+
return blockSize;
59+
}
60+
61+
public long getMaxFileSize() {
62+
return maxFileSize;
63+
}
64+
65+
public boolean useBloomFilter() {
66+
return bloomFilter != null;
67+
}
68+
69+
public BloomFilter getBloomFilter() {
70+
return bloomFilter;
71+
}
72+
}

0 commit comments

Comments
 (0)