@@ -85,4 +85,9 @@ public VariantValue get(int index) {
public ByteBuffer buffer() {
return value;
}

@Override
public String toString() {
return VariantArray.asString(this);
}
}
19 changes: 19 additions & 0 deletions api/src/main/java/org/apache/iceberg/variants/VariantArray.java
@@ -35,4 +35,23 @@ default PhysicalType type() {
default VariantArray asArray() {
return this;
}

static String asString(VariantArray arr) {
StringBuilder builder = new StringBuilder();

builder.append("VariantArray([");
boolean first = true;
for (int i = 0; i < arr.numElements(); i++) {
if (first) {
first = false;
} else {
builder.append(", ");
}

builder.append(arr.get(i));
}
builder.append("])");

return builder.toString();
}
}
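For reference, a hedged sketch of what the new toString output might look like, using Variants.array() and Variants.of as they appear elsewhere in this diff; the exact rendering of each element depends on that value's own toString:

// sketch only, not part of this change
ValueArray arr = Variants.array();
arr.add(Variants.of(34));
arr.add(Variants.of("iceberg"));
// arr.toString() -> roughly "VariantArray([34, iceberg])"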
@@ -231,14 +231,14 @@ public static ByteBuffer createObject(VariantMetadata metadata, Map<String, Vari
return buffer;
}

static ByteBuffer createArray(Serialized... values) {
public static ByteBuffer createArray(VariantValue... values) {
int numElements = values.length;
boolean isLarge = numElements > 0xFF;

int dataSize = 0;
for (Serialized value : values) {
for (VariantValue value : values) {
// TODO: produce size for every variant without serializing
dataSize += value.buffer().remaining();
dataSize += value.sizeInBytes();
}

// offset size is the size needed to store the length of the data section
@@ -260,13 +260,11 @@ static ByteBuffer createArray(Serialized... values) {
// write values and offsets
int nextOffset = 0; // the first offset is always 0
int index = 0;
for (Serialized value : values) {
for (VariantValue value : values) {
// write the offset and value
VariantUtil.writeLittleEndianUnsigned(
buffer, nextOffset, offsetListOffset + (index * offsetSize), offsetSize);
// in a real implementation, the buffer should be passed to serialize
ByteBuffer valueBuffer = value.buffer();
int valueSize = writeBufferAbsolute(buffer, dataOffset + nextOffset, valueBuffer);
int valueSize = value.writeTo(buffer, dataOffset + nextOffset);
// update next offset and index
nextOffset += valueSize;
index += 1;
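Because createArray now accepts any VariantValue and delegates to sizeInBytes() and writeTo(), it can serialize values that are not already backed by a buffer, such as an in-memory ValueArray. A hedged usage sketch, assuming this is the VariantTestUtil helper exercised by the tests below:

// sketch only
ValueArray nested = Variants.array();
nested.add(Variants.of("iceberg"));

ByteBuffer buffer = VariantTestUtil.createArray(Variants.of(34), Variants.of("string"), nested);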
@@ -127,4 +127,9 @@ private int writeTo(ByteBuffer buffer, int offset) {
return (dataOffset - offset) + dataSize;
}
}

@Override
public String toString() {
return VariantArray.asString(this);
}
}
9 changes: 8 additions & 1 deletion core/src/test/java/org/apache/iceberg/RandomVariants.java
@@ -27,6 +27,7 @@
import org.apache.iceberg.util.UUIDUtil;
import org.apache.iceberg.variants.PhysicalType;
import org.apache.iceberg.variants.ShreddedObject;
import org.apache.iceberg.variants.ValueArray;
import org.apache.iceberg.variants.Variant;
import org.apache.iceberg.variants.VariantMetadata;
import org.apache.iceberg.variants.VariantTestUtil;
@@ -120,7 +121,13 @@ private static VariantValue randomVariant(
byte[] uuidBytes = (byte[]) RandomUtil.generatePrimitive(Types.UUIDType.get(), random);
return Variants.of(type, UUIDUtil.convert(uuidBytes));
case ARRAY:
// for now, generate an object instead of an array
ValueArray arr = Variants.array();
int numElements = random.nextInt(10);
for (int i = 0; i < numElements; i += 1) {
arr.add(randomVariant(random, metadata, randomType(random)));
}

return arr;
case OBJECT:
ShreddedObject object = Variants.object(metadata);
if (metadata.dictionarySize() > 0) {
@@ -25,7 +25,11 @@
import java.util.Comparator;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.iceberg.expressions.PathUtil;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -395,7 +399,30 @@ public Type object(VariantObject object, List<String> names, List<Type> typedVal

@Override
public Type array(VariantArray array, List<Type> elementResults) {
return null;
if (elementResults.isEmpty()) {
return null;
Review comment (Contributor): Does this not support shredding an array of encoded variants? This could be a list with only an inner value column?
}

// Choose most common type as shredding type and build 3-level list
Type defaultTYpe = elementResults.get(0);
Review comment (Contributor): Typo: defaultTYpe
Type shredType =
elementResults.stream()
Review comment (Contributor): This is okay, but it seems strange to me to rely on equals and hashCode for types and counting. It would be simpler to only shred values if there is a uniform type, which is when we would benefit from shredding.
(A hedged sketch of this uniform-type alternative appears after this hunk.)
.filter(Objects::nonNull)
.collect(Collectors.groupingBy(Function.identity(), Collectors.counting()))
.entrySet()
.stream()
.max(Map.Entry.comparingByValue())
.map(Map.Entry::getKey)
.orElse(defaultTYpe);
Review comment (Contributor): If the list is not empty, then this should never be used. Why default it?

return list(shredType);
}

private static GroupType list(Type shreddedType) {
GroupType elementType = field("element", shreddedType);
checkField(elementType);

return Types.optionalList().element(elementType).named("typed_value");
}

@Override
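A hedged sketch of the reviewer's simpler alternative above: shred only when every non-null element result agrees on one type, otherwise return null so the array falls back to the plain value column. This is not the code in this PR, just an illustration of the suggestion:

@Override
public Type array(VariantArray array, List<Type> elementResults) {
  Type shredType = null;
  for (Type result : elementResults) {
    if (result == null) {
      continue;
    } else if (shredType == null) {
      shredType = result;
    } else if (!shredType.equals(result)) {
      // mixed element types: skip the typed_value list entirely
      return null;
    }
  }

  return shredType != null ? list(shredType) : null;
}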
@@ -32,6 +32,7 @@
import org.apache.iceberg.variants.PhysicalType;
import org.apache.iceberg.variants.ShreddedObject;
import org.apache.iceberg.variants.Variant;
import org.apache.iceberg.variants.VariantArray;
import org.apache.iceberg.variants.VariantMetadata;
import org.apache.iceberg.variants.VariantObject;
import org.apache.iceberg.variants.VariantValue;
@@ -98,6 +99,17 @@ static ParquetValueWriter<VariantValue> objects(
builder.build());
}

@SuppressWarnings("unchecked")
public static ParquetValueWriter<VariantValue> array(
int repeatedDefinitionLevel,
int repeatedRepetitionLevel,
ParquetValueWriter<?> elementWriter) {
return new ArrayWriter(
repeatedDefinitionLevel,
repeatedRepetitionLevel,
(ParquetValueWriter<VariantValue>) elementWriter);
}

private static class VariantWriter implements ParquetValueWriter<Variant> {
private final ParquetValueWriter<VariantMetadata> metadataWriter;
private final ParquetValueWriter<VariantValue> valueWriter;
@@ -360,6 +372,55 @@ public void setColumnStore(ColumnWriteStore columnStore) {
}
}

private static class ArrayWriter implements TypedWriter {
private final int definitionLevel;
private final int repetitionLevel;
private final ParquetValueWriter<VariantValue> writer;
private final List<TripleWriter<?>> children;

protected ArrayWriter(
int definitionLevel, int repetitionLevel, ParquetValueWriter<VariantValue> writer) {
this.definitionLevel = definitionLevel;
this.repetitionLevel = repetitionLevel;
this.writer = writer;
this.children = writer.columns();
}

@Override
public Set<PhysicalType> types() {
return Set.of(PhysicalType.ARRAY);
}

@Override
public void write(int parentRepetition, VariantValue value) {
Review comment (Contributor): This looks correct to me.
VariantArray arr = value.asArray();
if (arr.numElements() == 0) {
writeNull(writer, parentRepetition, definitionLevel);
} else {
for (int i = 0; i < arr.numElements(); i++) {
VariantValue element = arr.get(i);

int rl = repetitionLevel;
if (i == 0) {
rl = parentRepetition;
}

writer.write(rl, element);
}
}
}

@Override
public List<TripleWriter<?>> columns() {
return children;
}

@Override
public void setColumnStore(ColumnWriteStore columnStore) {
writer.setColumnStore(columnStore);
}
}
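For context on the repetition levels in ArrayWriter.write: the first element is written with the parent's repetition level, which starts a new list entry, and every later element repeats at the list's own level. A rough illustration, assuming a top-level variant column where parentRepetition is 0 and the list's repetitionLevel is 1:

// writing a two-element array:
//   writer.write(0, element0)   // rl = parentRepetition, begins a new list
//   writer.write(1, element1)   // rl = repetitionLevel, continues the same list
// writing an empty array:
//   writeNull(writer, 0, definitionLevel)   // nothing to repeat, nulls written at the list level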

private static void writeNull(
ParquetValueWriter<?> writer, int repetitionLevel, int definitionLevel) {
for (TripleWriter<?> column : writer.columns()) {
@@ -169,7 +169,15 @@ public ParquetValueWriter<?> object(
@Override
public ParquetValueWriter<?> array(
GroupType array, ParquetValueWriter<?> valueWriter, ParquetValueWriter<?> elementWriter) {
throw new UnsupportedOperationException("Array is not yet supported");
int valueDL = schema.getMaxDefinitionLevel(path(VALUE));
int typedDL = schema.getMaxDefinitionLevel(path(TYPED_VALUE));
int repeatedDL = schema.getMaxDefinitionLevel(path(TYPED_VALUE, LIST));
int repeatedRL = schema.getMaxRepetitionLevel(path(TYPED_VALUE, LIST));

ParquetValueWriter<VariantValue> typedWriter =
ParquetVariantWriters.array(repeatedDL, repeatedRL, elementWriter);

return ParquetVariantWriters.shredded(valueDL, valueWriter, typedDL, typedWriter);
}
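For orientation, a rough sketch of the shredded layout this writer targets, assuming VALUE, TYPED_VALUE, and LIST name the usual value, typed_value, and list path segments, and that each list element is itself a shredded value/typed_value pair as built by the list() helper earlier in this diff:

// optional binary value                   fallback for values the typed writer cannot handle
// optional group typed_value (LIST)
//   repeated group list
//     optional group element
//       optional binary value
//       optional <shredded type> typed_value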

private static class LogicalTypeToVariantWriter
@@ -41,10 +41,13 @@
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.variants.ValueArray;
import org.apache.iceberg.variants.Variant;
import org.apache.iceberg.variants.VariantArray;
import org.apache.iceberg.variants.VariantMetadata;
import org.apache.iceberg.variants.VariantObject;
import org.apache.iceberg.variants.VariantTestUtil;
import org.apache.iceberg.variants.VariantValue;
import org.apache.iceberg.variants.Variants;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.FieldSource;
@@ -73,6 +76,12 @@ public class TestVariantWriters {
"c", Variants.of("string")));
private static final ByteBuffer EMPTY_OBJECT_BUFFER =
VariantTestUtil.createObject(TEST_METADATA_BUFFER, ImmutableMap.of());
private static final ByteBuffer ARRAY_IN_OBJECT_BUFFER =
VariantTestUtil.createObject(
TEST_METADATA_BUFFER,
ImmutableMap.of(
"a", Variants.of(123456789),
"c", array(Variants.of("string"), Variants.of("iceberg"))));

private static final VariantMetadata EMPTY_METADATA =
Variants.metadata(VariantTestUtil.emptyMetadata());
@@ -83,6 +92,45 @@ public class TestVariantWriters {
(VariantObject) Variants.value(TEST_METADATA, SIMILAR_OBJECT_BUFFER);
private static final VariantObject EMPTY_OBJECT =
(VariantObject) Variants.value(TEST_METADATA, EMPTY_OBJECT_BUFFER);
private static final VariantObject ARRAY_IN_OBJECT =
(VariantObject) Variants.value(TEST_METADATA, ARRAY_IN_OBJECT_BUFFER);

private static final ByteBuffer EMPTY_ARRAY_BUFFER = VariantTestUtil.createArray();
private static final ByteBuffer TEST_ARRAY_BUFFER =
VariantTestUtil.createArray(Variants.of("iceberg"), Variants.of("string"));
private static final ByteBuffer MIXED_TYPE_ARRAY_BUFFER =
VariantTestUtil.createArray(Variants.of("iceberg"), Variants.of("string"), Variants.of(34));
private static final ByteBuffer NESTED_ARRAY_BUFFER =
VariantTestUtil.createArray(
array(Variants.of("string"), Variants.of("iceberg")),
array(Variants.of("string"), Variants.of("iceberg")));
Review comment (Contributor): Can you use different string values for the second array?
private static final ByteBuffer MIXED_NESTED_ARRAY_BUFFER =
VariantTestUtil.createArray(
array(Variants.of("string"), Variants.of("iceberg"), Variants.of(34)),
array(Variants.of(34), Variants.ofNull()),
array(),
array(Variants.of("string"), Variants.of("iceberg")),
Variants.of(34));
private static final ByteBuffer OBJECT_IN_ARRAY_BUFFER =
VariantTestUtil.createArray(SIMILAR_OBJECT, SIMILAR_OBJECT);
private static final ByteBuffer MIXED_OBJECT_IN_ARRAY_BUFFER =
VariantTestUtil.createArray(
SIMILAR_OBJECT, SIMILAR_OBJECT, Variants.of("iceberg"), Variants.of(34));

private static final VariantArray EMPTY_ARRAY =
(VariantArray) Variants.value(EMPTY_METADATA, EMPTY_ARRAY_BUFFER);
private static final VariantArray TEST_ARRAY =
(VariantArray) Variants.value(EMPTY_METADATA, TEST_ARRAY_BUFFER);
private static final VariantArray MIXED_TYPE_ARRAY =
(VariantArray) Variants.value(EMPTY_METADATA, MIXED_TYPE_ARRAY_BUFFER);
private static final VariantArray NESTED_ARRAY =
(VariantArray) Variants.value(EMPTY_METADATA, NESTED_ARRAY_BUFFER);
private static final VariantArray MIXED_NESTED_ARRAY =
(VariantArray) Variants.value(EMPTY_METADATA, MIXED_NESTED_ARRAY_BUFFER);
private static final VariantArray OBJECT_IN_ARRAY =
(VariantArray) Variants.value(TEST_METADATA, OBJECT_IN_ARRAY_BUFFER);
private static final VariantArray MIXED_OBJECT_IN_ARRAY =
(VariantArray) Variants.value(TEST_METADATA, MIXED_OBJECT_IN_ARRAY_BUFFER);

private static final Variant[] VARIANTS =
new Variant[] {
@@ -104,6 +152,14 @@ public class TestVariantWriters {
Variant.of(EMPTY_METADATA, EMPTY_OBJECT),
Variant.of(TEST_METADATA, TEST_OBJECT),
Variant.of(TEST_METADATA, SIMILAR_OBJECT),
Variant.of(TEST_METADATA, ARRAY_IN_OBJECT),
Variant.of(EMPTY_METADATA, EMPTY_ARRAY),
Variant.of(EMPTY_METADATA, TEST_ARRAY),
Variant.of(EMPTY_METADATA, MIXED_TYPE_ARRAY),
Variant.of(EMPTY_METADATA, NESTED_ARRAY),
Variant.of(EMPTY_METADATA, MIXED_NESTED_ARRAY),
Variant.of(TEST_METADATA, OBJECT_IN_ARRAY),
Variant.of(TEST_METADATA, MIXED_OBJECT_IN_ARRAY),
Variant.of(EMPTY_METADATA, Variants.ofIsoDate("2024-11-07")),
Variant.of(EMPTY_METADATA, Variants.ofIsoDate("1957-11-07")),
Variant.of(EMPTY_METADATA, Variants.ofIsoTimestamptz("2024-11-07T12:33:54.123456+00:00")),
@@ -200,4 +256,13 @@ private static List<Record> writeAndRead(
return Lists.newArrayList(reader);
}
}

private static ValueArray array(VariantValue... values) {
ValueArray arr = Variants.array();
for (VariantValue value : values) {
arr.add(value);
}

return arr;
}
}