Skip to content

Commit d90574c

Browse files
ayushtkndeniskuzZ
andauthored
HIVE-29184: Iceberg: Basic Variant type support in Hive (#6069)
Co-authored-by: Denys Kuzmenko <[email protected]>
1 parent 53a42f5 commit d90574c

File tree

44 files changed

+3283
-18
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+3283
-18
lines changed

iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaConverter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,8 @@ Type convertType(TypeInfo typeInfo) {
143143
int listId = id++;
144144
Type listType = convertType(listTypeInfo.getListElementTypeInfo());
145145
return Types.ListType.ofOptional(listId, listType);
146-
case UNION:
146+
case VARIANT:
147+
return Types.VariantType.get();
147148
default:
148149
throw new IllegalArgumentException("Unknown type " + typeInfo.getCategory());
149150
}

iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,8 @@ public static String convertToTypeString(Type type) {
323323
case MAP:
324324
final Types.MapType mapType = type.asMapType();
325325
return String.format("map<%s,%s>", convert(mapType.keyType()), convert(mapType.valueType()));
326+
case VARIANT:
327+
return "variant";
326328
default:
327329
throw new UnsupportedOperationException(type + " is not supported");
328330
}

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
package org.apache.iceberg.mr.hive;
2121

22+
import java.nio.ByteBuffer;
23+
import java.nio.ByteOrder;
2224
import java.util.List;
2325
import java.util.Map;
2426
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
@@ -27,6 +29,7 @@
2729
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
2830
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
2931
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
32+
import org.apache.hadoop.hive.serde2.variant.Variant;
3033
import org.apache.iceberg.Schema;
3134
import org.apache.iceberg.data.GenericRecord;
3235
import org.apache.iceberg.data.Record;
@@ -35,11 +38,13 @@
3538
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
3639
import org.apache.iceberg.schema.SchemaWithPartnerVisitor;
3740
import org.apache.iceberg.types.Type.PrimitiveType;
41+
import org.apache.iceberg.types.Types;
3842
import org.apache.iceberg.types.Types.ListType;
3943
import org.apache.iceberg.types.Types.MapType;
4044
import org.apache.iceberg.types.Types.NestedField;
4145
import org.apache.iceberg.types.Types.StructType;
42-
46+
import org.apache.iceberg.variants.VariantMetadata;
47+
import org.apache.iceberg.variants.VariantValue;
4348

4449
class Deserializer {
4550
private final FieldDeserializer fieldDeserializer;
@@ -164,6 +169,26 @@ public FieldDeserializer list(ListType listTypeInfo, ObjectInspectorPair pair, F
164169
};
165170
}
166171

172+
@Override
173+
public FieldDeserializer variant(Types.VariantType variantType, ObjectInspectorPair pair) {
174+
return variantObj -> {
175+
if (variantObj == null) {
176+
return null;
177+
}
178+
// Extract data from the struct representation
179+
StructObjectInspector variantOI = (StructObjectInspector) pair.sourceInspector();
180+
Variant variant = Variant.from(variantOI.getStructFieldsDataAsList(variantObj));
181+
182+
VariantMetadata metadata = VariantMetadata.from(
183+
ByteBuffer.wrap(variant.getMetadata()).order(ByteOrder.LITTLE_ENDIAN));
184+
185+
VariantValue value = VariantValue.from(metadata,
186+
ByteBuffer.wrap(variant.getValue()).order(ByteOrder.LITTLE_ENDIAN));
187+
188+
return org.apache.iceberg.variants.Variant.of(metadata, value);
189+
};
190+
}
191+
167192
@Override
168193
public FieldDeserializer map(MapType mapType, ObjectInspectorPair pair, FieldDeserializer keyDeserializer,
169194
FieldDeserializer valueDeserializer) {

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergObjectInspector.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,4 +152,8 @@ public ObjectInspector struct(Types.StructType structType, List<ObjectInspector>
152152
return new IcebergRecordObjectInspector(structType, fieldObjectInspectors);
153153
}
154154

155+
@Override
156+
public ObjectInspector variant(Types.VariantType variantType) {
157+
return IcebergVariantObjectInspector.get();
158+
}
155159
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iceberg.mr.hive.serde.objectinspector;
21+
22+
import java.nio.ByteBuffer;
23+
import java.util.List;
24+
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
25+
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
26+
import org.apache.hadoop.hive.serde2.objectinspector.VariantObjectInspector;
27+
import org.apache.iceberg.variants.Variant;
28+
29+
/**
30+
* ObjectInspector for Iceberg's Variant type in Hive.
31+
* <p>
32+
* This ObjectInspector enables Hive to work with Iceberg's Variant type, which stores
33+
* polymorphic data in a single column. Variant types are particularly useful for
34+
* semi-structured data like JSON where the actual type may vary per row.
35+
* <p>
36+
* The ObjectInspector exposes each Variant as a Hive struct with two binary fields:
37+
* <ul>
38+
* <li><strong>metadata</strong>: Binary metadata containing type information and schema</li>
39+
* <li><strong>value</strong>: Binary representation of the actual data value</li>
40+
* </ul>
41+
* <p>
42+
*/
43+
public final class IcebergVariantObjectInspector extends VariantObjectInspector {
44+
45+
private static final ObjectInspector INSTANCE = new IcebergVariantObjectInspector();
46+
47+
private IcebergVariantObjectInspector() {
48+
}
49+
50+
public static ObjectInspector get() {
51+
return INSTANCE;
52+
}
53+
54+
@Override
55+
public Object getStructFieldData(Object data, StructField fieldRef) {
56+
if (data == null) {
57+
return null;
58+
}
59+
Variant variant = (Variant) data;
60+
MyField field = (MyField) fieldRef;
61+
62+
switch (field.getFieldID()) {
63+
case 0: // "metadata" field (binary)
64+
ByteBuffer metadata = ByteBuffer.allocate(variant.metadata().sizeInBytes());
65+
variant.metadata().writeTo(metadata, 0);
66+
return metadata.array();
67+
case 1: // "value" field (binary)
68+
ByteBuffer value = ByteBuffer.allocate(variant.value().sizeInBytes());
69+
variant.value().writeTo(value, 0);
70+
return value.array();
71+
default:
72+
throw new IllegalArgumentException("Unknown field position: " + field.getFieldID());
73+
}
74+
}
75+
76+
@Override
77+
public List<Object> getStructFieldsDataAsList(Object data) {
78+
if (data == null) {
79+
return null;
80+
}
81+
Variant variant = (Variant) data;
82+
ByteBuffer metadata = ByteBuffer.allocate(variant.metadata().sizeInBytes());
83+
variant.metadata().writeTo(metadata, 0);
84+
85+
ByteBuffer value = ByteBuffer.allocate(variant.value().sizeInBytes());
86+
variant.value().writeTo(value, 0);
87+
88+
// Return the data for our fields in the correct order: metadata, value
89+
return List.of(metadata.array(), value.array());
90+
}
91+
}

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/ParquetSchemaFieldNameVisitor.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,17 @@ public Type list(Types.ListType iList, GroupType array, Type element) {
114114
return array;
115115
}
116116

117+
@Override
118+
public Type variant(Types.VariantType iVariant, GroupType variant, Type result) {
119+
if (variant.getId() != null) {
120+
typesById.put(variant.getId().intValue(), variant);
121+
}
122+
// Add the variant field name to the column names list
123+
appendToColNamesList(variant instanceof MessageType, variant.getName());
124+
125+
return variant;
126+
}
127+
117128
@Override
118129
public Type map(Types.MapType iMap, GroupType map, Type key, Type value) {
119130
if (map.getId() != null) {

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveFileWriterFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {
7777

7878
@Override
7979
protected void configureDataWrite(Parquet.DataWriteBuilder builder) {
80-
builder.createWriterFunc(GenericParquetWriter::buildWriter);
80+
builder.createWriterFunc(GenericParquetWriter::create);
8181
}
8282

8383
@Override
@@ -87,7 +87,7 @@ protected void configureEqualityDelete(Parquet.DeleteWriteBuilder builder) {
8787

8888
@Override
8989
protected void configurePositionDelete(Parquet.DeleteWriteBuilder builder) {
90-
builder.createWriterFunc(GenericParquetWriter::buildWriter);
90+
builder.createWriterFunc(GenericParquetWriter::create);
9191
}
9292

9393
@Override
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
CREATE EXTERNAL TABLE variant_test_partition (
2+
id INT,
3+
data VARIANT
4+
) PARTITIONED BY spec (data)
5+
STORED BY ICEBERG tblproperties('format-version'='3');

0 commit comments

Comments
 (0)