@@ -85,4 +85,9 @@ public VariantValue get(int index) {
public ByteBuffer buffer() {
return value;
}

@Override
public String toString() {
return VariantArray.asString(this);
}
}
19 changes: 19 additions & 0 deletions api/src/main/java/org/apache/iceberg/variants/VariantArray.java
@@ -20,6 +20,25 @@

/** A variant array value. */
public interface VariantArray extends VariantValue {
static String asString(VariantArray arr) {
StringBuilder builder = new StringBuilder();

builder.append("VariantArray([");
boolean first = true;
for (int i = 0; i < arr.numElements(); i++) {
if (first) {
first = false;
} else {
builder.append(", ");
}

builder.append(arr.get(i));
}
builder.append("])");

return builder.toString();
}

/** Returns the {@link VariantValue} at {@code index} in this array. */
VariantValue get(int index);

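As a quick illustration of the shared helper (a sketch only; each element is rendered with its own toString(), so the exact element text may vary):

// Sketch: expected formatting from VariantArray.asString, used by the
// toString() overrides added in this PR.
ValueArray arr = Variants.array();
arr.add(Variants.of(34));
arr.add(Variants.of("iceberg"));
// arr.toString() -> "VariantArray([34, iceberg])" (element rendering may vary)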
@@ -231,14 +231,14 @@ public static ByteBuffer createObject(VariantMetadata metadata, Map<String, Vari
return buffer;
}

static ByteBuffer createArray(Serialized... values) {
public static ByteBuffer createArray(VariantValue... values) {
int numElements = values.length;
boolean isLarge = numElements > 0xFF;

int dataSize = 0;
for (Serialized value : values) {
for (VariantValue value : values) {
// TODO: produce size for every variant without serializing
dataSize += value.buffer().remaining();
dataSize += value.sizeInBytes();
}

// offset size is the size needed to store the length of the data section
@@ -260,13 +260,11 @@ static ByteBuffer createArray(Serialized... values) {
// write values and offsets
int nextOffset = 0; // the first offset is always 0
int index = 0;
for (Serialized value : values) {
for (VariantValue value : values) {
// write the offset and value
VariantUtil.writeLittleEndianUnsigned(
buffer, nextOffset, offsetListOffset + (index * offsetSize), offsetSize);
// in a real implementation, the buffer should be passed to serialize
ByteBuffer valueBuffer = value.buffer();
int valueSize = writeBufferAbsolute(buffer, dataOffset + nextOffset, valueBuffer);
int valueSize = value.writeTo(buffer, dataOffset + nextOffset);
// update next offset and index
nextOffset += valueSize;
index += 1;
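For context on the signature change above: createArray now accepts any VariantValue and serializes each element in place via writeTo, so test fixtures no longer need pre-serialized Serialized values. A sketch of the resulting usage, mirroring the fixtures later in this PR:

// Build an array buffer directly from in-memory variant values.
ByteBuffer buf =
    VariantTestUtil.createArray(Variants.of("iceberg"), Variants.of(34));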
@@ -127,4 +127,9 @@ private int writeTo(ByteBuffer buffer, int offset) {
return (dataOffset - offset) + dataSize;
}
}

@Override
public String toString() {
return VariantArray.asString(this);
}
}
9 changes: 8 additions & 1 deletion core/src/test/java/org/apache/iceberg/RandomVariants.java
@@ -27,6 +27,7 @@
import org.apache.iceberg.util.UUIDUtil;
import org.apache.iceberg.variants.PhysicalType;
import org.apache.iceberg.variants.ShreddedObject;
import org.apache.iceberg.variants.ValueArray;
import org.apache.iceberg.variants.Variant;
import org.apache.iceberg.variants.VariantMetadata;
import org.apache.iceberg.variants.VariantTestUtil;
@@ -120,7 +121,13 @@ private static VariantValue randomVariant(
byte[] uuidBytes = (byte[]) RandomUtil.generatePrimitive(Types.UUIDType.get(), random);
return Variants.of(type, UUIDUtil.convert(uuidBytes));
case ARRAY:
// for now, generate an object instead of an array
ValueArray arr = Variants.array();
int numElements = random.nextInt(10);
for (int i = 0; i < numElements; i += 1) {
arr.add(randomVariant(random, metadata, randomType(random)));
}

return arr;
case OBJECT:
ShreddedObject object = Variants.object(metadata);
if (metadata.dictionarySize() > 0) {
@@ -32,6 +32,7 @@
import org.apache.iceberg.variants.PhysicalType;
import org.apache.iceberg.variants.ShreddedObject;
import org.apache.iceberg.variants.Variant;
import org.apache.iceberg.variants.VariantArray;
import org.apache.iceberg.variants.VariantMetadata;
import org.apache.iceberg.variants.VariantObject;
import org.apache.iceberg.variants.VariantValue;
@@ -98,6 +99,26 @@ static ParquetValueWriter<VariantValue> objects(
builder.build());
}

public static ParquetValueWriter<?> array(
int valueDefinitionLevel,
ParquetValueWriter<?> valueWriter,
int typedDefinitionLevel,
int repeatedDefinitionLevel,
int repeatedRepetitionLevel,
ParquetValueWriter<?> elementWriter) {
ArrayWriter typedWriter =
new ArrayWriter(
repeatedDefinitionLevel,
repeatedRepetitionLevel,
(ParquetValueWriter<VariantValue>) elementWriter);

return new ArrayValueWriter(
valueDefinitionLevel,
(ParquetValueWriter<VariantValue>) valueWriter,
typedDefinitionLevel,
typedWriter);
}

private static class VariantWriter implements ParquetValueWriter<Variant> {
private final ParquetValueWriter<VariantMetadata> metadataWriter;
private final ParquetValueWriter<VariantValue> valueWriter;
@@ -360,6 +381,92 @@ public void setColumnStore(ColumnWriteStore columnStore) {
}
}

private static class ArrayValueWriter implements ParquetValueWriter<VariantValue> {
private final int valueDefinitionLevel;
private final ParquetValueWriter<VariantValue> valueWriter;
private final int typedDefinitionLevel;
private final ArrayWriter typedWriter;
private final List<TripleWriter<?>> children;

private ArrayValueWriter(
int valueDefinitionLevel,
ParquetValueWriter<VariantValue> valueWriter,
int typedDefinitionLevel,
ArrayWriter typedWriter) {
this.valueDefinitionLevel = valueDefinitionLevel;
this.valueWriter = valueWriter;
this.typedDefinitionLevel = typedDefinitionLevel;
this.typedWriter = typedWriter;
this.children = children(valueWriter, typedWriter);
}

@Override
public void write(int repetitionLevel, VariantValue value) {
if (value.type() == PhysicalType.ARRAY) {
typedWriter.write(repetitionLevel, value);
writeNull(valueWriter, repetitionLevel, valueDefinitionLevel);
} else {
valueWriter.write(repetitionLevel, value);
writeNull(typedWriter, repetitionLevel, typedDefinitionLevel);
}
}

@Override
public List<TripleWriter<?>> columns() {
return children;
}

@Override
public void setColumnStore(ColumnWriteStore columnStore) {
valueWriter.setColumnStore(columnStore);
typedWriter.setColumnStore(columnStore);
}
}

private static class ArrayWriter implements ParquetValueWriter<VariantValue> {
private final int definitionLevel;
private final int repetitionLevel;
private final ParquetValueWriter<VariantValue> writer;
private final List<TripleWriter<?>> children;

private ArrayWriter(
int definitionLevel, int repetitionLevel, ParquetValueWriter<VariantValue> writer) {
this.definitionLevel = definitionLevel;
this.repetitionLevel = repetitionLevel;
this.writer = writer;
this.children = writer.columns();
}

@Override
public void write(int parentRepetition, VariantValue value) {
Contributor:

This looks correct to me.

VariantArray arr = value.asArray();
if (arr.numElements() == 0) {
writeNull(writer, parentRepetition, definitionLevel);
} else {
for (int i = 0; i < arr.numElements(); i++) {
VariantValue element = arr.get(i);

int rl = repetitionLevel;
if (i == 0) {
rl = parentRepetition;
}

writer.write(rl, element);
}
}
}

@Override
public List<TripleWriter<?>> columns() {
return children;
}

@Override
public void setColumnStore(ColumnWriteStore columnStore) {
writer.setColumnStore(columnStore);
}
}
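A note on the repetition-level handling in ArrayWriter.write, since it is easy to misread: the first element of a list is written at the parent's repetition level to open the list, and every following element repeats at the list's own level. A commented sketch (the level values are illustrative, assuming an unnested variant column):

// Writing the array ["a", "b", "c"] with parentRepetition = 0 and
// repetitionLevel = 1 produces:
//   "a" -> rl 0   (opens the list for this record)
//   "b" -> rl 1   (continues the same list)
//   "c" -> rl 1
// An empty array takes the writeNull branch at the list's definition level
// instead, so no repeated values are emitted for it.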

private static void writeNull(
ParquetValueWriter<?> writer, int repetitionLevel, int definitionLevel) {
for (TripleWriter<?> column : writer.columns()) {
@@ -169,7 +169,12 @@ public ParquetValueWriter<?> object(
@Override
public ParquetValueWriter<?> array(
GroupType array, ParquetValueWriter<?> valueWriter, ParquetValueWriter<?> elementWriter) {
throw new UnsupportedOperationException("Array is not yet supported");
int valueDL = schema.getMaxDefinitionLevel(path(VALUE));
int typedDL = schema.getMaxDefinitionLevel(path(TYPED_VALUE));
int repeatedDL = schema.getMaxDefinitionLevel(path(TYPED_VALUE, LIST));
int repeatedRL = schema.getMaxRepetitionLevel(path(TYPED_VALUE, LIST));
return ParquetVariantWriters.array(
valueDL, valueWriter, typedDL, repeatedDL, repeatedRL, elementWriter);
}

private static class LogicalTypeToVariantWriter
@@ -41,10 +41,13 @@
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.variants.ValueArray;
import org.apache.iceberg.variants.Variant;
import org.apache.iceberg.variants.VariantArray;
import org.apache.iceberg.variants.VariantMetadata;
import org.apache.iceberg.variants.VariantObject;
import org.apache.iceberg.variants.VariantTestUtil;
import org.apache.iceberg.variants.VariantValue;
import org.apache.iceberg.variants.Variants;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.FieldSource;
@@ -73,6 +76,12 @@ public class TestVariantWriters {
"c", Variants.of("string")));
private static final ByteBuffer EMPTY_OBJECT_BUFFER =
VariantTestUtil.createObject(TEST_METADATA_BUFFER, ImmutableMap.of());
private static final ByteBuffer ARRAY_IN_OBJECT_BUFFER =
VariantTestUtil.createObject(
TEST_METADATA_BUFFER,
ImmutableMap.of(
"a", Variants.of(123456789),
"c", array(Variants.of("string"), Variants.of("iceberg"))));

private static final VariantMetadata EMPTY_METADATA =
Variants.metadata(VariantTestUtil.emptyMetadata());
@@ -83,6 +92,30 @@ public class TestVariantWriters {
(VariantObject) Variants.value(TEST_METADATA, SIMILAR_OBJECT_BUFFER);
private static final VariantObject EMPTY_OBJECT =
(VariantObject) Variants.value(TEST_METADATA, EMPTY_OBJECT_BUFFER);
private static final VariantObject ARRAY_IN_OBJECT =
(VariantObject) Variants.value(TEST_METADATA, ARRAY_IN_OBJECT_BUFFER);

private static final ByteBuffer EMPTY_ARRAY_BUFFER = VariantTestUtil.createArray();
private static final ByteBuffer TEST_ARRAY_BUFFER =
VariantTestUtil.createArray(Variants.of("iceberg"), Variants.of("string"), Variants.of(34));
private static final ByteBuffer NESTED_ARRAY_BUFFER =
VariantTestUtil.createArray(
array(Variants.of("string"), Variants.of("iceberg"), Variants.of(34)),
array(Variants.of(34), Variants.ofNull()),
array(),
array(Variants.of("string"), Variants.of("iceberg")),
Variants.of(34));
private static final ByteBuffer OBJECT_IN_ARRAY_BUFFER =
VariantTestUtil.createArray(TEST_OBJECT, EMPTY_OBJECT, SIMILAR_OBJECT);

private static final VariantArray EMPTY_ARRAY =
(VariantArray) Variants.value(EMPTY_METADATA, EMPTY_ARRAY_BUFFER);
private static final VariantArray TEST_ARRAY =
(VariantArray) Variants.value(EMPTY_METADATA, TEST_ARRAY_BUFFER);
private static final VariantArray TEST_NESTED_ARRAY =
(VariantArray) Variants.value(EMPTY_METADATA, NESTED_ARRAY_BUFFER);
private static final VariantArray TEST_OBJECT_IN_ARRAY =
(VariantArray) Variants.value(TEST_METADATA, OBJECT_IN_ARRAY_BUFFER);

private static final Variant[] VARIANTS =
new Variant[] {
@@ -104,6 +137,11 @@ public class TestVariantWriters {
Variant.of(EMPTY_METADATA, EMPTY_OBJECT),
Variant.of(TEST_METADATA, TEST_OBJECT),
Variant.of(TEST_METADATA, SIMILAR_OBJECT),
Variant.of(TEST_METADATA, ARRAY_IN_OBJECT),
Variant.of(EMPTY_METADATA, EMPTY_ARRAY),
Variant.of(EMPTY_METADATA, TEST_ARRAY),
Variant.of(EMPTY_METADATA, TEST_NESTED_ARRAY),
Variant.of(TEST_METADATA, TEST_OBJECT_IN_ARRAY),
Contributor:

This test uses ParquetVariantUtil.ParquetSchemaProducer to produce a shredding schema, but that hasn't been updated to support arrays, so the shredded case for arrays is not testing the new shredded writer. You can check by running the tests with coverage.

I think it would be a good idea to update ParquetSchemaProducer to shred an array if it has a consistent element type across all elements. Then you'd need to ensure that the test cases here exercise that path by making some of the arrays have one element or a consistent type across elements.

Contributor:

Thinking about this a little more, I think you'll probably want at least one test where the array is shredded, but values are not (there is no consistent type for elements). Then the mixed test will test whether existing shredding works for arrays with partial shredding.

Here's what I'm thinking for test cases:

  • An array of "string", "iceberg"
  • An array of "string", "iceberg", 34
  • An array of objects with a consistent schema that is the shredding schema
  • An array of objects with the previous case's schema, along with numbers and strings
  • An array of arrays like you have, but with a consistent inner array element type
  • An object with arrays

The idea is to use the schemas produced from one test (like list) to test other cases in the mixed test (like list).
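For illustration, a hedged sketch of fixtures along those lines, built with the test's own array(...) helper (the variable names here are made up, not part of the PR):

// Sketch only: fixture shapes matching the suggested cases.
ValueArray strings = array(Variants.of("string"), Variants.of("iceberg")); // consistent type: element shreds
ValueArray mixed =
    array(Variants.of("string"), Variants.of("iceberg"), Variants.of(34)); // array shreds, elements left unshredded
ValueArray nested =
    array(array(Variants.of(1), Variants.of(2)), array(Variants.of(3), Variants.of(4))); // consistent inner element type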

Contributor:

Hi @rdblue. Friendly reminder: I also noticed the lack of array support in ParquetVariantUtil.ParquetSchemaProducer. I additionally observed code redundancy in ParquetSchemaProducer across TestVariantReaders and ParquetVariantUtil.

To address this, I moved the ParquetSchemaProducer class outside of the ParquetVariantUtil class to facilitate code reuse and ease future modifications.

I submitted a PR for this: #12916. Thanks.

Contributor:

Thanks, @XBaith. I think that we need to include the changes to ParquetSchemaProducer here. Otherwise the changes aren't exercised, and we need them to properly test this PR.

Would it be alright with you if your changes were picked into this PR and you were listed as a co-author?

Contributor Author (@aihuaxu, Apr 28, 2025):

Sorry, I made the changes in TestVariantWriter before ParquetSchemaProducer was refactored, and the rebase lost that change.

I used the most common type rather than requiring a single type across all the array elements, since that should be closer to what engines will do. That also gives us cases where some elements are shredded while others are not. Let me know your thoughts.
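For illustration, one way the most-common-type choice could look (a hypothetical sketch, not the PR's actual ParquetSchemaProducer code; assumes java.util.Map and java.util.HashMap are imported):

// Hypothetical helper: pick the most frequent element type to shred to.
private static PhysicalType mostCommonElementType(VariantArray arr) {
  Map<PhysicalType, Integer> counts = new HashMap<>();
  for (int i = 0; i < arr.numElements(); i += 1) {
    counts.merge(arr.get(i).type(), 1, Integer::sum);
  }

  return counts.entrySet().stream()
      .max(Map.Entry.comparingByValue())
      .map(Map.Entry::getKey)
      .orElse(null); // an empty array yields no typed element schema
}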

Contributor Author:

@rdblue I checked #12916. Actually, it can be separated out since it is a refactoring, and TestVariantWriter and TestVariantReader should provide the same coverage.

Contributor Author:

I added more test coverage, with one difference: the arrays are shredded to the most common element type. Let me know your thoughts on that.

Contributor:

I agree with implementing the array method for ParquetSchemaProducer in this PR.

However, I intentionally did not make changes to this part in my PR since #12916 mainly focuses on refactoring.

Variant.of(EMPTY_METADATA, Variants.ofIsoDate("2024-11-07")),
Variant.of(EMPTY_METADATA, Variants.ofIsoDate("1957-11-07")),
Variant.of(EMPTY_METADATA, Variants.ofIsoTimestamptz("2024-11-07T12:33:54.123456+00:00")),
@@ -200,4 +238,13 @@ private static List<Record> writeAndRead(
return Lists.newArrayList(reader);
}
}

private static ValueArray array(VariantValue... values) {
ValueArray arr = Variants.array();
for (VariantValue value : values) {
arr.add(value);
}

return arr;
}
}