diff --git a/hudi-client/pom.xml b/hudi-client/pom.xml index 1a051759b5bfb..c06b306047986 100644 --- a/hudi-client/pom.xml +++ b/hudi-client/pom.xml @@ -100,6 +100,16 @@ org.apache.spark spark-sql_${scala.binary.version} + + + org.apache.orc + orc-core + + + org.apache.orc + orc-mapreduce + + org.apache.spark @@ -256,7 +266,7 @@ ${hive.groupid} hive-exec ${hive.version} - test + ${hive.exec.classifier} diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 089474d15c2e8..cb5c9ab9713a0 100644 --- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -19,6 +19,7 @@ package org.apache.hudi.config; import org.apache.hadoop.hbase.io.compress.Compression; + import org.apache.hudi.client.HoodieWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.bootstrap.BootstrapMode; @@ -28,6 +29,7 @@ import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.metrics.MetricsReporterType; @@ -116,13 +118,25 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { public static final String MAX_CONSISTENCY_CHECKS_PROP = "hoodie.consistency.check.max_checks"; public static int DEFAULT_MAX_CONSISTENCY_CHECKS = 7; + + public static final String ORC_STRIPE_SIZE = "hoodie.hive.exec.orc.default.stripe.size"; + + public static final String DEFAULT_ORC_STRIPE_SIZE = String.valueOf(67108864); + public static final String ORC_BLOCK_SIZE = "hoodie.hive.exec.orc.default.block.size"; + public static final String DEFAULT_ORC_BLOCK_SIZE = String.valueOf(268435456); + public static final String ORC_COLUMNS = "hoodie.orc.columns"; + public static final String ORC_COLUMNS_TYPES = "hoodie.orc.columns.types"; + public static final String ORC_BLOOM_FILTER_COLUMNS = "hoodie.orc.bloom.filter.columns"; + public static final String ORC_BLOOM_FILTER_FPP = "hoodie.orc.bloom.filter.fpp"; + public static final String DEFAULT_ORC_BLOOM_FILTER_FPP = String.valueOf(0.05); + /** * HUDI-858 : There are users who had been directly using RDD APIs and have relied on a behavior in 0.4.x to allow * multiple write operations (upsert/buk-insert/...) to be executed within a single commit. - * + *

* Given Hudi commit protocol, these are generally unsafe operations and users need to handle failure scenarios. It * only works with COW tables. Hudi 0.5.x stopped this behavior. - * + *

* Given the importance of supporting such cases for the user's migration to 0.5.x, we are proposing a safety flag * (disabled by default) which will allow this old behavior. */ @@ -571,6 +585,35 @@ public Compression.Algorithm getHFileCompressionAlgorithm() { return Compression.Algorithm.valueOf(props.getProperty(HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM)); } + /** + * ORC configs . + */ + + // orc + public long getOrcStripeSize() { + return Long.valueOf(props.getProperty(ORC_STRIPE_SIZE)); + } + + public long getOrcBlockSize() { + return Long.valueOf(props.getProperty(ORC_BLOCK_SIZE)); + } + + public String getOrcColumns() { + return props.getProperty(ORC_COLUMNS); + } + + public String getOrcColumnsTypes() { + return props.getProperty(ORC_COLUMNS_TYPES); + } + + public String getOrcBloomFilterColumns() { + return props.getProperty(ORC_BLOOM_FILTER_COLUMNS); + } + + public String getOrcBloomFilterFpp() { + return props.getProperty(ORC_BLOOM_FILTER_FPP); + } + /** * metrics properties. */ @@ -672,7 +715,7 @@ public String getPushGatewayJobName() { public boolean getPushGatewayRandomJobNameSuffix() { return Boolean.parseBoolean(props.getProperty(HoodieMetricsPrometheusConfig.PUSHGATEWAY_RANDOM_JOB_NAME_SUFFIX)); } - + /** * memory configs. */ @@ -965,11 +1008,44 @@ public Builder withExternalSchemaTrasformation(boolean enabled) { return this; } + /** + * ORC Builder Methods .. + */ public Builder withProperties(Properties properties) { this.props.putAll(properties); return this; } + public Builder orcStripeSize(long stripeSize) { + props.setProperty(ORC_STRIPE_SIZE, String.valueOf(stripeSize)); + return this; + } + + public Builder orcBlockSize(long blockSize) { + props.setProperty(ORC_BLOCK_SIZE, String.valueOf(blockSize)); + return this; + } + + public Builder orcColumns(String columns) { + props.setProperty(ORC_COLUMNS, columns); + return this; + } + + public Builder orcColumnsTypes(String columnsTypes) { + props.setProperty(ORC_COLUMNS_TYPES, columnsTypes); + return this; + } + + public Builder orcBloomFilterColumns(String bloomFilterColumns) { + props.setProperty(ORC_BLOOM_FILTER_COLUMNS, bloomFilterColumns); + return this; + } + + public Builder orcBloomFilterFpp(String bloomFilterFpp) { + props.setProperty(ORC_BLOOM_FILTER_FPP, bloomFilterFpp); + return this; + } + protected void setDefaults() { // Check for mandatory properties setDefaultOnCondition(props, !props.containsKey(INSERT_PARALLELISM), INSERT_PARALLELISM, DEFAULT_PARALLELISM); @@ -1035,6 +1111,14 @@ protected void setDefaults() { EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION, DEFAULT_EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION); setDefaultOnCondition(props, !props.containsKey(TIMELINE_LAYOUT_VERSION), TIMELINE_LAYOUT_VERSION, String.valueOf(TimelineLayoutVersion.CURR_VERSION)); + + setDefaultOnCondition(props, !props.containsKey(ORC_STRIPE_SIZE), + ORC_STRIPE_SIZE, DEFAULT_ORC_STRIPE_SIZE); + setDefaultOnCondition(props, !props.containsKey(ORC_BLOCK_SIZE), + ORC_BLOCK_SIZE, DEFAULT_ORC_BLOCK_SIZE); + setDefaultOnCondition(props, !props.containsKey(ORC_BLOOM_FILTER_FPP), + ORC_BLOOM_FILTER_FPP, DEFAULT_ORC_BLOOM_FILTER_FPP); + } private void validate() { diff --git a/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java b/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java index 1d4a9a28e54f9..d3bf0b774bf65 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java +++ 
b/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java @@ -34,8 +34,7 @@ import java.io.IOException; -import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET; -import static org.apache.hudi.common.model.HoodieFileFormat.HFILE; +import static org.apache.hudi.common.model.HoodieFileFormat.*; public class HoodieFileWriterFactory { @@ -49,6 +48,9 @@ public static HoodieFil if (HFILE.getFileExtension().equals(extension)) { return newHFileFileWriter(instantTime, path, config, schema, hoodieTable, sparkTaskContextSupplier); } + if (ORC.getFileExtension().equals(extension)) { + return newOrcFileWriter(instantTime, path, config, schema, hoodieTable, sparkTaskContextSupplier); + } throw new UnsupportedOperationException(extension + " format not supported yet."); } @@ -77,9 +79,16 @@ private static HoodieFi return new HoodieHFileWriter<>(instantTime, path, hfileConfig, schema, sparkTaskContextSupplier); } + private static HoodieFileWriter newOrcFileWriter( + String instantTime, Path path, HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable, + SparkTaskContextSupplier sparkTaskContextSupplier) throws IOException { + + return new HoodieOrcWriter<>(instantTime, path, config, hoodieTable.getHadoopConf(), sparkTaskContextSupplier); + } + private static BloomFilter createBloomFilter(HoodieWriteConfig config) { return BloomFilterFactory.createBloomFilter(config.getBloomFilterNumEntries(), config.getBloomFilterFPP(), - config.getDynamicBloomFilterMaxNumEntries(), - config.getBloomFilterType()); + config.getDynamicBloomFilterMaxNumEntries(), + config.getBloomFilterType()); } } diff --git a/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieOrcWriter.java b/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieOrcWriter.java new file mode 100644 index 0000000000000..7d8c3a98549af --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieOrcWriter.java @@ -0,0 +1,404 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.io.storage; + +import java.io.IOException; +import java.sql.Date; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.type.HiveChar; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.common.type.HiveVarchar; +import org.apache.hadoop.hive.ql.io.orc.CompressionKind; +import org.apache.hadoop.hive.ql.io.orc.OrcFile; + +import org.apache.hudi.client.SparkTaskContextSupplier; +import org.apache.hudi.common.util.StringUtils; + +import org.apache.orc.OrcFile.EncodingStrategy; +import org.apache.orc.OrcConf; +import org.apache.hadoop.hive.ql.io.orc.OrcFile.WriterOptions; +import org.apache.hadoop.hive.ql.io.orc.Writer; +import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.io.DateWritable; +import org.apache.hadoop.hive.serde2.io.HiveCharWritable; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable; +import org.apache.hadoop.hive.serde2.io.TimestampWritable; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.ByteWritable; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.ShortWritable; +import org.apache.hadoop.io.Text; + +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.fs.HoodieWrapperFileSystem; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.config.HoodieWriteConfig; + +/** + * HoodieOrcWriter use hive's Writer to help limit the size of underlying file. 
Provides + * a way to check if the current file can take more records with the canWrite() + */ +public class HoodieOrcWriter + implements HoodieFileWriter { + + private static AtomicLong recordIndex = new AtomicLong(1); + + private final long maxFileSize; + private final long orcStripeSize; + private final Path file; + private final HoodieWrapperFileSystem fs; + private final String commitTime; + private final Writer writer; + private final List columnNames; + private final List fieldTypes; + private final List fieldObjectInspectors; + private final StandardStructObjectInspector hudiObjectInspector; + private final SparkTaskContextSupplier sparkTaskContextSupplier; + + public HoodieOrcWriter(String commitTime, Path file, HoodieWriteConfig config, + Configuration hadoopConfig, SparkTaskContextSupplier sparkTaskContextSupplier) throws IOException { + // default value is 64M + this.orcStripeSize = config.getOrcStripeSize(); + // default value is 256M + long orcBlockSize = config.getOrcBlockSize(); + this.maxFileSize = orcBlockSize + (2 * orcStripeSize); + + this.file = HoodieWrapperFileSystem.convertToHoodiePath(file, hadoopConfig); + this.fs = (HoodieWrapperFileSystem) this.file.getFileSystem(registerFileSystem(file, hadoopConfig)); + this.commitTime = commitTime; + this.sparkTaskContextSupplier = sparkTaskContextSupplier; + + // Read the configuration parameters + String columnNameProperty = config.getOrcColumns(); + columnNames = new ArrayList<>(); + if (columnNameProperty != null && columnNameProperty.length() > 0) { + Collections.addAll(columnNames, columnNameProperty.split(String.valueOf(SerDeUtils.COMMA))); + } + columnNames.add(HoodieRecord.COMMIT_TIME_METADATA_FIELD); + columnNames.add(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD); + columnNames.add(HoodieRecord.RECORD_KEY_METADATA_FIELD); + columnNames.add(HoodieRecord.PARTITION_PATH_METADATA_FIELD); + columnNames.add(HoodieRecord.FILENAME_METADATA_FIELD); + + String columnTypeProperty = config.getOrcColumnsTypes(); + if (columnTypeProperty != null) { + columnTypeProperty = columnTypeProperty + ",string,string,string,string,string"; + } else { + // Default type: all string + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < columnNames.size(); i++) { + if (i > 0) { + sb.append(SerDeUtils.COMMA); + } + sb.append("string"); + } + columnTypeProperty = sb.toString(); + } + fieldTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty); + + fieldObjectInspectors = new ArrayList<>(columnNames.size()); + for (TypeInfo typeInfo : fieldTypes) { + fieldObjectInspectors.add(createObjectInspector(typeInfo)); + } + hudiObjectInspector = ObjectInspectorFactory + .getStandardStructObjectInspector(columnNames, fieldObjectInspectors); + + Configuration hudiConf = registerFileSystem(file, hadoopConfig); + String bloomFilterColumns = config.getOrcBloomFilterColumns(); + if (null != bloomFilterColumns) { + bloomFilterColumns = bloomFilterColumns + String.valueOf(SerDeUtils.COMMA) + + HoodieRecord.RECORD_KEY_METADATA_FIELD; + } else { + bloomFilterColumns = HoodieRecord.RECORD_KEY_METADATA_FIELD; + } + hudiConf.set(OrcConf.BLOOM_FILTER_COLUMNS.name(), bloomFilterColumns); + String bloomFilterFpp = config.getOrcBloomFilterFpp(); + + if (StringUtils.isNullOrEmpty(bloomFilterFpp)) { + bloomFilterFpp = OrcConf.BLOOM_FILTER_FPP.getDefaultValue().toString(); + } + + hudiConf.set(OrcConf.BLOOM_FILTER_FPP.name(), bloomFilterFpp); + + WriterOptions writerOptions = getOptions(hudiConf, config.getProps()); + 
writerOptions.inspector(hudiObjectInspector); + writerOptions.stripeSize(orcStripeSize); + writerOptions.blockSize(orcBlockSize); + this.writer = OrcFile.createWriter(file, writerOptions); + } + + public static Configuration registerFileSystem(Path file, Configuration conf) { + Configuration returnConf = new Configuration(conf); + String scheme = FSUtils.getFs(file.toString(), conf).getScheme(); + returnConf.set("fs." + HoodieWrapperFileSystem.getHoodieScheme(scheme) + ".impl", + HoodieWrapperFileSystem.class.getName()); + return returnConf; + } + + @Override + public void writeAvroWithMetadata(R avroRecord, HoodieRecord record) throws IOException { + GenericRecord genericRecord = (GenericRecord) avroRecord; + String seqId = HoodieRecord.generateSequenceId(commitTime, sparkTaskContextSupplier.getPartitionIdSupplier().get(), + recordIndex.getAndIncrement()); + + HoodieAvroUtils.addHoodieKeyToRecord(genericRecord, record.getRecordKey(), + record.getPartitionPath(), file.getName()); + + GenericRecord metadataToRecord = HoodieAvroUtils + .addCommitMetadataToRecord(genericRecord, commitTime, seqId); + + Schema avroSchema = metadataToRecord.getSchema(); + + Object row = hudiObjectInspector.create(); + setOrcFiledValue((R) avroRecord, avroSchema, hudiObjectInspector, row); + this.writer.addRow(row); + } + + + @Override + public boolean canWrite() { + return fs.getBytesWritten(file) < maxFileSize; + } + + @Override + public void writeAvro(String key, R avroRecord) throws IOException { + Schema avroSchema = avroRecord.getSchema(); + Object row = hudiObjectInspector.create(); + setOrcFiledValue((R) avroRecord, avroSchema, hudiObjectInspector, row); + this.writer.addRow(row); + } + + @Override + public void close() throws IOException { + if (null != this.writer) { + this.writer.close(); + } + } + + private void setOrcFiledValue(R avroRecord, Schema avroSchema, + StandardStructObjectInspector hudi, Object row) throws IOException { + // Otherwise, return the row. 
+ for (int c = 0; c < columnNames.size(); c++) { + try { + // orc + String fieldName = columnNames.get(c); + TypeInfo typeInfo = fieldTypes.get(c); + StructField structField = hudi.getAllStructFieldRefs().get(c); + + //avro + Schema.Field field = avroSchema.getField(fieldName); + Object fieldValue = avroRecord.get(field.pos()); + + // Convert the column to the correct type when needed and set in row obj + PrimitiveTypeInfo pti = (PrimitiveTypeInfo) typeInfo; + switch (pti.getPrimitiveCategory()) { + case STRING: + Text t = new Text(String.valueOf(fieldValue)); + hudi.setStructFieldData(row, structField, t); + break; + case BYTE: + ByteWritable b = new ByteWritable(Byte.valueOf(String.valueOf(fieldValue))); + hudi.setStructFieldData(row, structField, b); + break; + case SHORT: + ShortWritable s = new ShortWritable(Short.valueOf(String.valueOf(fieldValue))); + hudi.setStructFieldData(row, structField, s); + break; + case INT: + IntWritable i = new IntWritable(Integer.valueOf(String.valueOf(fieldValue))); + hudi.setStructFieldData(row, structField, i); + break; + case LONG: + LongWritable l = new LongWritable(Long.valueOf(String.valueOf(fieldValue))); + hudi.setStructFieldData(row, structField, l); + break; + case FLOAT: + FloatWritable f = new FloatWritable(Float.valueOf(String.valueOf(fieldValue))); + hudi.setStructFieldData(row, structField, f); + break; + case DOUBLE: + DoubleWritable d = new DoubleWritable(Double.valueOf(String.valueOf(fieldValue))); + hudi.setStructFieldData(row, structField, d); + break; + case BOOLEAN: + BooleanWritable bool = new BooleanWritable(Boolean.valueOf(String.valueOf(fieldValue))); + hudi.setStructFieldData(row, structField, bool); + break; + case TIMESTAMP: + TimestampWritable ts = new TimestampWritable(Timestamp.valueOf(String.valueOf(fieldValue))); + hudi.setStructFieldData(row, structField, ts); + break; + case DATE: + DateWritable date = new DateWritable(Date.valueOf(String.valueOf(fieldValue))); + hudi.setStructFieldData(row, structField, date); + break; + case DECIMAL: + HiveDecimal db = HiveDecimal.create(String.valueOf(fieldValue)); + HiveDecimalWritable dbw = new HiveDecimalWritable(db); + hudi.setStructFieldData(row, structField, dbw); + break; + case CHAR: + HiveChar hc = new HiveChar(String.valueOf(fieldValue), ((CharTypeInfo) typeInfo).getLength()); + HiveCharWritable hcw = new HiveCharWritable(hc); + hudi.setStructFieldData(row, structField, hcw); + break; + case VARCHAR: + HiveVarchar hv = new HiveVarchar(String.valueOf(fieldValue), ((VarcharTypeInfo) typeInfo).getLength()); + HiveVarcharWritable hvw = new HiveVarcharWritable(hv); + hudi.setStructFieldData(row, structField, hvw); + break; + default: + throw new IOException("Unsupported type " + typeInfo); + } + } catch (RuntimeException e) { + hudi.setStructFieldData(row, null, null); + } + } + } + + private OrcFile.WriterOptions getOptions(Configuration conf, Properties props) { + OrcFile.WriterOptions options = OrcFile.writerOptions(conf); + String propVal; + if ((propVal = getSettingFromPropsFallingBackToConf(OrcConf.STRIPE_SIZE.name(), + props, conf)) != null) { + options.stripeSize(Long.parseLong(propVal)); + } + + if ((propVal = getSettingFromPropsFallingBackToConf(OrcConf.COMPRESS.name(), + props, conf)) != null) { + options.compress(CompressionKind.valueOf(propVal)); + } + + if ((propVal = getSettingFromPropsFallingBackToConf(OrcConf.BUFFER_SIZE.name(), + props, conf)) != null) { + options.bufferSize(Integer.parseInt(propVal)); + } + + if ((propVal = 
getSettingFromPropsFallingBackToConf(OrcConf.ROW_INDEX_STRIDE.name(), + props, conf)) != null) { + options.rowIndexStride(Integer.parseInt(propVal)); + } + + if ((propVal = getSettingFromPropsFallingBackToConf(OrcConf.ENABLE_INDEXES.name(), + props, conf)) != null) { + if ("false".equalsIgnoreCase(propVal)) { + options.rowIndexStride(0); + } + } + + if ((propVal = getSettingFromPropsFallingBackToConf(OrcConf.BLOCK_PADDING.name(), + props, conf)) != null) { + options.blockPadding(Boolean.parseBoolean(propVal)); + } + + if ((propVal = getSettingFromPropsFallingBackToConf(OrcConf.ENCODING_STRATEGY.name(), + props, conf)) != null) { + options.encodingStrategy(EncodingStrategy.valueOf(propVal)); + } + + if ((propVal = getSettingFromPropsFallingBackToConf(OrcConf.BLOOM_FILTER_COLUMNS.name(), + props, conf)) != null) { + options.bloomFilterColumns(propVal); + } + + if ((propVal = getSettingFromPropsFallingBackToConf(OrcConf.BLOOM_FILTER_FPP.name(), + props, conf)) != null) { + options.bloomFilterFpp(Double.parseDouble(propVal)); + } + + return options; + } + + /** + * Helper method to get a parameter first from props if present, falling back to JobConf if not. + * Returns null if key is present in neither. + */ + private String getSettingFromPropsFallingBackToConf(String key, Properties props, Configuration conf) { + if ((props != null) && props.containsKey(key)) { + return props.getProperty(key); + } else if (conf != null) { + // If conf is not null, and the key is not present, Configuration.get() will + // return null for us. So, we don't have to check if it contains it. + return conf.get(key); + } else { + return null; + } + } + + private static ObjectInspector createObjectInspector(TypeInfo typeInfo) { + PrimitiveTypeInfo pti = (PrimitiveTypeInfo) typeInfo; + switch (pti.getPrimitiveCategory()) { + case FLOAT: + return PrimitiveObjectInspectorFactory.writableFloatObjectInspector; + case DOUBLE: + return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector; + case BOOLEAN: + return PrimitiveObjectInspectorFactory.writableBooleanObjectInspector; + case BYTE: + return PrimitiveObjectInspectorFactory.writableByteObjectInspector; + case SHORT: + return PrimitiveObjectInspectorFactory.writableShortObjectInspector; + case INT: + return PrimitiveObjectInspectorFactory.writableIntObjectInspector; + case LONG: + return PrimitiveObjectInspectorFactory.writableLongObjectInspector; + case BINARY: + return PrimitiveObjectInspectorFactory.writableBinaryObjectInspector; + case STRING: + return PrimitiveObjectInspectorFactory.writableStringObjectInspector; + case TIMESTAMP: + return PrimitiveObjectInspectorFactory.writableTimestampObjectInspector; + case DATE: + return PrimitiveObjectInspectorFactory.writableDateObjectInspector; + case VARCHAR: + case CHAR: + case DECIMAL: + return PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(pti); + default: + throw new IllegalArgumentException("Unknown primitive type " + pti.getPrimitiveCategory()); + } + } +} \ No newline at end of file diff --git a/hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieFileWriterFactory.java b/hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieFileWriterFactory.java index 6ea2b7e1d026b..ad13ce346ef48 100644 --- a/hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieFileWriterFactory.java +++ b/hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieFileWriterFactory.java @@ -55,6 +55,11 @@ public void testGetFileWriter() throws IOException { hfilePath, table, cfg, 
HoodieTestDataGenerator.AVRO_SCHEMA, supplier); assertTrue(hfileWriter instanceof HoodieHFileWriter); + final Path orcPath = new Path(basePath + "/partition/path/f1_1-0-1_000.orc"); + HoodieFileWriter orcFileWriter = HoodieFileWriterFactory.getFileWriter(instantTime, + orcPath, table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA, supplier); + assertTrue(orcFileWriter instanceof HoodieOrcWriter); + // other file format exception. final Path logPath = new Path(basePath + "/partition/path/f.b51192a8-574b-4a85-b246-bcfec03ac8bf_100.log.2_1-0-1"); final Throwable thrown = assertThrows(UnsupportedOperationException.class, () -> { diff --git a/hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieOrcWriter.java b/hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieOrcWriter.java new file mode 100644 index 0000000000000..ec863e7003aa5 --- /dev/null +++ b/hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieOrcWriter.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.io.storage; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.io.orc.OrcFile; +import org.apache.hadoop.hive.ql.io.orc.OrcSerde; +import org.apache.hadoop.hive.ql.io.orc.Reader; +import org.apache.hadoop.hive.ql.io.orc.RecordReader; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; + +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.client.SparkTaskContextSupplier; +import org.apache.hudi.common.bloom.BloomFilter; +import org.apache.hudi.common.bloom.BloomFilterFactory; +import org.apache.hudi.common.bloom.BloomFilterTypeCode; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.testutils.HoodieTestUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.testutils.HoodieClientTestUtils; + +import org.apache.spark.api.java.JavaSparkContext; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import org.apache.hudi.common.testutils.RawTripTestPayload; + +import java.io.File; +import java.io.IOException; +import java.util.*; + +import static org.junit.Assert.assertEquals; + +public class TestHoodieOrcWriter { + + private JavaSparkContext jsc = null; + private String basePath = null; + private transient FileSystem fs; + private String schemaStr; + private Schema schema; + private final ObjectMapper mapper = new ObjectMapper(); + + @Before + public void init() throws IOException { + jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest("TestHoodieOrcWriter")); + TemporaryFolder folder = new TemporaryFolder(); + folder.create(); + basePath = folder.getRoot().getAbsolutePath(); + fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration()); + HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath); + schemaStr = IOUtils.toString(getClass().getResourceAsStream("/exampleSchema.txt"), "UTF-8"); + schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr)); + } + + @Test + public void testWriteOrcFile() throws Exception { + // Create some records to use + String recordStr1 = "{\"_row_key\":\"1eb5b87a-1feh-4edd-87b4-6ec96dc405a0\"," + + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"; + String recordStr2 = "{\"_row_key\":\"2eb5b87b-1feu-4edd-87b4-6ec96dc405a0\"," + + "\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}"; + String recordStr3 = "{\"_row_key\":\"3eb5b87c-1fej-4edd-87b4-6ec96dc405a0\"," + + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}"; + String recordStr4 = "{\"_row_key\":\"4eb5b87c-1fej-4edd-87b4-6ec96dc405a0\"," + + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":32}"; + RawTripTestPayload rowChange1 = new RawTripTestPayload(recordStr1); + HoodieRecord record1 = new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), + rowChange1); + RawTripTestPayload rowChange2 = new RawTripTestPayload(recordStr2); + HoodieRecord record2 = new HoodieRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), + rowChange2); + RawTripTestPayload rowChange3 = new 
RawTripTestPayload(recordStr3); + HoodieRecord record3 = new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), + rowChange3); + RawTripTestPayload rowChange4 = new RawTripTestPayload(recordStr4); + HoodieRecord record4 = new HoodieRecord(new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()), + rowChange4); + + // We write record1, record2 to a orc file, but the bloom filter contains (record1, + // record2, record3). + BloomFilter filter = BloomFilterFactory.createBloomFilter(10000, 0.0000001, -1, BloomFilterTypeCode.SIMPLE.name()); + filter.add(record3.getRecordKey()); + + String commitTime = HoodieTestUtils.makeNewCommitTime(); + + Path path = new Path(basePath + "/orc_dc.orc"); + + HoodieWriteConfig config = HoodieWriteConfig.newBuilder() + .withPath(basePath) + .orcBloomFilterColumns("_hoodie_record_key") + .orcColumns("_row_key,time,number,_hoodie_commit_time," + + "_hoodie_commit_seqno,_hoodie_record_key,_hoodie_partition_path") + .orcColumnsTypes("string,string,int,string,string,string,string") + .build(); + + SparkTaskContextSupplier supplier = new SparkTaskContextSupplier(); + + HoodieOrcWriter writer = new HoodieOrcWriter(commitTime, + path, + config, + new Configuration(), supplier); + + + List> beforeList = new LinkedList<>(); + List records = Arrays.asList(record1, record2, record3, record4); + int seqId = 1; + for (HoodieRecord record : records) { + GenericRecord avroRecord = (GenericRecord) record.getData().getInsertValue(schema).get(); + HoodieAvroUtils.addCommitMetadataToRecord(avroRecord, commitTime, "" + seqId++); + HoodieAvroUtils.addHoodieKeyToRecord(avroRecord, record.getRecordKey(), record.getPartitionPath(), "orc"); + writer.writeAvro(record.getRecordKey(), avroRecord); + filter.add(record.getRecordKey()); + + Map recordMap = mapper.readValue(avroRecord.toString(), Map.class); + beforeList.add(recordMap); + } + writer.close(); + + // value has been written to orc, read it out and verify. 
+ OrcSerde serde = new OrcSerde(); + Properties p = new Properties(); + p.setProperty("columns", "_row_key,time,number,_hoodie_commit_time," + + "_hoodie_commit_seqno,_hoodie_record_key,_hoodie_partition_path"); + p.setProperty("columns.types", "string,string,int,string,string,string,string"); + serde.initialize(HoodieTestUtils.getDefaultHadoopConf(), p); + StructObjectInspector inspector = (StructObjectInspector) serde.getObjectInspector(); + + Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(HoodieTestUtils.getDefaultHadoopConf())); + + Object row = null; + RecordReader rows = reader.rows(); + List fields = inspector.getAllStructFieldRefs(); + List> afterList = new LinkedList<>(); + while (rows.hasNext()) { + row = rows.next(row); + + StringBuilder sb = new StringBuilder(); + sb.append("{") + .append("\"").append("_row_key").append("\"").append(":").append("\"") + .append(inspector.getStructFieldData(row, fields.get(0))).append("\",") + .append("\"").append("time").append("\"").append(":").append("\"") + .append(inspector.getStructFieldData(row, fields.get(1))).append("\",") + .append("\"").append("number").append("\"").append(":") + .append(inspector.getStructFieldData(row, fields.get(2))).append(",") + .append("\"").append("_hoodie_commit_time").append("\"").append(":").append("\"") + .append(inspector.getStructFieldData(row, fields.get(3))).append("\",") + .append("\"").append("_hoodie_commit_seqno").append("\"").append(":").append("\"") + .append(inspector.getStructFieldData(row, fields.get(4))).append("\",") + .append("\"").append("_hoodie_record_key").append("\"").append(":").append("\"") + .append(inspector.getStructFieldData(row, fields.get(5))).append("\",") + .append("\"").append("_hoodie_partition_path").append("\"").append(":").append("\"") + .append(inspector.getStructFieldData(row, fields.get(6))).append("\"") + .append("}"); + + Map recordMap = mapper.readValue(sb.toString(), Map.class); + afterList.add(recordMap); + } + + assertEquals(beforeList.size(), afterList.size()); + for (int i = 0; i < beforeList.size(); i++) { + Map before = beforeList.get(i); + Map after = afterList.get(i); + + assertEquals(before.get("_row_key"), after.get("_row_key")); + assertEquals(before.get("time"), after.get("time")); + assertEquals(before.get("number"), after.get("number")); + assertEquals(before.get("_hoodie_commit_time"), after.get("_hoodie_commit_time")); + assertEquals(before.get("_hoodie_commit_seqno"), after.get("_hoodie_commit_seqno")); + assertEquals(before.get("_hoodie_record_key"), after.get("_hoodie_record_key")); + assertEquals(before.get("_hoodie_partition_path"), after.get("_hoodie_partition_path")); + } + + } + + @After + public void clean() { + if (basePath != null) { + new File(basePath).delete(); + } + if (jsc != null) { + jsc.stop(); + } + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java index 552c38ffd9bd1..f7fdcd01d5006 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java @@ -24,7 +24,8 @@ public enum HoodieFileFormat { PARQUET(".parquet"), HOODIE_LOG(".log"), - HFILE(".hfile"); + HFILE(".hfile"), + ORC(".orc"); private final String extension; diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieOrcInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieOrcInputFormat.java 
new file mode 100644 index 0000000000000..f88e2a4ec72fc --- /dev/null +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieOrcInputFormat.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hadoop; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; + +@UseFileSplitsFromInputFormat +public class HoodieOrcInputFormat extends OrcInputFormat implements Configurable { + public static final Log LOG = LogFactory.getLog(HoodieOrcInputFormat.class); + + protected Configuration conf; + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + @Override + public Configuration getConf() { + return conf; + } +} diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieOrcRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieOrcRealtimeInputFormat.java new file mode 100644 index 0000000000000..b2939c184edd6 --- /dev/null +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieOrcRealtimeInputFormat.java @@ -0,0 +1,77 @@ +package org.apache.hudi.hadoop.realtime; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; + +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.hadoop.HoodieOrcInputFormat; +import org.apache.hudi.hadoop.UseFileSplitsFromInputFormat; + +@UseFileSplitsFromInputFormat +public class HoodieOrcRealtimeInputFormat extends HoodieOrcInputFormat implements Configurable { + + public static final Log LOG = LogFactory.getLog(HoodieOrcRealtimeInputFormat.class); + + // These positions have to be deterministic across all tables + public static final int HOODIE_COMMIT_TIME_COL_POS = 0; + public static final int HOODIE_RECORD_KEY_COL_POS = 2; + public static final int HOODIE_PARTITION_PATH_COL_POS = 3; + + protected Configuration conf; + + @Override + public void setConf(Configuration conf) { + this.conf = addRequiredProjectionFields(conf); + } + + @Override + public Configuration getConf() { + return conf; + } + + private static Configuration addRequiredProjectionFields(Configuration configuration) { + // Need this to do merge records in HoodieRealtimeRecordReader + configuration = addProjectionField(configuration, HoodieRecord.RECORD_KEY_METADATA_FIELD, + HOODIE_RECORD_KEY_COL_POS); + configuration = addProjectionField(configuration, HoodieRecord.COMMIT_TIME_METADATA_FIELD, + 
HOODIE_COMMIT_TIME_COL_POS); + configuration = addProjectionField(configuration, HoodieRecord.PARTITION_PATH_METADATA_FIELD, + HOODIE_PARTITION_PATH_COL_POS); + return configuration; + } + + /** + * Add a field to the existing fields projected + */ + private static Configuration addProjectionField(Configuration conf, String fieldName, + int fieldIndex) { + String readColNames = conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, ""); + String readColIds = conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, ""); + + String readColNamesPrefix = readColNames + ","; + if (readColNames == null || readColNames.isEmpty()) { + readColNamesPrefix = ""; + } + String readColIdsPrefix = readColIds + ","; + if (readColIds == null || readColIds.isEmpty()) { + readColIdsPrefix = ""; + } + + if (!readColNames.contains(fieldName)) { + // If not already in the list - then add it + conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, readColNamesPrefix + fieldName); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, readColIdsPrefix + fieldIndex); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format( + "Adding extra column " + fieldName + ", to enable log merging cols (%s) ids (%s) ", + conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR), + conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR))); + } + } + return conf; + } +}
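
Reviewer note, not part of the patch: the sketch below is a minimal end-to-end use of the write path added here, closely following the new TestHoodieOrcWriter. It assumes only the data columns are passed through hoodie.orc.columns / hoodie.orc.columns.types (the writer appends the five Hudi metadata columns and their string types itself), that the GenericRecord's schema already carries the Hudi metadata fields (for example via HoodieAvroUtils.addMetadataFields) with values filled in the way the test does, and that basePath, the file name, and the OrcWritePathSketch/writeOne names are placeholders I made up for illustration.

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.storage.HoodieOrcWriter;

public class OrcWritePathSketch {                          // hypothetical helper, not in the patch

  public static void writeOne(String basePath, GenericRecord recordWithMetaFields) throws Exception {
    // New ORC knobs on HoodieWriteConfig.Builder; stripe/block sizes fall back to
    // 64 MB / 256 MB in setDefaults() when omitted.
    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .orcColumns("_row_key,time,number")                // data columns only; metadata columns are appended by the writer
        .orcColumnsTypes("string,string,int")              // writer appends ",string,string,string,string,string"
        .orcBloomFilterColumns("_row_key")                 // _hoodie_record_key is always added to the bloom filter columns
        .orcBloomFilterFpp("0.05")
        .orcStripeSize(64 * 1024 * 1024L)                  // same as DEFAULT_ORC_STRIPE_SIZE
        .orcBlockSize(256 * 1024 * 1024L)                  // same as DEFAULT_ORC_BLOCK_SIZE
        .build();

    // Placeholder target file; only the .orc extension matters for factory dispatch.
    Path path = new Path(basePath + "/2020/01/01/some-file-id_1-0-1_20200101000000.orc");
    // The unit test constructs the supplier directly outside a Spark task; same assumption here.
    HoodieOrcWriter writer = new HoodieOrcWriter(
        "20200101000000", path, config, new Configuration(), new SparkTaskContextSupplier());
    try {
      // canWrite() compares bytes written so far against blockSize + 2 * stripeSize.
      if (writer.canWrite()) {
        writer.writeAvro("record-key-1", recordWithMetaFields);
      }
    } finally {
      writer.close();
    }
  }
}

Writers obtained through HoodieFileWriterFactory.getFileWriter(...) take the same path once the target file carries the new .orc extension, per the factory change above.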
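
Also not part of the patch: a small illustration of what HoodieOrcRealtimeInputFormat.setConf() does to Hive's column-projection settings, since the effect is easy to miss in the diff. The values shown in the comments are what addRequiredProjectionFields produces when the configuration starts with no projected columns; the property keys come from Hive's ColumnProjectionUtils, and the OrcRealtimeProjectionSketch class name is made up for illustration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hudi.hadoop.realtime.HoodieOrcRealtimeInputFormat;

public class OrcRealtimeProjectionSketch {                 // hypothetical, for illustration only

  public static void main(String[] args) {
    HoodieOrcRealtimeInputFormat inputFormat = new HoodieOrcRealtimeInputFormat();
    inputFormat.setConf(new Configuration());              // start from an empty projection list

    Configuration conf = inputFormat.getConf();
    // With an initially empty projection, the three metadata columns needed for log merging
    // are force-added in this order and at their fixed positions:
    //   READ_COLUMN_NAMES_CONF_STR -> "_hoodie_record_key,_hoodie_commit_time,_hoodie_partition_path"
    //   READ_COLUMN_IDS_CONF_STR   -> "2,0,3"
    System.out.println(conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR));
    System.out.println(conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
  }
}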