Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions core/src/main/java/org/apache/iceberg/GenericDataFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
Expand All @@ -42,6 +43,17 @@ class GenericDataFile extends BaseFile<DataFile> implements DataFile {
metrics.lowerBounds(), metrics.upperBounds(), splitOffsets, null, sortOrderId, keyMetadata);
}

/**
 * Full constructor that rebuilds a data file from previously serialized state, intended for
 * engines (e.g. Presto/Trino) that round-trip file metadata through Jackson.
 * <p>
 * All metric maps may be null when the corresponding metrics were not collected.
 * <p>
 * NOTE(review): for data files, {@code content} is expected to always be FileContent.DATA and
 * {@code equalityFieldIds} is expected to be null (equality ids only apply to delete files) —
 * confirm whether these two parameters should be hard-coded instead of passed through.
 */
GenericDataFile(int specId, FileContent content, String filePath, FileFormat format, PartitionData partition,
                long fileSizeInBytes, long recordCount, Map<Integer, Long> columnSizes,
                Map<Integer, Long> valueCounts, Map<Integer, Long> nullValueCounts,
                Map<Integer, Long> nanValueCounts, Map<Integer, ByteBuffer> lowerBounds,
                Map<Integer, ByteBuffer> upperBounds, List<Long> splitOffsets, int[] equalityFieldIds,
                Integer sortOrderId, ByteBuffer keyMetadata) {
  // delegate everything to BaseFile, which owns the actual field storage
  super(specId, content, filePath, format, partition, fileSizeInBytes, recordCount, columnSizes, valueCounts,
      nullValueCounts, nanValueCounts, lowerBounds, upperBounds, splitOffsets, equalityFieldIds, sortOrderId,
      keyMetadata);
}

/**
* Copy constructor.
*
Expand Down
13 changes: 13 additions & 0 deletions core/src/main/java/org/apache/iceberg/GenericDeleteFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@


import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
Expand All @@ -42,6 +44,17 @@ class GenericDeleteFile extends BaseFile<DeleteFile> implements DeleteFile {
metrics.lowerBounds(), metrics.upperBounds(), null, equalityFieldIds, sortOrderId, keyMetadata);
}

/**
 * Full constructor that rebuilds a delete file from previously serialized state, intended for
 * engines (e.g. Presto/Trino) that round-trip file metadata through Jackson.
 * <p>
 * All metric maps may be null when the corresponding metrics were not collected; delegates all
 * field storage to BaseFile.
 */
GenericDeleteFile(int specId, FileContent content, String filePath, FileFormat format, PartitionData partition,
long fileSizeInBytes, long recordCount, Map<Integer, Long> columnSizes,
Map<Integer, Long> valueCounts, Map<Integer, Long> nullValueCounts,
Map<Integer, Long> nanValueCounts, Map<Integer, ByteBuffer> lowerBounds,
Map<Integer, ByteBuffer> upperBounds, List<Long> splitOffsets, int[] equalityFieldIds,
Integer sortOrderId, ByteBuffer keyMetadata) {
super(specId, content, filePath, format, partition, fileSizeInBytes, recordCount, columnSizes, valueCounts,
nullValueCounts, nanValueCounts, lowerBounds, upperBounds, splitOffsets, equalityFieldIds, sortOrderId,
keyMetadata);
}

/**
* Copy constructor.
*
Expand Down
156 changes: 156 additions & 0 deletions core/src/main/java/org/apache/iceberg/JacksonSerializationUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.iceberg.expressions.ResidualEvaluator;

/**
* Util methods to help Jackson serialization of Iceberg objects, useful in systems like Presto and Trino.
*/
public class JacksonSerializationUtil {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of the other to/from JSON classes are named SomethingParser. Can we use the same pattern?


private JacksonSerializationUtil() {
}

public static FileScanTask createFileScanTask(DataFile file, DeleteFile[] deletes, String schemaString,
String specString, ResidualEvaluator residuals) {
Copy link
Copy Markdown
Contributor Author

@jackye1995 jackye1995 Oct 1, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only unknown part is the serialization of ResidualEvaluator, which requires serialization of Iceberg Expression. In theory we can convert it to Trino TupleDomain (basically the reverse of https://github.com/trinodb/trino/blob/master/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/ExpressionConverter.java), but I have not verified if that is lossy or not, maybe there is a better way to serialize expressions directly in Iceberg to a string format that can be easily deserialized back.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eventually, we may want to have an expression parser, but I've been trying to avoid the complexity of that for a long time. We could also have a similar util class to convert expressions to/from JSON.

Does Trino actually use the residuals? If not, we could just skip serializing the residual evaluator.

return new BaseFileScanTask(file, deletes, schemaString, specString, residuals);
}

public static DataFile createDataFile(int specId, FileContent content, String filePath, FileFormat format,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this use the builders instead of a direct construtor?

PartitionData partitionData, long fileSizeInBytes, long recordCount,
Map<Integer, Long> columnSizes, Map<Integer, Long> valueCounts,
Map<Integer, Long> nullValueCounts, Map<Integer, Long> nanValueCounts,
Map<Integer, ByteBuffer> lowerBounds, Map<Integer, ByteBuffer> upperBounds,
List<Long> splitOffsets, int[] equalityFieldIds, Integer sortOrderId,
ByteBuffer keyMetadata) {
return new GenericDataFile(specId, content, filePath, format, partitionData, fileSizeInBytes, recordCount,
columnSizes, valueCounts, nullValueCounts, nanValueCounts, lowerBounds, upperBounds, splitOffsets,
equalityFieldIds, sortOrderId, keyMetadata);
}

public static DeleteFile createDeleteFile(int specId, FileContent content, String filePath, FileFormat format,
PartitionData partitionData, long fileSizeInBytes, long recordCount,
Map<Integer, Long> columnSizes, Map<Integer, Long> valueCounts,
Map<Integer, Long> nullValueCounts, Map<Integer, Long> nanValueCounts,
Map<Integer, ByteBuffer> lowerBounds, Map<Integer, ByteBuffer> upperBounds,
List<Long> splitOffsets, int[] equalityFieldIds, Integer sortOrderId,
ByteBuffer keyMetadata) {
return new GenericDeleteFile(specId, content, filePath, format, partitionData, fileSizeInBytes, recordCount,
columnSizes, valueCounts, nullValueCounts, nanValueCounts, lowerBounds, upperBounds, splitOffsets,
equalityFieldIds, sortOrderId, keyMetadata);
}

public static List<byte[]> partitionDataToBytesMap(PartitionData partition) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The spec is used in deserialization to convert bytes to values. Why not also use it to convert values to bytes rather than instanceof checks? Then you'd know whether a value matches the assumptions at serialization time rather than potentially deserializing incorrectly. For example, if I have an long in the partition that should be an int serializing 8 bytes but deserializing just the first 4 is a silent error.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of accepting a PartitionData, you may want to consider using StructLike.

return IntStream.of(partition.size())
.mapToObj(i -> partitionValueToBytes(partition.get(i), i))
.collect(Collectors.toList());
}

public static PartitionData bytesMapToPartitionData(List<byte[]> values, PartitionSpec spec) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems unused. Do i understand correctly, it's for Trino's consumption?
We should exercise this code within Iceberg. Let's have a test.

PartitionData partitionData = new PartitionData(spec.partitionType());
Class<?>[] classes = spec.javaClasses();
IntStream.of(values.size()).forEach(i -> partitionData.set(i, bytesToPartitionValue(values.get(i), classes[i], i)));
return partitionData;
}

private static byte[] partitionValueToBytes(Object value, int pos) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use the existing Conversions.toByteBuffer and Conversions.fromByteBuffer instead? Those implement the single-value serializations defined in the spec.

byte[] bytes;
if (value == null) {
bytes = null;
} else if (value instanceof ByteBuffer) {
ByteBuffer bb = (ByteBuffer) value;
bytes = new byte[bb.remaining()];
bb.put(bytes);
} else if (value instanceof CharSequence) {
bytes = value.toString().getBytes(StandardCharsets.UTF_8);
} else if (value instanceof UUID) {
ByteBuffer bb = ByteBuffer.allocate(16);
bb.putLong(((UUID) value).getMostSignificantBits());
bb.putLong(((UUID) value).getLeastSignificantBits());
bytes = bb.array();
} else if (value instanceof Boolean) {
bytes = ByteBuffer.allocate(Integer.BYTES).putInt((Boolean) value ? 1 : 0).array();
} else if (value instanceof Integer) {
bytes = ByteBuffer.allocate(Integer.BYTES).putInt((Integer) value).array();
} else if (value instanceof Long) {
bytes = ByteBuffer.allocate(Long.BYTES).putLong((Long) value).array();
} else if (value instanceof Float) {
bytes = ByteBuffer.allocate(Float.BYTES).putFloat((Float) value).array();
} else if (value instanceof Double) {
bytes = ByteBuffer.allocate(Double.BYTES).putDouble((Double) value).array();
} else if (value instanceof BigDecimal) {
bytes = value.toString().getBytes(StandardCharsets.UTF_8);
} else {
throw new UnsupportedOperationException("Unsupported partition data value " + value + " at position" + pos);
}

return bytes;
}

private static Object bytesToPartitionValue(byte[] bytes, Class<?> javaClass, int pos) {
Object value;
if (bytes == null) {
value = null;
} else if (javaClass.equals(ByteBuffer.class)) {
value = ByteBuffer.wrap(bytes);
} else if (javaClass.equals(CharSequence.class)) {
value = new String(bytes, StandardCharsets.UTF_8);
} else if (javaClass.equals(UUID.class)) {
value = UUID.nameUUIDFromBytes(bytes);
} else if (javaClass.equals(Boolean.class)) {
value = ByteBuffer.wrap(bytes).getInt() == 1;
} else if (javaClass.equals(Integer.class)) {
value = ByteBuffer.wrap(bytes).getInt();
} else if (javaClass.equals(Long.class)) {
value = ByteBuffer.wrap(bytes).getLong();
} else if (javaClass.equals(Float.class)) {
value = ByteBuffer.wrap(bytes).getFloat();
} else if (javaClass.equals(Double.class)) {
value = ByteBuffer.wrap(bytes).getDouble();
} else if (javaClass.equals(BigDecimal.class)) {
value = new BigDecimal(new String(bytes, StandardCharsets.UTF_8));
} else {
throw new UnsupportedOperationException("Unsupported partition data class " + javaClass + " at position" + pos);
}

return value;
}

public static <T> Map<T, ByteBuffer> bytesMapToByteBufferMap(Map<T, byte[]> map) {
return map.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> ByteBuffer.wrap(e.getValue())));
}

public static <T> Map<T, byte[]> byteBufferMapToBytesMap(Map<T, ByteBuffer> map) {
return map.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> {
byte[] bytes = new byte[e.getValue().remaining()];
e.getValue().put(bytes);
return bytes;
}));
}
}