@@ -20,6 +20,7 @@
 package org.apache.iceberg.mr.hive.serde.objectinspector;
 
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.util.List;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
@@ -61,11 +62,13 @@ public Object getStructFieldData(Object data, StructField fieldRef) {
 
     switch (field.getFieldID()) {
       case 0: // "metadata" field (binary)
-        ByteBuffer metadata = ByteBuffer.allocate(variant.metadata().sizeInBytes());
+        ByteBuffer metadata = ByteBuffer.allocate(variant.metadata().sizeInBytes())
+            .order(ByteOrder.LITTLE_ENDIAN);
         variant.metadata().writeTo(metadata, 0);
         return metadata.array();
       case 1: // "value" field (binary)
-        ByteBuffer value = ByteBuffer.allocate(variant.value().sizeInBytes());
+        ByteBuffer value = ByteBuffer.allocate(variant.value().sizeInBytes())
+            .order(ByteOrder.LITTLE_ENDIAN);
         variant.value().writeTo(value, 0);
         return value.array();
       default:
@@ -79,10 +82,12 @@ public List<Object> getStructFieldsDataAsList(Object data) {
       return null;
     }
     Variant variant = (Variant) data;
-    ByteBuffer metadata = ByteBuffer.allocate(variant.metadata().sizeInBytes());
+    ByteBuffer metadata = ByteBuffer.allocate(variant.metadata().sizeInBytes())
+        .order(ByteOrder.LITTLE_ENDIAN);
     variant.metadata().writeTo(metadata, 0);
 
-    ByteBuffer value = ByteBuffer.allocate(variant.value().sizeInBytes());
+    ByteBuffer value = ByteBuffer.allocate(variant.value().sizeInBytes())
+        .order(ByteOrder.LITTLE_ENDIAN);
     variant.value().writeTo(value, 0);
 
     // Return the data for our fields in the correct order: metadata, value
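The .order(ByteOrder.LITTLE_ENDIAN) calls matter because ByteBuffer.allocate() returns a big-endian buffer by default, while the variant binary form written by writeTo() lays out multi-byte values little-endian. A minimal, standalone sketch of the difference (ByteOrderDemo is an illustration, not Iceberg code):

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

// Standalone illustration: the same int lands in the buffer in opposite byte orders.
public class ByteOrderDemo {
  public static void main(String[] args) {
    byte[] big = ByteBuffer.allocate(4).putInt(1).array();
    byte[] little = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt(1).array();

    System.out.println(Arrays.toString(big));     // [0, 0, 0, 1]
    System.out.println(Arrays.toString(little));  // [1, 0, 0, 0]
  }
}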
@@ -19,6 +19,7 @@
 
 package org.apache.iceberg.mr.hive.writer;
 
+import java.util.Map;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SortOrder;
@@ -31,9 +32,13 @@
 import org.apache.iceberg.data.parquet.GenericParquetWriter;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.types.Types;
 
 class HiveFileWriterFactory extends BaseFileWriterFactory<Record> {
 
+  private final Map<String, String> properties;
+  private Record sampleRecord = null;
+
   HiveFileWriterFactory(
       Table table,
       FileFormat dataFileFormat,
@@ -54,6 +59,7 @@ class HiveFileWriterFactory extends BaseFileWriterFactory<Record> {
         equalityDeleteRowSchema,
         equalityDeleteSortOrder,
         positionDeleteRowSchema);
+    properties = table.properties();
   }
 
   static Builder builderFor(Table table) {
@@ -78,6 +84,11 @@ protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {
   @Override
   protected void configureDataWrite(Parquet.DataWriteBuilder builder) {
     builder.createWriterFunc(GenericParquetWriter::create);
+    // Shred variants only when the schema has variant columns and the table property enables it
+    if (hasVariantColumns(dataSchema()) && isVariantShreddingEnabled(properties)) {
+      builder.variantShreddingFunc(
+          Parquet.constructVariantShreddingFunction(sampleRecord, dataSchema()));
+    }
   }
 
   @Override
@@ -149,4 +160,30 @@ HiveFileWriterFactory build() {
           positionDeleteRowSchema);
     }
   }
+
+  /**
+   * Checks whether the schema contains any variant columns.
+   */
+  private static boolean hasVariantColumns(Schema schema) {
+    return schema.columns().stream()
+        .anyMatch(field -> field.type() instanceof Types.VariantType);
+  }
+
+  /**
+   * Checks whether variant shredding is enabled via table properties.
+   */
+  private static boolean isVariantShreddingEnabled(Map<String, String> properties) {
+    String shreddingEnabled = properties.get("variant.shredding.enabled");
+    return "true".equalsIgnoreCase(shreddingEnabled);
+  }
+
+  /**
+   * Sets the sample record used for data-driven variant shredding schema generation.
+   * Should be called before the Parquet writer is created; only the first record is retained.
+   */
+  public void initialize(Record record) {
+    if (this.sampleRecord == null) {
+      this.sampleRecord = record;
+    }
+  }
 }
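isVariantShreddingEnabled() keys off a plain table property, so writers opt in per table. A minimal sketch of flipping that property through the Iceberg API follows; the class and method names here are illustrative, and only the property key comes from the code above:

import org.apache.iceberg.Table;

// Sketch only: enables the property read by isVariantShreddingEnabled().
final class VariantShreddingSetup {
  private VariantShreddingSetup() {
  }

  static void enable(Table table) {
    table.updateProperties()
        .set("variant.shredding.enabled", "true")
        .commit();
  }
}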
@@ -40,6 +40,8 @@ class HiveIcebergRecordWriter extends HiveIcebergWriterBase {
   private final Set<String> missingColumns;
   private final List<Types.NestedField> missingOrStructFields;
 
+  private final HiveFileWriterFactory fileWriterFactory;
+
   HiveIcebergRecordWriter(Table table, HiveFileWriterFactory fileWriterFactory,
       OutputFileFactory dataFileFactory, Context context) {
     super(table, newDataWriter(table, fileWriterFactory, dataFileFactory, context));
@@ -48,17 +50,18 @@ class HiveIcebergRecordWriter extends HiveIcebergWriterBase {
     this.missingColumns = context.missingColumns();
     this.missingOrStructFields = specs.get(currentSpecId).schema().asStruct().fields().stream()
         .filter(field -> missingColumns.contains(field.name()) || field.type().isStructType()).toList();
+    this.fileWriterFactory = fileWriterFactory;
   }
 
   @Override
   public void write(Writable row) throws IOException {
     Record record = ((Container<Record>) row).get();
     HiveSchemaUtil.setDefaultValues(record, missingOrStructFields, missingColumns);
+    fileWriterFactory.initialize(record);
 
     writer.write(record, specs.get(currentSpecId), partition(record, currentSpecId));
   }
 
-
   @Override
   public FilesForCommit files() {
     List<DataFile> dataFiles = ((DataWriteResult) writer.result()).dataFiles();
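The write path above hands every row to the factory, but initialize() only keeps the first one as the shredding sample. A stripped-down sketch of that first-record capture (SampleRecordHolder, offer(), and sample() are illustrative names, not part of the change):

import org.apache.iceberg.data.Record;

// Sketch of the first-record capture that drives the shredding schema:
// later calls are no-ops once a sample has been seen.
class SampleRecordHolder {
  private Record sampleRecord;

  void offer(Record record) {
    if (sampleRecord == null) {
      sampleRecord = record;
    }
  }

  Record sample() {
    return sampleRecord;
  }
}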