Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion api/src/main/java/org/apache/iceberg/types/Type.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Map;
import java.util.Objects;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.variants.Variant;

public interface Type extends Serializable {
enum TypeID {
Expand All @@ -46,7 +47,7 @@ enum TypeID {
STRUCT(StructLike.class),
LIST(List.class),
MAP(Map.class),
VARIANT(Object.class),
VARIANT(Variant.class),
UNKNOWN(Object.class);

private final Class<?> javaClass;
Expand Down
26 changes: 26 additions & 0 deletions api/src/main/java/org/apache/iceberg/variants/BasicType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.variants;

/**
 * Basic type of a serialized variant value, as returned by {@code VariantUtil.basicType(int)} for
 * a value header.
 *
 * <p>Each serialized value class validates the basic type in its {@code from} factory before
 * wrapping a buffer.
 *
 * <p>NOTE(review): the declaration order may be significant to {@code VariantUtil.basicType};
 * confirm before reordering constants.
 */
enum BasicType {
// required by SerializedPrimitive.from
PRIMITIVE,
// required by SerializedShortString.from
SHORT_STRING,
// required by SerializedObject.from
OBJECT,
// required by SerializedArray.from
ARRAY
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,8 @@
*/
package org.apache.iceberg.variants;

/** A variant metadata and value pair. */
public interface Variant {
Copy link
Contributor Author

@rdblue rdblue Feb 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This interface was moved here from Variants; it did not replace the Variant interface. The diff shown here looks like bad diff detection by git.

/** Returns the metadata for all values in the variant. */
VariantMetadata metadata();
import java.nio.ByteBuffer;

/** Returns the variant value. */
VariantValue value();
/**
 * An object that exposes the serialized buffer backing it.
 *
 * <p>Implemented by {@code SerializedMetadata} and, through {@code SerializedValue}, by the
 * serialized variant value classes.
 */
interface Serialized {
/** Returns the buffer that backs this object. */
ByteBuffer buffer();
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

class SerializedArray extends Variants.SerializedValue implements VariantArray {
class SerializedArray implements VariantArray, SerializedValue {
private static final int HEADER_SIZE = 1;
private static final int OFFSET_SIZE_MASK = 0b1100;
private static final int OFFSET_SIZE_SHIFT = 2;
private static final int IS_LARGE = 0b10000;
Expand All @@ -36,9 +37,9 @@ static SerializedArray from(VariantMetadata metadata, byte[] bytes) {
static SerializedArray from(VariantMetadata metadata, ByteBuffer value, int header) {
Preconditions.checkArgument(
value.order() == ByteOrder.LITTLE_ENDIAN, "Unsupported byte order: big endian");
Variants.BasicType basicType = VariantUtil.basicType(header);
BasicType basicType = VariantUtil.basicType(header);
Preconditions.checkArgument(
basicType == Variants.BasicType.ARRAY, "Invalid array, basic type: " + basicType);
basicType == BasicType.ARRAY, "Invalid array, basic type: " + basicType);
return new SerializedArray(metadata, value, header);
}

Expand All @@ -54,9 +55,8 @@ private SerializedArray(VariantMetadata metadata, ByteBuffer value, int header)
this.value = value;
this.offsetSize = 1 + ((header & OFFSET_SIZE_MASK) >> OFFSET_SIZE_SHIFT);
int numElementsSize = ((header & IS_LARGE) == IS_LARGE) ? 4 : 1;
int numElements =
VariantUtil.readLittleEndianUnsigned(value, Variants.HEADER_SIZE, numElementsSize);
this.offsetListOffset = Variants.HEADER_SIZE + numElementsSize;
int numElements = VariantUtil.readLittleEndianUnsigned(value, HEADER_SIZE, numElementsSize);
this.offsetListOffset = HEADER_SIZE + numElementsSize;
this.dataOffset = offsetListOffset + ((1 + numElements) * offsetSize);
this.array = new VariantValue[numElements];
}
Expand All @@ -76,7 +76,7 @@ public VariantValue get(int index) {
VariantUtil.readLittleEndianUnsigned(
value, offsetListOffset + (offsetSize * (1 + index)), offsetSize);
array[index] =
Variants.value(metadata, VariantUtil.slice(value, dataOffset + offset, next - offset));
VariantValue.from(metadata, VariantUtil.slice(value, dataOffset + offset, next - offset));
}
return array[index];
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,16 @@
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

class SerializedMetadata implements VariantMetadata, Variants.Serialized {
class SerializedMetadata implements VariantMetadata, Serialized {
private static final int HEADER_SIZE = 1;
private static final int SUPPORTED_VERSION = 1;
private static final int VERSION_MASK = 0b1111;
private static final int SORTED_STRINGS = 0b10000;
private static final int OFFSET_SIZE_MASK = 0b11000000;
private static final int OFFSET_SIZE_SHIFT = 6;

static final ByteBuffer EMPTY_V1_BUFFER =
ByteBuffer.wrap(new byte[] {0x01, 0x00}).order(ByteOrder.LITTLE_ENDIAN);
ByteBuffer.wrap(new byte[] {0x01, 0x00, 0x00}).order(ByteOrder.LITTLE_ENDIAN);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This implementation now finds the end of the Variant metadata buffer so that metadata and value buffers can be concatenated.

static final SerializedMetadata EMPTY_V1_METADATA = from(EMPTY_V1_BUFFER);

static SerializedMetadata from(byte[] bytes) {
Expand All @@ -55,13 +56,21 @@ static SerializedMetadata from(ByteBuffer metadata) {
private final String[] dict;

private SerializedMetadata(ByteBuffer metadata, int header) {
this.metadata = metadata;
this.isSorted = (header & SORTED_STRINGS) == SORTED_STRINGS;
this.offsetSize = 1 + ((header & OFFSET_SIZE_MASK) >> OFFSET_SIZE_SHIFT);
int dictSize = VariantUtil.readLittleEndianUnsigned(metadata, Variants.HEADER_SIZE, offsetSize);
int dictSize = VariantUtil.readLittleEndianUnsigned(metadata, HEADER_SIZE, offsetSize);
this.dict = new String[dictSize];
this.offsetListOffset = Variants.HEADER_SIZE + offsetSize;
this.offsetListOffset = HEADER_SIZE + offsetSize;
this.dataOffset = offsetListOffset + ((1 + dictSize) * offsetSize);
int endOffset =
dataOffset
+ VariantUtil.readLittleEndianUnsigned(
metadata, offsetListOffset + (offsetSize * dictSize), offsetSize);
if (endOffset < metadata.limit()) {
this.metadata = VariantUtil.slice(metadata, 0, endOffset);
} else {
this.metadata = metadata;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.Pair;

class SerializedObject extends Variants.SerializedValue implements VariantObject {
class SerializedObject implements VariantObject, SerializedValue {
private static final int HEADER_SIZE = 1;
private static final int OFFSET_SIZE_MASK = 0b1100;
private static final int OFFSET_SIZE_SHIFT = 2;
private static final int FIELD_ID_SIZE_MASK = 0b110000;
Expand All @@ -43,9 +43,9 @@ static SerializedObject from(VariantMetadata metadata, byte[] bytes) {
static SerializedObject from(VariantMetadata metadata, ByteBuffer value, int header) {
Preconditions.checkArgument(
value.order() == ByteOrder.LITTLE_ENDIAN, "Unsupported byte order: big endian");
Variants.BasicType basicType = VariantUtil.basicType(header);
BasicType basicType = VariantUtil.basicType(header);
Preconditions.checkArgument(
basicType == Variants.BasicType.OBJECT, "Invalid object, basic type: " + basicType);
basicType == BasicType.OBJECT, "Invalid object, basic type: " + basicType);
return new SerializedObject(metadata, value, header);
}

Expand All @@ -67,9 +67,8 @@ private SerializedObject(VariantMetadata metadata, ByteBuffer value, int header)
this.offsetSize = 1 + ((header & OFFSET_SIZE_MASK) >> OFFSET_SIZE_SHIFT);
this.fieldIdSize = 1 + ((header & FIELD_ID_SIZE_MASK) >> FIELD_ID_SIZE_SHIFT);
int numElementsSize = ((header & IS_LARGE) == IS_LARGE) ? 4 : 1;
int numElements =
VariantUtil.readLittleEndianUnsigned(value, Variants.HEADER_SIZE, numElementsSize);
this.fieldIdListOffset = Variants.HEADER_SIZE + numElementsSize;
int numElements = VariantUtil.readLittleEndianUnsigned(value, HEADER_SIZE, numElementsSize);
this.fieldIdListOffset = HEADER_SIZE + numElementsSize;
this.fieldIds = new Integer[numElements];
this.offsetListOffset = fieldIdListOffset + (numElements * fieldIdSize);
this.offsets = new int[numElements];
Expand Down Expand Up @@ -122,7 +121,7 @@ VariantMetadata metadata() {
return metadata;
}

Iterable<Pair<String, Integer>> fields() {
Iterable<Map.Entry<String, Integer>> fields() {
return () ->
new Iterator<>() {
private int index = 0;
Expand All @@ -133,8 +132,8 @@ public boolean hasNext() {
}

@Override
public Pair<String, Integer> next() {
Pair<String, Integer> next = Pair.of(metadata.get(id(index)), index);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pair is part of core and can't be moved because it has Avro class references.

public Map.Entry<String, Integer> next() {
Map.Entry<String, Integer> next = Map.entry(metadata.get(id(index)), index);
index += 1;
return next;
}
Expand Down Expand Up @@ -182,7 +181,7 @@ public VariantValue get(String name) {

if (null == values[index]) {
values[index] =
Variants.value(
VariantValue.from(
metadata, VariantUtil.slice(value, dataOffset + offsets[index], lengths[index]));
}

Expand Down Expand Up @@ -213,7 +212,7 @@ ByteBuffer sliceValue(String name) {
*/
ByteBuffer sliceValue(int index) {
if (values[index] != null) {
return ((Variants.Serialized) values[index]).buffer();
return ((Serialized) values[index]).buffer();
}

return VariantUtil.slice(value, dataOffset + offsets[index], lengths[index]);
Expand All @@ -224,11 +223,6 @@ public ByteBuffer buffer() {
return value;
}

@Override
public int sizeInBytes() {
return value.remaining();
}

@Override
public String toString() {
return VariantObject.asString(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
import java.nio.ByteOrder;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

class SerializedPrimitive extends Variants.SerializedValue implements VariantPrimitive<Object> {
class SerializedPrimitive implements VariantPrimitive<Object>, SerializedValue {
private static final int PRIMITIVE_TYPE_SHIFT = 2;
private static final int PRIMITIVE_OFFSET = Variants.HEADER_SIZE;
private static final int PRIMITIVE_OFFSET = 1;

static SerializedPrimitive from(byte[] bytes) {
return from(ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN), bytes[0]);
Expand All @@ -35,9 +35,9 @@ static SerializedPrimitive from(byte[] bytes) {
static SerializedPrimitive from(ByteBuffer value, int header) {
Preconditions.checkArgument(
value.order() == ByteOrder.LITTLE_ENDIAN, "Unsupported byte order: big endian");
Variants.BasicType basicType = VariantUtil.basicType(header);
BasicType basicType = VariantUtil.basicType(header);
Preconditions.checkArgument(
basicType == Variants.BasicType.PRIMITIVE,
basicType == BasicType.PRIMITIVE,
"Invalid primitive, basic type != PRIMITIVE: " + basicType);
return new SerializedPrimitive(value, header);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
import java.nio.ByteOrder;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

class SerializedShortString extends Variants.SerializedValue implements VariantPrimitive<String> {
class SerializedShortString implements VariantPrimitive<String>, SerializedValue {
private static final int HEADER_SIZE = 1;
private static final int LENGTH_MASK = 0b11111100;
private static final int LENGTH_SHIFT = 2;

Expand All @@ -33,10 +34,9 @@ static SerializedShortString from(byte[] bytes) {
static SerializedShortString from(ByteBuffer value, int header) {
Preconditions.checkArgument(
value.order() == ByteOrder.LITTLE_ENDIAN, "Unsupported byte order: big endian");
Variants.BasicType basicType = VariantUtil.basicType(header);
BasicType basicType = VariantUtil.basicType(header);
Preconditions.checkArgument(
basicType == Variants.BasicType.SHORT_STRING,
"Invalid short string, basic type: " + basicType);
basicType == BasicType.SHORT_STRING, "Invalid short string, basic type: " + basicType);
return new SerializedShortString(value, header);
}

Expand All @@ -57,7 +57,7 @@ public PhysicalType type() {
@Override
public String get() {
if (null == string) {
this.string = VariantUtil.readString(value, Variants.HEADER_SIZE, length);
this.string = VariantUtil.readString(value, HEADER_SIZE, length);
}
return string;
}
Expand Down
35 changes: 35 additions & 0 deletions api/src/main/java/org/apache/iceberg/variants/SerializedValue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.variants;

import java.nio.ByteBuffer;

/**
 * A {@link VariantValue} that is backed by a serialized buffer.
 *
 * <p>Supplies default size and write implementations that are expressed purely in terms of
 * {@link #buffer()}, so implementations only need to expose their backing buffer.
 */
interface SerializedValue extends VariantValue, Serialized {
  /** Returns the number of bytes remaining in the backing buffer. */
  @Override
  default int sizeInBytes() {
    ByteBuffer serialized = buffer();
    return serialized.remaining();
  }

  /**
   * Hands the backing buffer to {@code VariantUtil.writeBufferAbsolute} along with the target
   * buffer and offset.
   *
   * @param buffer the target buffer
   * @param offset the offset in the target buffer passed through to the write helper
   * @return the number of bytes remaining in the backing buffer, read after the write call
   */
  @Override
  default int writeTo(ByteBuffer buffer, int offset) {
    ByteBuffer serialized = buffer();
    VariantUtil.writeBufferAbsolute(buffer, offset, serialized);
    return serialized.remaining();
  }
}
53 changes: 53 additions & 0 deletions api/src/main/java/org/apache/iceberg/variants/Variant.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.variants;

import java.nio.ByteBuffer;

/** A variant metadata and value pair. */
public interface Variant {
/** Returns the metadata for all values in the variant. */
VariantMetadata metadata();

/** Returns the variant value. */
VariantValue value();

static Variant of(VariantMetadata metadata, VariantValue value) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be replaced with the implementation in the Parquet writer PR (#12323).

return new Variant() {
@Override
public VariantMetadata metadata() {
return metadata;
}

@Override
public VariantValue value() {
return value;
}
};
}

/**
 * Reads a variant from a buffer that holds serialized metadata followed by a serialized value.
 *
 * <p>The metadata is parsed first; everything in the buffer after the metadata's size in bytes
 * is treated as the value.
 *
 * @param buffer a buffer containing metadata immediately followed by the value
 * @return a variant pairing the parsed metadata with the parsed value
 */
static Variant from(ByteBuffer buffer) {
  VariantMetadata metadata = VariantMetadata.from(buffer);
  // the value starts where the metadata ends and runs to the end of the buffer
  int valueOffset = metadata.sizeInBytes();
  int valueLength = buffer.remaining() - valueOffset;
  ByteBuffer valueBuffer = VariantUtil.slice(buffer, valueOffset, valueLength);
  return of(metadata, VariantValue.from(metadata, valueBuffer));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ public interface VariantMetadata {
*/
int writeTo(ByteBuffer buffer, int offset);

/**
 * Creates variant metadata from a serialized buffer by delegating to {@code
 * SerializedMetadata.from(ByteBuffer)}.
 *
 * @param buffer a buffer containing serialized variant metadata
 * @return metadata backed by the given buffer
 */
static VariantMetadata from(ByteBuffer buffer) {
return SerializedMetadata.from(buffer);
}

static String asString(VariantMetadata metadata) {
StringBuilder builder = new StringBuilder();

Expand Down
Loading