From 4990d7bf128eb6e08cda87076a53f490451278d6 Mon Sep 17 00:00:00 2001 From: Zac Blanco Date: Mon, 7 Oct 2024 14:53:26 -0700 Subject: [PATCH 1/3] [parquet] Add a profile to re-generate batch readers Previously, the code generation step was removed in 7c814aeeb5ab3 However, this makes it more difficult to add new readers in the future, such as for UUIDs. This change adds the code generation step as an optional profile which can be enabled when building the parquet module through -PgenerateParquetReaders --- presto-parquet/.gitignore | 2 + presto-parquet/pom.xml | 43 ++++++++++++++++++- .../batchreader/BooleanFlatBatchReader.java | 1 + .../batchreader/BooleanNestedBatchReader.java | 1 + .../batchreader/Int64FlatBatchReader.java | 1 + .../batchreader/Int64NestedBatchReader.java | 1 + ...TimeAndTimestampMicrosFlatBatchReader.java | 1 + ...meAndTimestampMicrosNestedBatchReader.java | 1 + .../LongDecimalFlatBatchReader.java | 1 + .../ShortDecimalFlatBatchReader.java | 1 + .../batchreader/TimestampFlatBatchReader.java | 1 + .../TimestampNestedBatchReader.java | 1 + 12 files changed, 54 insertions(+), 1 deletion(-) create mode 100644 presto-parquet/.gitignore diff --git a/presto-parquet/.gitignore b/presto-parquet/.gitignore new file mode 100644 index 0000000000000..5dde9f821430a --- /dev/null +++ b/presto-parquet/.gitignore @@ -0,0 +1,2 @@ +src/main/java/ParquetFlatColumnReaderGenerator.tdd +src/main/java/ParquetNestedColumnReaderGenerator.tdd \ No newline at end of file diff --git a/presto-parquet/pom.xml b/presto-parquet/pom.xml index 7f2caa8e0b12a..056d4f726eadc 100644 --- a/presto-parquet/pom.xml +++ b/presto-parquet/pom.xml @@ -223,10 +223,19 @@ shaded.parquet.it.unimi.dsi.fastutil.* module-info - com.facebook.presto.parquet.batchreader.*BatchReader + com.facebook.presto.parquet.* + org.apache.parquet.io.* + org.apache.parquet.crypto.* + + org.apache.maven.plugins + maven-checkstyle-plugin + + **/com/facebook/presto/parquet/batchreader/*FlatBatchReader.java,**/com/facebook/presto/parquet/batchreader/*NestedBatchReader.java + + @@ -245,5 +254,37 @@ + + generateParquetReaders + + + + com.googlecode.fmpp-maven-plugin + fmpp-maven-plugin + 1.0 + + + net.sourceforge.fmpp + fmpp + 0.9.15 + + + + ${project.basedir}/src/main/resources/freemarker/config.fmpp + ${project.build.sourceDirectory} + ${project.basedir}/src/main/resources/freemarker/templates + + + + generate-sources + + generate + + + + + + + diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/BooleanFlatBatchReader.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/BooleanFlatBatchReader.java index a16af1f2cc1e6..73dedeeeba719 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/BooleanFlatBatchReader.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/BooleanFlatBatchReader.java @@ -255,3 +255,4 @@ private void seek() } } } + diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/BooleanNestedBatchReader.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/BooleanNestedBatchReader.java index f89ea02b21894..53f6e6620b4ff 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/BooleanNestedBatchReader.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/BooleanNestedBatchReader.java @@ -142,3 +142,4 @@ protected void seek() } } } + diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/Int64FlatBatchReader.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/Int64FlatBatchReader.java index 2c97056ddda94..cba2bd2c1297a 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/Int64FlatBatchReader.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/Int64FlatBatchReader.java @@ -255,3 +255,4 @@ private void seek() } } } + diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/Int64NestedBatchReader.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/Int64NestedBatchReader.java index fea6186ad6126..9d69d34a30e37 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/Int64NestedBatchReader.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/Int64NestedBatchReader.java @@ -142,3 +142,4 @@ protected void seek() } } } + diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/Int64TimeAndTimestampMicrosFlatBatchReader.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/Int64TimeAndTimestampMicrosFlatBatchReader.java index 758df93fa44f8..ccedd8af82645 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/Int64TimeAndTimestampMicrosFlatBatchReader.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/Int64TimeAndTimestampMicrosFlatBatchReader.java @@ -255,3 +255,4 @@ private void seek() } } } + diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/Int64TimeAndTimestampMicrosNestedBatchReader.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/Int64TimeAndTimestampMicrosNestedBatchReader.java index c9a96ef85b7d2..652915453b4a3 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/Int64TimeAndTimestampMicrosNestedBatchReader.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/Int64TimeAndTimestampMicrosNestedBatchReader.java @@ -142,3 +142,4 @@ protected void seek() } } } + diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/LongDecimalFlatBatchReader.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/LongDecimalFlatBatchReader.java index 478dee49e0e49..90a13544b3186 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/LongDecimalFlatBatchReader.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/LongDecimalFlatBatchReader.java @@ -256,3 +256,4 @@ private void seek() } } } + diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/ShortDecimalFlatBatchReader.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/ShortDecimalFlatBatchReader.java index db3dff192a240..235d7f79cc580 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/ShortDecimalFlatBatchReader.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/ShortDecimalFlatBatchReader.java @@ -255,3 +255,4 @@ private void seek() } } } + diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/TimestampFlatBatchReader.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/TimestampFlatBatchReader.java index 7ac7bef95043d..0d52fa272e455 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/TimestampFlatBatchReader.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/TimestampFlatBatchReader.java @@ -255,3 +255,4 @@ private void seek() } } } + diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/TimestampNestedBatchReader.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/TimestampNestedBatchReader.java index e5c867a4d9893..a7a62ebc7ac01 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/TimestampNestedBatchReader.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/TimestampNestedBatchReader.java @@ -142,3 +142,4 @@ protected void seek() } } } + From 5b0c25c9d2f50e79493d277e5b1ab44c6465a6dc Mon Sep 17 00:00:00 2001 From: Zac Blanco Date: Fri, 30 Aug 2024 15:37:54 -0700 Subject: [PATCH 2/3] [iceberg] Add UUID type support The iceberg spec lists uuid as a valid schema type. Presto supports UUID types but there was no support for reading or writing them in the connector. This commit makes the necessary changes in the connector to create tables with UUID columns and support for UUIDs in the parquet reader. This includes an implementation for UUIDs in the batchreader. --- .../src/main/sphinx/connector/iceberg.rst | 41 +-- .../com/facebook/presto/hive/HiveType.java | 47 ++++ .../presto/hive/HiveTypeTranslator.java | 5 + .../presto/iceberg/ExpressionConverter.java | 8 + .../presto/iceberg/TypeConverter.java | 6 + .../iceberg/IcebergDistributedTestBase.java | 92 ++++++- .../presto/parquet/ColumnReaderFactory.java | 12 +- .../presto/parquet/ParquetTypeUtils.java | 6 + .../parquet/batchreader/BytesUtils.java | 35 +++ .../batchreader/UuidFlatBatchReader.java | 260 ++++++++++++++++++ .../batchreader/decoders/Decoders.java | 16 ++ .../batchreader/decoders/ValuesDecoder.java | 10 + ...xedLenByteArrayUuidDeltaValuesDecoder.java | 62 +++++ ...xedLenByteArrayUuidPlainValuesDecoder.java | 72 +++++ .../rle/UuidRLEDictionaryValuesDecoder.java | 62 +++++ .../presto/parquet/cache/MetadataReader.java | 13 +- .../parquet/reader/BinaryColumnReader.java | 11 + .../writer/ParquetSchemaConverter.java | 8 + .../presto/parquet/writer/ParquetWriters.java | 5 + .../writer/valuewriter/UuidValuesWriter.java | 55 ++++ .../freemarker/data/ParquetTypes.tdd | 6 + .../ParquetFlatColumnReaderGenerator.tdd | 2 +- .../ParquetNestedColumnReaderGenerator.tdd | 2 +- .../decoders/TestParquetUtils.java | 10 + .../decoders/TestValuesDecoders.java | 99 +++++++ .../reader/BenchmarkUuidColumnReader.java | 104 +++++++ .../parquet/writer/TestParquetWriter.java | 2 + 27 files changed, 1009 insertions(+), 42 deletions(-) create mode 100644 presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/UuidFlatBatchReader.java create mode 100644 presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/delta/FixedLenByteArrayUuidDeltaValuesDecoder.java create mode 100644 presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/plain/FixedLenByteArrayUuidPlainValuesDecoder.java create mode 100644 presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/UuidRLEDictionaryValuesDecoder.java create mode 100644 presto-parquet/src/main/java/com/facebook/presto/parquet/writer/valuewriter/UuidValuesWriter.java create mode 100644 presto-parquet/src/test/java/com/facebook/presto/parquet/reader/BenchmarkUuidColumnReader.java diff --git a/presto-docs/src/main/sphinx/connector/iceberg.rst b/presto-docs/src/main/sphinx/connector/iceberg.rst index 731e1f602b36a..55eb20acf9281 100644 --- a/presto-docs/src/main/sphinx/connector/iceberg.rst +++ b/presto-docs/src/main/sphinx/connector/iceberg.rst @@ -1737,26 +1737,28 @@ Map of Iceberg types to the relevant PrestoDB types: - PrestoDB type * - ``BOOLEAN`` - ``BOOLEAN`` - * - ``BINARY``, ``FIXED`` - - ``VARBINARY`` - * - ``DATE`` - - ``DATE`` - * - ``DECIMAL`` - - ``DECIMAL`` - * - ``DOUBLE`` - - ``DOUBLE`` + * - ``INTEGER`` + - ``INTEGER`` * - ``LONG`` - ``BIGINT`` * - ``FLOAT`` - ``REAL`` - * - ``INTEGER`` - - ``INTEGER`` + * - ``DOUBLE`` + - ``DOUBLE`` + * - ``DECIMAL`` + - ``DECIMAL`` + * - ``STRING`` + - ``VARCHAR`` + * - ``BINARY``, ``FIXED`` + - ``VARBINARY`` + * - ``DATE`` + - ``DATE`` * - ``TIME`` - ``TIME`` * - ``TIMESTAMP`` - ``TIMESTAMP`` - * - ``STRING`` - - ``VARCHAR`` + * - ``UUID`` + - ``UUID`` * - ``LIST`` - ``ARRAY`` * - ``MAP`` @@ -1796,17 +1798,20 @@ Map of PrestoDB types to the relevant Iceberg types: - ``BINARY`` * - ``DATE`` - ``DATE`` - * - ``ROW`` - - ``STRUCT`` - * - ``ARRAY`` - - ``LIST`` - * - ``MAP`` - - ``MAP`` * - ``TIME`` - ``TIME`` * - ``TIMESTAMP`` - ``TIMESTAMP WITHOUT ZONE`` * - ``TIMESTAMP WITH TIMEZONE`` - ``TIMESTAMP WITH ZONE`` + * - ``UUID`` + - ``UUID`` + * - ``ARRAY`` + - ``LIST`` + * - ``MAP`` + - ``MAP`` + * - ``ROW`` + - ``STRUCT`` + No other types are supported. diff --git a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/HiveType.java b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/HiveType.java index 566fa5f3ac8c3..d6d641362bc93 100644 --- a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/HiveType.java +++ b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/HiveType.java @@ -20,6 +20,7 @@ import com.facebook.presto.common.type.TypeManager; import com.facebook.presto.common.type.TypeSignature; import com.facebook.presto.common.type.TypeSignatureParameter; +import com.facebook.presto.common.type.UuidType; import com.facebook.presto.spi.PrestoException; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonValue; @@ -47,6 +48,7 @@ import static com.facebook.presto.common.type.IntegerType.INTEGER; import static com.facebook.presto.common.type.RealType.REAL; import static com.facebook.presto.common.type.SmallintType.SMALLINT; +import static com.facebook.presto.common.type.StandardTypes.UUID; import static com.facebook.presto.common.type.TimestampType.TIMESTAMP; import static com.facebook.presto.common.type.TinyintType.TINYINT; import static com.facebook.presto.common.type.VarbinaryType.VARBINARY; @@ -58,6 +60,7 @@ import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.toList; import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; +import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category.PRIMITIVE; import static org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory.binaryTypeInfo; import static org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory.booleanTypeInfo; import static org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory.byteTypeInfo; @@ -87,6 +90,47 @@ public final class HiveType public static final HiveType HIVE_TIMESTAMP = new HiveType(timestampTypeInfo); public static final HiveType HIVE_DATE = new HiveType(dateTypeInfo); public static final HiveType HIVE_BINARY = new HiveType(binaryTypeInfo); + public static final HiveType HIVE_UUID = new HiveType(new TypeInfo() + { + @Override + public Category getCategory() + { + return PRIMITIVE; + } + + @Override + public String getTypeName() + { + return UUID; + } + + @Override + public boolean equals(Object other) + { + if (this == other) { + return true; + } + if (other == null || getClass() != other.getClass()) { + return false; + } + + TypeInfo ti = (TypeInfo) other; + + return UUID.equals(ti.getTypeName()); + } + + @Override + public int hashCode() + { + return UUID.hashCode(); + } + + @Override + public String toString() + { + return UUID; + } + }); private final HiveTypeName hiveTypeName; private final TypeInfo typeInfo; @@ -223,6 +267,9 @@ private static TypeSignature getTypeSignature(TypeInfo typeInfo) switch (typeInfo.getCategory()) { case PRIMITIVE: Type primitiveType = getPrimitiveType((PrimitiveTypeInfo) typeInfo); + if (primitiveType == null && typeInfo.getTypeName().equals(UUID)) { + return UuidType.UUID.getTypeSignature(); + } if (primitiveType == null) { break; } diff --git a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/HiveTypeTranslator.java b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/HiveTypeTranslator.java index 20a87260dce9b..c973f12330f4b 100644 --- a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/HiveTypeTranslator.java +++ b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/HiveTypeTranslator.java @@ -40,6 +40,7 @@ import static com.facebook.presto.common.type.SmallintType.SMALLINT; import static com.facebook.presto.common.type.TimestampType.TIMESTAMP; import static com.facebook.presto.common.type.TinyintType.TINYINT; +import static com.facebook.presto.common.type.UuidType.UUID; import static com.facebook.presto.common.type.VarbinaryType.VARBINARY; import static com.facebook.presto.hive.HiveType.HIVE_BINARY; import static com.facebook.presto.hive.HiveType.HIVE_BOOLEAN; @@ -52,6 +53,7 @@ import static com.facebook.presto.hive.HiveType.HIVE_SHORT; import static com.facebook.presto.hive.HiveType.HIVE_STRING; import static com.facebook.presto.hive.HiveType.HIVE_TIMESTAMP; +import static com.facebook.presto.hive.HiveType.HIVE_UUID; import static com.facebook.presto.hive.metastore.MetastoreUtil.isArrayType; import static com.facebook.presto.hive.metastore.MetastoreUtil.isMapType; import static com.facebook.presto.hive.metastore.MetastoreUtil.isRowType; @@ -91,6 +93,9 @@ public TypeInfo translate(Type type, Optional defaultHiveType) if (DOUBLE.equals(type)) { return HIVE_DOUBLE.getTypeInfo(); } + if (UUID.equals(type)) { + return HIVE_UUID.getTypeInfo(); + } if (type instanceof VarcharType) { VarcharType varcharType = (VarcharType) type; int varcharLength = varcharType.getLength(); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ExpressionConverter.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ExpressionConverter.java index a66707acbec82..3f05a48d1219a 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ExpressionConverter.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ExpressionConverter.java @@ -31,6 +31,7 @@ import com.facebook.presto.common.type.TimeType; import com.facebook.presto.common.type.TimestampType; import com.facebook.presto.common.type.Type; +import com.facebook.presto.common.type.UuidType; import com.facebook.presto.common.type.VarbinaryType; import com.facebook.presto.common.type.VarcharType; import com.google.common.base.VerifyException; @@ -41,6 +42,7 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.Map; +import java.util.UUID; import static com.facebook.presto.common.predicate.Marker.Bound.ABOVE; import static com.facebook.presto.common.predicate.Marker.Bound.BELOW; @@ -220,6 +222,12 @@ private static Object getIcebergLiteralValue(Type type, Marker marker) return new BigDecimal(Decimals.decodeUnscaledValue((Slice) value), decimalType.getScale()); } + if (type instanceof UuidType) { + UuidType uuidType = (UuidType) type; + return marker.getValueBlock() + .map(block -> UUID.fromString((String) uuidType.getObjectValue(null, block, 0))) + .orElseThrow(NullPointerException::new); + } return marker.getValue(); } } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/TypeConverter.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/TypeConverter.java index cd05290c12b35..b08f9c116e0c3 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/TypeConverter.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/TypeConverter.java @@ -33,6 +33,7 @@ import com.facebook.presto.common.type.TypeManager; import com.facebook.presto.common.type.TypeSignature; import com.facebook.presto.common.type.TypeSignatureParameter; +import com.facebook.presto.common.type.UuidType; import com.facebook.presto.common.type.VarbinaryType; import com.facebook.presto.common.type.VarcharType; import com.facebook.presto.hive.HiveType; @@ -120,6 +121,8 @@ public static Type toPrestoType(org.apache.iceberg.types.Type type, TypeManager return TimestampType.TIMESTAMP; case STRING: return VarcharType.createUnboundedVarcharType(); + case UUID: + return UuidType.UUID; case LIST: Types.ListType listType = (Types.ListType) type; return new ArrayType(toPrestoType(listType.elementType(), typeManager)); @@ -203,6 +206,9 @@ public static org.apache.iceberg.types.Type toIcebergType(Type type) if (type instanceof TimestampWithTimeZoneType) { return Types.TimestampType.withZone(); } + if (type instanceof UuidType) { + return Types.UUIDType.get(); + } throw new PrestoException(NOT_SUPPORTED, "Type not supported for Iceberg: " + type.getDisplayName()); } diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java index a9f17f3a0f167..a20bc099fb0cb 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java @@ -88,6 +88,7 @@ import java.io.File; import java.io.IOException; import java.lang.reflect.Field; +import java.nio.ByteBuffer; import java.nio.file.Path; import java.time.LocalDateTime; import java.time.LocalTime; @@ -95,6 +96,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -134,6 +136,7 @@ import static com.facebook.presto.tests.sql.TestTable.randomTableSuffix; import static java.lang.String.format; import static java.util.Objects.requireNonNull; +import static java.util.UUID.randomUUID; import static org.apache.iceberg.SnapshotSummary.TOTAL_DATA_FILES_PROP; import static org.apache.iceberg.SnapshotSummary.TOTAL_DELETE_FILES_PROP; import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0; @@ -1596,20 +1599,20 @@ public void testMetadataVersionsMaintainingProperties() } } - @DataProvider(name = "decimalVectorReader") - public Object[] decimalVectorReader() + @DataProvider(name = "batchReadEnabled") + public Object[] batchReadEnabledReader() { return new Object[] {true, false}; } - private Session decimalVectorReaderEnabledSession(boolean decimalVectorReaderEnabled) + private Session batchReadEnabledEnabledSession(boolean batchReadEnabled) { return Session.builder(getQueryRunner().getDefaultSession()) - .setCatalogSessionProperty(ICEBERG_CATALOG, PARQUET_BATCH_READ_OPTIMIZATION_ENABLED, String.valueOf(decimalVectorReaderEnabled)) + .setCatalogSessionProperty(ICEBERG_CATALOG, PARQUET_BATCH_READ_OPTIMIZATION_ENABLED, String.valueOf(batchReadEnabled)) .build(); } - @Test(dataProvider = "decimalVectorReader") + @Test(dataProvider = "batchReadEnabled") public void testDecimal(boolean decimalVectorReaderEnabled) { String tableName = "test_decimal_vector_reader"; @@ -1625,7 +1628,7 @@ public void testDecimal(boolean decimalVectorReaderEnabled) // Insert data to table assertUpdate("INSERT INTO " + tableName + values, 4); - Session session = decimalVectorReaderEnabledSession(decimalVectorReaderEnabled); + Session session = batchReadEnabledEnabledSession(decimalVectorReaderEnabled); assertQuery(session, "SELECT * FROM " + tableName, values); } finally { @@ -1701,6 +1704,7 @@ public void testAllIcebergType() " c_timestamp TIMESTAMP, " + " c_varchar VARCHAR, " + " c_varbinary VARBINARY, " + + " c_uuid UUID, " + " c_array ARRAY(BIGINT), " + " c_map MAP(VARCHAR, INT), " + " c_row ROW(a INT, b VARCHAR) " + @@ -1708,16 +1712,16 @@ public void testAllIcebergType() assertUpdate(format("" + "INSERT INTO %s " + - "SELECT c_boolean, c_int, c_bigint, c_double, c_real, c_date, c_timestamp, c_varchar, c_varbinary, c_array, c_map, c_row " + + "SELECT c_boolean, c_int, c_bigint, c_double, c_real, c_date, c_timestamp, c_varchar, c_varbinary, c_uuid, c_array, c_map, c_row " + "FROM ( " + " VALUES " + - " (null, null, null, null, null, null, null, null, null, null, null, null), " + - " (true, INT '1245', BIGINT '1', DOUBLE '2.2', REAL '-24.124', DATE '2024-07-29', TIMESTAMP '2012-08-08 01:00', CAST('abc1' AS VARCHAR), to_ieee754_64(1), sequence(0, 10), MAP(ARRAY['aaa', 'bbbb'], ARRAY[1, 2]), CAST(ROW(1, 'AAA') AS ROW(a INT, b VARCHAR)))," + - " (false, INT '-1245', BIGINT '-1', DOUBLE '2.3', REAL '243215.435', DATE '2024-07-29', TIMESTAMP '2012-09-09 00:00', CAST('cba2' AS VARCHAR), to_ieee754_64(4), sequence(30, 35), MAP(ARRAY['ccc', 'bbbb'], ARRAY[-1, -2]), CAST(ROW(-1, 'AAA') AS ROW(a INT, b VARCHAR))) " + - ") AS x (c_boolean, c_int, c_bigint, c_double, c_real, c_date, c_timestamp, c_varchar, c_varbinary, c_array, c_map, c_row)", tmpTableName), 3); + " (null, null, null, null, null, null, null, null, null, null, null, null, null), " + + " (true, INT '1245', BIGINT '1', DOUBLE '2.2', REAL '-24.124', DATE '2024-07-29', TIMESTAMP '2012-08-08 01:00', CAST('abc1' AS VARCHAR), to_ieee754_64(1), CAST('4ae71336-e44b-39bf-b9d2-752e234818a5' as UUID), sequence(0, 10), MAP(ARRAY['aaa', 'bbbb'], ARRAY[1, 2]), CAST(ROW(1, 'AAA') AS ROW(a INT, b VARCHAR)))," + + " (false, INT '-1245', BIGINT '-1', DOUBLE '2.3', REAL '243215.435', DATE '2024-07-29', TIMESTAMP '2012-09-09 00:00', CAST('cba2' AS VARCHAR), to_ieee754_64(4), CAST('4ae71336-e44b-39bf-b9d2-752e234818a5' as UUID), sequence(30, 35), MAP(ARRAY['ccc', 'bbbb'], ARRAY[-1, -2]), CAST(ROW(-1, 'AAA') AS ROW(a INT, b VARCHAR))) " + + ") AS x (c_boolean, c_int, c_bigint, c_double, c_real, c_date, c_timestamp, c_varchar, c_varbinary, c_uuid, c_array, c_map, c_row)", tmpTableName), 3); - Session decimalVectorReaderEnabled = decimalVectorReaderEnabledSession(true); - Session decimalVectorReaderDisable = decimalVectorReaderEnabledSession(false); + Session decimalVectorReaderEnabled = batchReadEnabledEnabledSession(true); + Session decimalVectorReaderDisable = batchReadEnabledEnabledSession(false); assertQueryWithSameQueryRunner(decimalVectorReaderEnabled, "SELECT * FROM " + tmpTableName, decimalVectorReaderDisable); } finally { @@ -1886,6 +1890,64 @@ public void testStatisticsFileCache() getQueryRunner().execute("DROP TABLE test_statistics_file_cache"); } + @Test(dataProvider = "batchReadEnabled") + public void testUuidRoundTrip(boolean batchReadEnabled) + { + Session session = batchReadEnabledEnabledSession(batchReadEnabled); + try { + assertQuerySucceeds("CREATE TABLE uuid_roundtrip(u uuid)"); + UUID uuid = UUID.fromString("11111111-2222-3333-4444-555555555555"); + assertUpdate(format("INSERT INTO uuid_roundtrip VALUES CAST('%s' as uuid)", uuid), 1); + assertQuery(session, "SELECT CAST(u as varchar) FROM uuid_roundtrip", format("VALUES '%s'", uuid)); + } + finally { + assertQuerySucceeds("DROP TABLE uuid_roundtrip"); + } + } + + @Test(dataProvider = "batchReadEnabled") + public void testUuidFilters(boolean batchReadEnabled) + { + Session session = batchReadEnabledEnabledSession(batchReadEnabled); + try { + int uuidCount = 100; + assertQuerySucceeds("CREATE TABLE uuid_filters(u uuid)"); + List uuids = IntStream.range(0, uuidCount) + .mapToObj(idx -> { + ByteBuffer buf = ByteBuffer.allocate(16); + if (idx % 2 == 0) { + buf.putLong(0L); + buf.putLong(idx); + } + else { + buf.putLong(idx); + buf.putLong(0L); + } + buf.flip(); + return new UUID(buf.getLong(), buf.getLong()); + }) + .collect(Collectors.toList()); + // shuffle to make sure parquet metadata stats are updated properly even with + // out-of-order values + Collections.shuffle(uuids); + assertUpdate(format("INSERT INTO uuid_filters VALUES %s", + Joiner.on(", ").join(uuids.stream().map(uuid -> format("CAST('%s' as uuid)", uuid)).iterator())), 100); + assertQuery(session, format("SELECT CAST(u as varchar) FROM uuid_filters WHERE u = CAST('%s' as uuid)", uuids.get(0)), format("VALUES '%s'", uuids.get(0))); + + // sort so we can easily get lowest and highest + uuids.sort(Comparator.naturalOrder()); + assertQuery(session, format("SELECT COUNT(*) FROM uuid_filters WHERE u >= CAST('%s' as uuid)", uuids.get(0)), format("VALUES %d", uuidCount)); + assertQuery(session, format("SELECT COUNT(*) FROM uuid_filters WHERE u > CAST('%s' as uuid)", uuids.get(0)), format("VALUES %d", uuidCount - 1)); + assertQuery(session, format("SELECT COUNT(*) FROM uuid_filters WHERE u <= CAST('%s' as uuid)", uuids.get(uuidCount - 1)), format("VALUES %d", uuidCount)); + assertQuery(session, format("SELECT COUNT(*) FROM uuid_filters WHERE u < CAST('%s' as uuid)", uuids.get(uuidCount - 1)), format("VALUES %d", uuidCount - 1)); + assertQuery(session, format("SELECT COUNT(*) FROM uuid_filters WHERE u < CAST('%s' as uuid)", uuids.get(50)), format("VALUES %d", 50)); + assertQuery(session, format("SELECT COUNT(*) FROM uuid_filters WHERE u <= CAST('%s' as uuid)", uuids.get(50)), format("VALUES %d", 51)); + } + finally { + assertQuerySucceeds("DROP TABLE uuid_filters"); + } + } + @DataProvider(name = "parquetVersions") public Object[][] parquetVersionsDataProvider() { @@ -1939,7 +2001,7 @@ private void writePositionDeleteToNationTable(Table icebergTable, String dataFil Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory(); File metastoreDir = getIcebergDataDirectoryPath(dataDirectory, catalogType.name(), new IcebergConfig().getFileFormat(), false).toFile(); org.apache.hadoop.fs.Path metadataDir = new org.apache.hadoop.fs.Path(metastoreDir.toURI()); - String deleteFileName = "delete_file_" + UUID.randomUUID(); + String deleteFileName = "delete_file_" + randomUUID(); FileSystem fs = getHdfsEnvironment().getFileSystem(new HdfsContext(SESSION), metadataDir); org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(metadataDir, deleteFileName); PositionDeleteWriter writer = Parquet.writeDeletes(HadoopOutputFile.fromPath(path, fs)) @@ -1971,7 +2033,7 @@ private void writeEqualityDeleteToNationTable(Table icebergTable, Map new PrestoException(NOT_SUPPORTED, " type FIXED_LEN_BYTE_ARRAY supported as DECIMAL; got " + descriptor.getPrimitiveType().getOriginalType())); default: - throw new PrestoException(NOT_SUPPORTED, "Unsupported parquet type: " + descriptor.getType()); + throw new PrestoException(NOT_SUPPORTED, "Unsupported parquet type: " + descriptor.getPrimitiveType().getPrimitiveTypeName()); } } diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/ParquetTypeUtils.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/ParquetTypeUtils.java index c522663f4d8dc..0ca54e572e1f1 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/ParquetTypeUtils.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/ParquetTypeUtils.java @@ -40,6 +40,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.Iterables.getOnlyElement; import static java.util.stream.Collectors.joining; +import static org.apache.parquet.schema.LogicalTypeAnnotation.uuidType; import static org.apache.parquet.schema.OriginalType.DECIMAL; import static org.apache.parquet.schema.OriginalType.TIMESTAMP_MICROS; import static org.apache.parquet.schema.OriginalType.TIME_MICROS; @@ -353,6 +354,11 @@ public static boolean isShortDecimalType(ColumnDescriptor descriptor) return decimalLogicalTypeAnnotation.getPrecision() <= MAX_SHORT_PRECISION; } + public static boolean isUuidType(ColumnDescriptor columnDescriptor) + { + return uuidType().equals(columnDescriptor.getPrimitiveType().getLogicalTypeAnnotation()); + } + public static boolean isDecimalType(ColumnDescriptor columnDescriptor) { return columnDescriptor.getPrimitiveType().getOriginalType() == DECIMAL; diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/BytesUtils.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/BytesUtils.java index 480f0892fbd77..9ad8a5b64af4e 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/BytesUtils.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/BytesUtils.java @@ -29,6 +29,13 @@ public static int getInt(byte[] byteBuffer, int offset) return (ch3 << 24) + (ch2 << 16) + (ch1 << 8) + ch0; } + /** + * Convert a little-endian formatted value in a bytearray to a long. + * + * @param byteBuffer source bytes + * @param offset offset in byte array to start decoding + * @return long value + */ public static long getLong(byte[] byteBuffer, int offset) { int ch0 = byteBuffer[offset + 0]; @@ -50,6 +57,34 @@ public static long getLong(byte[] byteBuffer, int offset) ((long) (ch0 & 255) << 0); } + /** + * Convert a big-endian formatted value in a bytearray to a long. + * + * @param byteBuffer source bytes + * @param offset offset in byte array to start decoding + * @return long value + */ + public static long getLongBigEndian(byte[] byteBuffer, int offset) + { + byte ch0 = byteBuffer[offset + 0]; + byte ch1 = byteBuffer[offset + 1]; + byte ch2 = byteBuffer[offset + 2]; + byte ch3 = byteBuffer[offset + 3]; + byte ch4 = byteBuffer[offset + 4]; + byte ch5 = byteBuffer[offset + 5]; + byte ch6 = byteBuffer[offset + 6]; + byte ch7 = byteBuffer[offset + 7]; + + return ((long) (ch0 & 255) << 56) + + ((long) (ch1 & 255) << 48) + + ((long) (ch2 & 255) << 40) + + ((long) (ch3 & 255) << 32) + + ((long) (ch4 & 255) << 24) + + ((long) (ch5 & 255) << 16) + + ((long) (ch6 & 255) << 8) + + ((long) (ch7 & 255) << 0); + } + public static void unpack8Values(byte inByte, byte[] out, int outPos) { out[0 + outPos] = (byte) (inByte & 1); diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/UuidFlatBatchReader.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/UuidFlatBatchReader.java new file mode 100644 index 0000000000000..bf3cdf864bb54 --- /dev/null +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/UuidFlatBatchReader.java @@ -0,0 +1,260 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.parquet.batchreader; + +import com.facebook.presto.common.block.Block; +import com.facebook.presto.common.block.Int128ArrayBlock; +import com.facebook.presto.common.block.RunLengthEncodedBlock; +import com.facebook.presto.parquet.ColumnReader; +import com.facebook.presto.parquet.DataPage; +import com.facebook.presto.parquet.DictionaryPage; +import com.facebook.presto.parquet.Field; +import com.facebook.presto.parquet.RichColumnDescriptor; +import com.facebook.presto.parquet.batchreader.decoders.Decoders.FlatDecoders; +import com.facebook.presto.parquet.batchreader.decoders.FlatDefinitionLevelDecoder; +import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.UuidValuesDecoder; +import com.facebook.presto.parquet.batchreader.dictionary.Dictionaries; +import com.facebook.presto.parquet.dictionary.Dictionary; +import com.facebook.presto.parquet.reader.ColumnChunk; +import com.facebook.presto.parquet.reader.PageReader; +import com.facebook.presto.spi.PrestoException; +import org.apache.parquet.internal.filter2.columnindex.RowRanges; +import org.apache.parquet.io.ParquetDecodingException; +import org.openjdk.jol.info.ClassLayout; + +import java.io.IOException; +import java.util.Optional; + +import static com.facebook.presto.parquet.ParquetErrorCode.PARQUET_IO_READ_ERROR; +import static com.facebook.presto.parquet.batchreader.decoders.Decoders.readFlatPage; +import static com.google.common.base.Preconditions.checkArgument; +import static java.lang.Math.min; +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; + +public class UuidFlatBatchReader + implements ColumnReader +{ + private static final int INSTANCE_SIZE = ClassLayout.parseClass(UuidFlatBatchReader.class).instanceSize(); + + private final RichColumnDescriptor columnDescriptor; + + protected Field field; + protected int nextBatchSize; + protected FlatDefinitionLevelDecoder definitionLevelDecoder; + protected UuidValuesDecoder valuesDecoder; + protected int remainingCountInPage; + + private Dictionary dictionary; + private int readOffset; + private PageReader pageReader; + + public UuidFlatBatchReader(RichColumnDescriptor columnDescriptor) + { + this.columnDescriptor = requireNonNull(columnDescriptor, "columnDescriptor is null"); + } + + @Override + public boolean isInitialized() + { + return pageReader != null && field != null; + } + + @Override + public void init(PageReader pageReader, Field field, RowRanges rowRanges) + { + checkArgument(!isInitialized(), "Parquet batch reader already initialized"); + this.pageReader = requireNonNull(pageReader, "pageReader is null"); + checkArgument(pageReader.getValueCountInColumnChunk() > 0, "page is empty"); + this.field = requireNonNull(field, "field is null"); + + DictionaryPage dictionaryPage = pageReader.readDictionaryPage(); + if (dictionaryPage != null) { + dictionary = Dictionaries.createDictionary(columnDescriptor, dictionaryPage); + } + } + + @Override + public void prepareNextRead(int batchSize) + { + readOffset = readOffset + nextBatchSize; + nextBatchSize = batchSize; + } + + @Override + public ColumnChunk readNext() + { + ColumnChunk columnChunk = null; + try { + seek(); + if (field.isRequired()) { + columnChunk = readWithoutNull(); + } + else { + columnChunk = readWithNull(); + } + } + catch (IOException exception) { + throw new PrestoException(PARQUET_IO_READ_ERROR, "Error reading Parquet column " + columnDescriptor, exception); + } + + readOffset = 0; + nextBatchSize = 0; + return columnChunk; + } + + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + (definitionLevelDecoder == null ? 0 : definitionLevelDecoder.getRetainedSizeInBytes()) + + (valuesDecoder == null ? 0 : valuesDecoder.getRetainedSizeInBytes()) + + (dictionary == null ? 0 : dictionary.getRetainedSizeInBytes()) + + (pageReader == null ? 0 : pageReader.getRetainedSizeInBytes()); + } + + protected boolean readNextPage() + { + definitionLevelDecoder = null; + valuesDecoder = null; + remainingCountInPage = 0; + + DataPage page = pageReader.readPage(); + if (page == null) { + return false; + } + + FlatDecoders flatDecoders = readFlatPage(page, columnDescriptor, dictionary); + definitionLevelDecoder = flatDecoders.getDefinitionLevelDecoder(); + valuesDecoder = (UuidValuesDecoder) flatDecoders.getValuesDecoder(); + + remainingCountInPage = page.getValueCount(); + return true; + } + + private ColumnChunk readWithNull() + throws IOException + { + long[] values = new long[nextBatchSize * 2]; + boolean[] isNull = new boolean[nextBatchSize]; + + int totalNonNullCount = 0; + int remainingInBatch = nextBatchSize; + int startOffset = 0; + while (remainingInBatch > 0) { + if (remainingCountInPage == 0) { + if (!readNextPage()) { + break; + } + } + + int chunkSize = min(remainingCountInPage, remainingInBatch); + int nonNullCount = definitionLevelDecoder.readNext(isNull, startOffset, chunkSize); + totalNonNullCount += nonNullCount; + + if (nonNullCount > 0) { + valuesDecoder.readNext(values, startOffset, nonNullCount); + + int valueDestinationIndex = startOffset + chunkSize - 1; + int valueSourceIndex = startOffset + nonNullCount - 1; + + while (valueDestinationIndex >= startOffset) { + if (!isNull[valueDestinationIndex]) { + values[valueDestinationIndex * 2 + 1] = values[valueSourceIndex * 2 + 1]; + values[valueDestinationIndex * 2] = values[valueSourceIndex * 2]; + valueSourceIndex--; + } + valueDestinationIndex--; + } + } + + startOffset += chunkSize; + remainingInBatch -= chunkSize; + remainingCountInPage -= chunkSize; + } + + if (remainingInBatch != 0) { + throw new ParquetDecodingException("Still remaining to be read in current batch."); + } + + if (totalNonNullCount == 0) { + Block block = RunLengthEncodedBlock.create(field.getType(), null, nextBatchSize); + return new ColumnChunk(block, new int[0], new int[0]); + } + + boolean hasNoNull = totalNonNullCount == nextBatchSize; + Block block = new Int128ArrayBlock(nextBatchSize, hasNoNull ? Optional.empty() : Optional.of(isNull), values); + return new ColumnChunk(block, new int[0], new int[0]); + } + + private ColumnChunk readWithoutNull() + throws IOException + { + long[] values = new long[nextBatchSize * 2]; + int remainingInBatch = nextBatchSize; + int startOffset = 0; + while (remainingInBatch > 0) { + if (remainingCountInPage == 0) { + if (!readNextPage()) { + break; + } + } + + int chunkSize = min(remainingCountInPage, remainingInBatch); + + valuesDecoder.readNext(values, startOffset, chunkSize); + startOffset += chunkSize; + remainingInBatch -= chunkSize; + remainingCountInPage -= chunkSize; + } + + if (remainingInBatch != 0) { + throw new ParquetDecodingException(format("Corrupted Parquet file: extra %d values to be consumed when scanning current batch", remainingInBatch)); + } + + Block block = new Int128ArrayBlock(nextBatchSize, Optional.empty(), values); + return new ColumnChunk(block, new int[0], new int[0]); + } + + private void seek() + throws IOException + { + if (readOffset == 0) { + return; + } + + int remainingInBatch = readOffset; + int startOffset = 0; + while (remainingInBatch > 0) { + if (remainingCountInPage == 0) { + if (!readNextPage()) { + break; + } + } + + int chunkSize = min(remainingCountInPage, remainingInBatch); + int skipSize = chunkSize; + if (!columnDescriptor.isRequired()) { + boolean[] isNull = new boolean[readOffset]; + int nonNullCount = definitionLevelDecoder.readNext(isNull, startOffset, chunkSize); + skipSize = nonNullCount; + startOffset += chunkSize; + } + valuesDecoder.skip(skipSize); + remainingInBatch -= chunkSize; + remainingCountInPage -= chunkSize; + } + } +} + diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/Decoders.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/Decoders.java index f2fc89c300093..e51cf0c00fdea 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/Decoders.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/Decoders.java @@ -23,6 +23,7 @@ import com.facebook.presto.parquet.batchreader.decoders.delta.BinaryShortDecimalDeltaValuesDecoder; import com.facebook.presto.parquet.batchreader.decoders.delta.FixedLenByteArrayLongDecimalDeltaValueDecoder; import com.facebook.presto.parquet.batchreader.decoders.delta.FixedLenByteArrayShortDecimalDeltaValueDecoder; +import com.facebook.presto.parquet.batchreader.decoders.delta.FixedLenByteArrayUuidDeltaValuesDecoder; import com.facebook.presto.parquet.batchreader.decoders.delta.Int32DeltaBinaryPackedValuesDecoder; import com.facebook.presto.parquet.batchreader.decoders.delta.Int32ShortDecimalDeltaValuesDecoder; import com.facebook.presto.parquet.batchreader.decoders.delta.Int64DeltaBinaryPackedValuesDecoder; @@ -34,6 +35,7 @@ import com.facebook.presto.parquet.batchreader.decoders.plain.BooleanPlainValuesDecoder; import com.facebook.presto.parquet.batchreader.decoders.plain.FixedLenByteArrayLongDecimalPlainValuesDecoder; import com.facebook.presto.parquet.batchreader.decoders.plain.FixedLenByteArrayShortDecimalPlainValuesDecoder; +import com.facebook.presto.parquet.batchreader.decoders.plain.FixedLenByteArrayUuidPlainValuesDecoder; import com.facebook.presto.parquet.batchreader.decoders.plain.Int32PlainValuesDecoder; import com.facebook.presto.parquet.batchreader.decoders.plain.Int32ShortDecimalPlainValuesDecoder; import com.facebook.presto.parquet.batchreader.decoders.plain.Int64PlainValuesDecoder; @@ -49,6 +51,7 @@ import com.facebook.presto.parquet.batchreader.decoders.rle.LongDecimalRLEDictionaryValuesDecoder; import com.facebook.presto.parquet.batchreader.decoders.rle.ShortDecimalRLEDictionaryValuesDecoder; import com.facebook.presto.parquet.batchreader.decoders.rle.TimestampRLEDictionaryValuesDecoder; +import com.facebook.presto.parquet.batchreader.decoders.rle.UuidRLEDictionaryValuesDecoder; import com.facebook.presto.parquet.batchreader.dictionary.BinaryBatchDictionary; import com.facebook.presto.parquet.batchreader.dictionary.TimestampDictionary; import com.facebook.presto.parquet.dictionary.Dictionary; @@ -81,6 +84,7 @@ import static com.facebook.presto.parquet.ParquetTypeUtils.isShortDecimalType; import static com.facebook.presto.parquet.ParquetTypeUtils.isTimeMicrosType; import static com.facebook.presto.parquet.ParquetTypeUtils.isTimeStampMicrosType; +import static com.facebook.presto.parquet.ParquetTypeUtils.isUuidType; import static com.facebook.presto.parquet.ValuesType.VALUES; import static com.google.common.base.Preconditions.checkArgument; import static java.lang.String.format; @@ -153,6 +157,10 @@ private static ValuesDecoder createValuesDecoder(ColumnDescriptor columnDescript int typeLength = columnDescriptor.getPrimitiveType().getTypeLength(); return new FixedLenByteArrayLongDecimalPlainValuesDecoder(typeLength, buffer, offset, length); } + else if (isUuidType(columnDescriptor)) { + int typeLength = columnDescriptor.getPrimitiveType().getTypeLength(); + return new FixedLenByteArrayUuidPlainValuesDecoder(typeLength, buffer, offset, length); + } default: throw new PrestoException(PARQUET_UNSUPPORTED_COLUMN_TYPE, format("Column: %s, Encoding: %s", columnDescriptor, encoding)); } @@ -199,6 +207,9 @@ private static ValuesDecoder createValuesDecoder(ColumnDescriptor columnDescript } return new LongDecimalRLEDictionaryValuesDecoder(bitWidth, inputStream, (BinaryBatchDictionary) dictionary); } + else if (isUuidType(columnDescriptor)) { + return new UuidRLEDictionaryValuesDecoder(bitWidth, inputStream, (BinaryBatchDictionary) dictionary); + } default: throw new PrestoException(PARQUET_UNSUPPORTED_COLUMN_TYPE, format("Column: %s, Encoding: %s", columnDescriptor, encoding)); } @@ -256,6 +267,11 @@ private static ValuesDecoder createValuesDecoder(ColumnDescriptor columnDescript return new FixedLenByteArrayLongDecimalDeltaValueDecoder(parquetReader); } + else if (isUuidType(columnDescriptor)) { + ByteBufferInputStream inputStream = ByteBufferInputStream.wrap(ByteBuffer.wrap(buffer, offset, length)); + ValuesReader parquetReader = getParquetReader(encoding, columnDescriptor, valueCount, inputStream); + return new FixedLenByteArrayUuidDeltaValuesDecoder(parquetReader); + } } throw new PrestoException(PARQUET_UNSUPPORTED_ENCODING, format("Column: %s, Encoding: %s", columnDescriptor, encoding)); diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/ValuesDecoder.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/ValuesDecoder.java index b5468c82a98f0..b3944cfe9cd76 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/ValuesDecoder.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/ValuesDecoder.java @@ -102,5 +102,15 @@ void skip(int length) throws IOException; } + interface UuidValuesDecoder + extends ValuesDecoder + { + void readNext(long[] values, int offset, int length) + throws IOException; + + void skip(int length) + throws IOException; + } + public long getRetainedSizeInBytes(); } diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/delta/FixedLenByteArrayUuidDeltaValuesDecoder.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/delta/FixedLenByteArrayUuidDeltaValuesDecoder.java new file mode 100644 index 0000000000000..84180bb455f22 --- /dev/null +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/delta/FixedLenByteArrayUuidDeltaValuesDecoder.java @@ -0,0 +1,62 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.parquet.batchreader.decoders.delta; + +import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.UuidValuesDecoder; +import org.apache.parquet.column.values.ValuesReader; +import org.openjdk.jol.info.ClassLayout; + +import static com.facebook.presto.parquet.batchreader.BytesUtils.getLongBigEndian; +import static java.util.Objects.requireNonNull; + +/** + * Note: this is not an optimized values decoder. It makes use of the existing Parquet decoder. Given that this type encoding + * is not a common one, just use the existing one provided by Parquet library and add a wrapper around it that satisfies the + * {@link UuidValuesDecoder} interface. + */ +public class FixedLenByteArrayUuidDeltaValuesDecoder + implements UuidValuesDecoder +{ + private static final int INSTANCE_SIZE = ClassLayout.parseClass(FixedLenByteArrayUuidDeltaValuesDecoder.class).instanceSize(); + + private final ValuesReader delegate; + + public FixedLenByteArrayUuidDeltaValuesDecoder(ValuesReader delegate) + { + this.delegate = requireNonNull(delegate, "delegate is null"); + } + + @Override + public void readNext(long[] values, int offset, int length) + { + int endOffset = (offset + length) * 2; + for (int currentOutputOffset = offset * 2; currentOutputOffset < endOffset; currentOutputOffset += 2) { + byte[] inputBytes = delegate.readBytes().getBytes(); + values[currentOutputOffset] = getLongBigEndian(inputBytes, 0); + values[currentOutputOffset + 1] = getLongBigEndian(inputBytes, Long.BYTES); + } + } + + @Override + public void skip(int length) + { + delegate.skip(length); + } + + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE; + } +} diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/plain/FixedLenByteArrayUuidPlainValuesDecoder.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/plain/FixedLenByteArrayUuidPlainValuesDecoder.java new file mode 100644 index 0000000000000..296021588e442 --- /dev/null +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/plain/FixedLenByteArrayUuidPlainValuesDecoder.java @@ -0,0 +1,72 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.parquet.batchreader.decoders.plain; + +import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.UuidValuesDecoder; +import org.openjdk.jol.info.ClassLayout; + +import java.nio.ByteBuffer; + +import static com.google.common.base.Preconditions.checkArgument; +import static io.airlift.slice.SizeOf.sizeOf; +import static java.nio.ByteOrder.BIG_ENDIAN; + +public class FixedLenByteArrayUuidPlainValuesDecoder + implements UuidValuesDecoder +{ + private static final int INSTANCE_SIZE = ClassLayout.parseClass(FixedLenByteArrayUuidPlainValuesDecoder.class).instanceSize(); + + private final int typeLength; + private final int bufferEnd; + private final ByteBuffer buffer; + + public FixedLenByteArrayUuidPlainValuesDecoder(int typeLength, byte[] buffer, int baseOffset, int length) + { + checkArgument(typeLength == 16, "typeLength %s should be 16 for a UUID", typeLength); + this.typeLength = typeLength; + this.buffer = ByteBuffer.wrap(buffer).order(BIG_ENDIAN); + this.buffer.position(baseOffset); + this.bufferEnd = baseOffset + length; + } + + /** + * @param values array containing decoded values + * @param offset offset to start writing in the values argument + * @param length number of values to write + */ + @Override + public void readNext(long[] values, int offset, int length) + { + int readEndOffset = (offset + length) * 2; + + for (int currentOutputOffset = offset * 2; currentOutputOffset < readEndOffset; currentOutputOffset += 2) { + values[currentOutputOffset] = buffer.getLong(); + values[currentOutputOffset + 1] = buffer.getLong(); + } + } + + @Override + public void skip(int length) + { + checkArgument(buffer.position() + (length * typeLength) <= bufferEnd, "End of stream: invalid read request"); + checkArgument(length >= 0, "invalid length %s", length); + buffer.position(buffer.position() + (length * typeLength)); + } + + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + sizeOf(buffer.array()); + } +} diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/UuidRLEDictionaryValuesDecoder.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/UuidRLEDictionaryValuesDecoder.java new file mode 100644 index 0000000000000..30fdc0e93d77a --- /dev/null +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/UuidRLEDictionaryValuesDecoder.java @@ -0,0 +1,62 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.parquet.batchreader.decoders.rle; + +import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.BinaryValuesDecoder.ValueBuffer; +import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.UuidValuesDecoder; +import com.facebook.presto.parquet.batchreader.dictionary.BinaryBatchDictionary; + +import java.io.IOException; +import java.io.InputStream; + +import static com.facebook.presto.parquet.batchreader.BytesUtils.getLongBigEndian; +import static java.util.Objects.requireNonNull; + +public class UuidRLEDictionaryValuesDecoder + extends BaseRLEBitPackedDecoder + implements UuidValuesDecoder +{ + private final BinaryRLEDictionaryValuesDecoder delegate; + + public UuidRLEDictionaryValuesDecoder(int bitWidth, InputStream inputStream, BinaryBatchDictionary dictionary) + { + super(Integer.MAX_VALUE, bitWidth, inputStream); + requireNonNull(dictionary, "dictionary is null"); + delegate = new BinaryRLEDictionaryValuesDecoder(bitWidth, inputStream, dictionary); + } + + @Override + public void readNext(long[] values, int offset, int length) + throws IOException + { + ValueBuffer valueBuffer = delegate.readNext(length); + int bufferSize = valueBuffer.getBufferSize(); + byte[] byteBuffer = new byte[bufferSize]; + int[] offsets = new int[length + 1]; + delegate.readIntoBuffer(byteBuffer, 0, offsets, 0, valueBuffer); + + for (int i = 0; i < length; i++) { + int positionOffset = offsets[i]; + values[(i + offset) * 2] = getLongBigEndian(byteBuffer, positionOffset); + values[((i + offset) * 2) + 1] = getLongBigEndian(byteBuffer, positionOffset + Long.BYTES); + } + } + + @Override + public void skip(int length) + throws IOException + { + delegate.skip(length); + } +} diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/cache/MetadataReader.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/cache/MetadataReader.java index 8af920d74d7b8..828fe24441430 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/cache/MetadataReader.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/cache/MetadataReader.java @@ -373,14 +373,15 @@ private static void readTypeSchema(Types.GroupBuilder builder, Iterator annotationWithParams = getLogicalTypeAnnotationWithParameters(type); - annotationWithParams.ifPresent(typeBuilder::as); + getLogicalTypeAnnotationWithParameters(element.getLogicalType()) + .ifPresent(typeBuilder::as); } + else if (element.isSetConverted_type()) { + typeBuilder.as(getOriginalType(element.converted_type)); + } + if (element.isSetField_id()) { typeBuilder.id(element.field_id); } diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/reader/BinaryColumnReader.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/reader/BinaryColumnReader.java index abbd835a40ca9..7d18a7ffdea25 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/reader/BinaryColumnReader.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/reader/BinaryColumnReader.java @@ -15,6 +15,7 @@ import com.facebook.presto.common.block.BlockBuilder; import com.facebook.presto.common.type.Type; +import com.facebook.presto.common.type.UuidType; import com.facebook.presto.parquet.RichColumnDescriptor; import io.airlift.slice.Slice; import org.apache.parquet.io.api.Binary; @@ -23,6 +24,7 @@ import static com.facebook.presto.common.type.Chars.truncateToLengthAndTrimSpaces; import static com.facebook.presto.common.type.Varchars.isVarcharType; import static com.facebook.presto.common.type.Varchars.truncateToLength; +import static com.facebook.presto.parquet.batchreader.BytesUtils.getLongBigEndian; import static io.airlift.slice.Slices.EMPTY_SLICE; import static io.airlift.slice.Slices.wrappedBuffer; @@ -40,6 +42,15 @@ protected void readValue(BlockBuilder blockBuilder, Type type) if (definitionLevel == columnDescriptor.getMaxDefinitionLevel()) { Binary binary = valuesReader.readBytes(); Slice value; + + if (type instanceof UuidType) { + byte[] src = binary.getBytes(); + blockBuilder.writeLong(getLongBigEndian(src, 0)); + blockBuilder.writeLong(getLongBigEndian(src, Long.BYTES)); + blockBuilder.closeEntry(); + return; + } + if (binary.length() == 0) { value = EMPTY_SLICE; } diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/writer/ParquetSchemaConverter.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/writer/ParquetSchemaConverter.java index 5c846552d43ee..f46f9de26201b 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/writer/ParquetSchemaConverter.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/writer/ParquetSchemaConverter.java @@ -20,6 +20,7 @@ import com.facebook.presto.common.type.RealType; import com.facebook.presto.common.type.RowType; import com.facebook.presto.common.type.Type; +import com.facebook.presto.common.type.UuidType; import com.facebook.presto.common.type.VarbinaryType; import com.facebook.presto.common.type.VarcharType; import com.facebook.presto.spi.PrestoException; @@ -145,6 +146,13 @@ else if (decimalType.isShort()) { if (type instanceof VarcharType || type instanceof CharType || type instanceof VarbinaryType) { return Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, repetition).named(name); } + if (type instanceof UuidType) { + LogicalTypeAnnotation logicalTypeAnnotation = LogicalTypeAnnotation.uuidType(); + Types.PrimitiveBuilder builder = Types.primitive(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, repetition) + .length(16); + builder = builder.as(logicalTypeAnnotation); + return builder.named(name); + } throw new PrestoException(NOT_SUPPORTED, format("Unsupported primitive type: %s", type)); } diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/writer/ParquetWriters.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/writer/ParquetWriters.java index 318f7b68bc6d2..a6b152381622c 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/writer/ParquetWriters.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/writer/ParquetWriters.java @@ -29,6 +29,7 @@ import com.facebook.presto.parquet.writer.valuewriter.RealValueWriter; import com.facebook.presto.parquet.writer.valuewriter.TimeValueWriter; import com.facebook.presto.parquet.writer.valuewriter.TimestampValueWriter; +import com.facebook.presto.parquet.writer.valuewriter.UuidValuesWriter; import com.facebook.presto.spi.PrestoException; import com.google.common.collect.ImmutableList; import org.apache.parquet.column.ColumnDescriptor; @@ -57,6 +58,7 @@ import static com.facebook.presto.common.type.TimeType.TIME; import static com.facebook.presto.common.type.TimestampType.TIMESTAMP; import static com.facebook.presto.common.type.TinyintType.TINYINT; +import static com.facebook.presto.common.type.UuidType.UUID; import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; import static java.lang.String.format; import static java.util.Objects.requireNonNull; @@ -212,6 +214,9 @@ private static PrimitiveValueWriter getValueWriter(ValuesWriter valuesWriter, Ty if (TIME.equals(type)) { return new TimeValueWriter(valuesWriter, type, parquetType); } + if (UUID.equals(type)) { + return new UuidValuesWriter(valuesWriter, parquetType); + } if (type instanceof VarcharType || type instanceof CharType || type instanceof VarbinaryType) { return new CharValueWriter(valuesWriter, type, parquetType); } diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/writer/valuewriter/UuidValuesWriter.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/writer/valuewriter/UuidValuesWriter.java new file mode 100644 index 0000000000000..eefe981d9281f --- /dev/null +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/writer/valuewriter/UuidValuesWriter.java @@ -0,0 +1,55 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.parquet.writer.valuewriter; + +import com.facebook.presto.common.block.Block; +import org.apache.parquet.column.values.ValuesWriter; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.PrimitiveType; + +import javax.annotation.concurrent.NotThreadSafe; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +import static io.airlift.slice.SizeOf.SIZE_OF_LONG; + +@NotThreadSafe +public class UuidValuesWriter + extends PrimitiveValueWriter +{ + private final ByteBuffer writeBuffer = ByteBuffer.allocate(2 * SIZE_OF_LONG) + .order(ByteOrder.BIG_ENDIAN); + + public UuidValuesWriter(ValuesWriter valuesWriter, PrimitiveType parquetType) + { + super(parquetType, valuesWriter); + } + + @Override + public void write(Block block) + { + for (int i = 0; i < block.getPositionCount(); i++) { + if (!block.isNull(i)) { + writeBuffer.clear(); + writeBuffer.putLong(block.getLong(i, 0)); + writeBuffer.putLong(block.getLong(i, SIZE_OF_LONG)); + writeBuffer.flip(); + Binary data = Binary.fromReusedByteBuffer(writeBuffer); + getValueWriter().writeBytes(data); + getStatistics().updateStats(data); + } + } + } +} diff --git a/presto-parquet/src/main/resources/freemarker/data/ParquetTypes.tdd b/presto-parquet/src/main/resources/freemarker/data/ParquetTypes.tdd index 7eadd8be3f710..a72c9e8a54349 100644 --- a/presto-parquet/src/main/resources/freemarker/data/ParquetTypes.tdd +++ b/presto-parquet/src/main/resources/freemarker/data/ParquetTypes.tdd @@ -36,5 +36,11 @@ valuesDecoder: "LongDecimalValuesDecoder", primitiveType: "long" }, + { + classNamePrefix: "Uuid", + blockType: "Int128ArrayBlock", + valuesDecoder: "UuidValuesDecoder", + primitiveType: "long" + }, ] } diff --git a/presto-parquet/src/main/resources/freemarker/templates/ParquetFlatColumnReaderGenerator.tdd b/presto-parquet/src/main/resources/freemarker/templates/ParquetFlatColumnReaderGenerator.tdd index 0b11669d8026b..0bbef5592ba7e 100644 --- a/presto-parquet/src/main/resources/freemarker/templates/ParquetFlatColumnReaderGenerator.tdd +++ b/presto-parquet/src/main/resources/freemarker/templates/ParquetFlatColumnReaderGenerator.tdd @@ -4,7 +4,7 @@ <#assign updatedTemplate = updatedTemplate?replace("com.facebook.presto.common.block.IntArrayBlock", "com.facebook.presto.common.block.${type.blockType}")> <#assign updatedTemplate = updatedTemplate?replace("IntArrayBlock", "${type.blockType}")> <#assign updatedTemplate = updatedTemplate?replace("Int32ValuesDecoder", "${type.valuesDecoder}")> -<#if !type.classNamePrefix?starts_with("LongDecimal")> +<#if !type.classNamePrefix?starts_with("LongDecimal") && !type.classNamePrefix?starts_with("Uuid")> <#assign updatedTemplate = updatedTemplate?replace("int[] values = new int[nextBatchSize]", "${type.primitiveType}[] values = new ${type.primitiveType}[nextBatchSize]")> <#else> <#assign updatedTemplate = updatedTemplate?replace("int[] values = new int[nextBatchSize]", "${type.primitiveType}[] values = new ${type.primitiveType}[nextBatchSize * 2]")> diff --git a/presto-parquet/src/main/resources/freemarker/templates/ParquetNestedColumnReaderGenerator.tdd b/presto-parquet/src/main/resources/freemarker/templates/ParquetNestedColumnReaderGenerator.tdd index f19a2140d7d45..6ef04c4ec4981 100644 --- a/presto-parquet/src/main/resources/freemarker/templates/ParquetNestedColumnReaderGenerator.tdd +++ b/presto-parquet/src/main/resources/freemarker/templates/ParquetNestedColumnReaderGenerator.tdd @@ -1,5 +1,5 @@ <#list parquetTypes.flatTypes as type> -<#if !type.classNamePrefix?ends_with("Decimal")> +<#if !type.classNamePrefix?ends_with("Decimal") && !type.classNamePrefix?starts_with("Uuid")> <@pp.changeOutputFile name="/com/facebook/presto/parquet/batchreader/${type.classNamePrefix}NestedBatchReader.java" /> <#assign updatedTemplate = nestedTypeTemplate?replace("Int32NestedBatchReader", "${type.classNamePrefix}NestedBatchReader")> <#assign updatedTemplate = updatedTemplate?replace("com.facebook.presto.common.block.IntArrayBlock", "com.facebook.presto.common.block.${type.blockType}")> diff --git a/presto-parquet/src/test/java/com/facebook/presto/parquet/batchreader/decoders/TestParquetUtils.java b/presto-parquet/src/test/java/com/facebook/presto/parquet/batchreader/decoders/TestParquetUtils.java index cea13b18a9da4..9168934f92133 100644 --- a/presto-parquet/src/test/java/com/facebook/presto/parquet/batchreader/decoders/TestParquetUtils.java +++ b/presto-parquet/src/test/java/com/facebook/presto/parquet/batchreader/decoders/TestParquetUtils.java @@ -144,6 +144,16 @@ public static byte[] generatePlainValuesPage(int valueCount, int valueSizeBits, } break; } + case 128: + for (int i = 0; i < valueCount; i++) { + long value = random.nextLong(); + writer.writeLong(value); + addedValues.add(value); + value = random.nextLong(); + writer.writeLong(value); + addedValues.add(value); + } + break; default: throw new IllegalArgumentException("invalid value size (expected: 4, 8 or 12)"); } diff --git a/presto-parquet/src/test/java/com/facebook/presto/parquet/batchreader/decoders/TestValuesDecoders.java b/presto-parquet/src/test/java/com/facebook/presto/parquet/batchreader/decoders/TestValuesDecoders.java index 35f356f6010c1..c94a68e8d19e3 100644 --- a/presto-parquet/src/test/java/com/facebook/presto/parquet/batchreader/decoders/TestValuesDecoders.java +++ b/presto-parquet/src/test/java/com/facebook/presto/parquet/batchreader/decoders/TestValuesDecoders.java @@ -21,8 +21,10 @@ import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.Int64ValuesDecoder; import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.ShortDecimalValuesDecoder; import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.TimestampValuesDecoder; +import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.UuidValuesDecoder; import com.facebook.presto.parquet.batchreader.decoders.plain.BinaryPlainValuesDecoder; import com.facebook.presto.parquet.batchreader.decoders.plain.BooleanPlainValuesDecoder; +import com.facebook.presto.parquet.batchreader.decoders.plain.FixedLenByteArrayUuidPlainValuesDecoder; import com.facebook.presto.parquet.batchreader.decoders.plain.Int32PlainValuesDecoder; import com.facebook.presto.parquet.batchreader.decoders.plain.Int32ShortDecimalPlainValuesDecoder; import com.facebook.presto.parquet.batchreader.decoders.plain.Int64PlainValuesDecoder; @@ -36,6 +38,7 @@ import com.facebook.presto.parquet.batchreader.decoders.rle.Int64RLEDictionaryValuesDecoder; import com.facebook.presto.parquet.batchreader.decoders.rle.Int64TimeAndTimestampMicrosRLEDictionaryValuesDecoder; import com.facebook.presto.parquet.batchreader.decoders.rle.TimestampRLEDictionaryValuesDecoder; +import com.facebook.presto.parquet.batchreader.decoders.rle.UuidRLEDictionaryValuesDecoder; import com.facebook.presto.parquet.batchreader.dictionary.BinaryBatchDictionary; import com.facebook.presto.parquet.batchreader.dictionary.TimestampDictionary; import com.facebook.presto.parquet.dictionary.IntegerDictionary; @@ -55,6 +58,7 @@ import static com.facebook.presto.parquet.ParquetEncoding.PLAIN_DICTIONARY; import static com.facebook.presto.parquet.batchreader.decoders.TestParquetUtils.generateDictionaryIdPage2048; import static com.facebook.presto.parquet.batchreader.decoders.TestParquetUtils.generatePlainValuesPage; +import static com.google.common.collect.ImmutableList.toImmutableList; import static java.lang.Math.min; import static org.apache.parquet.bytes.BytesUtils.UTF8; import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt; @@ -142,6 +146,39 @@ private static ShortDecimalValuesDecoder int64ShortDecimalRLE(byte[] pageBytes, return new Int64RLEDictionaryValuesDecoder(getWidthFromMaxInt(dictionarySize), new ByteArrayInputStream(pageBytes), dictionary); } + private static UuidValuesDecoder uuidPlain(byte[] pageBytes) + { + return new FixedLenByteArrayUuidPlainValuesDecoder(16, pageBytes, 0, pageBytes.length); + } + + private static UuidValuesDecoder uuidRle(byte[] pageBytes, int dictionarySize, BinaryBatchDictionary dictionary) + { + return new UuidRLEDictionaryValuesDecoder(getWidthFromMaxInt(dictionarySize), new ByteArrayInputStream(pageBytes), dictionary); + } + + private static void uuidBatchReadWithSkipHelper(int batchSize, int skipSize, int valueCount, UuidValuesDecoder decoder, List expectedValues) + throws IOException + { + long[] actualValues = new long[valueCount * 2]; + int inputOffset = 0; + int outputOffset = 0; + while (inputOffset < valueCount) { + int readBatchSize = min(batchSize, valueCount - inputOffset); + decoder.readNext(actualValues, outputOffset, readBatchSize); + + for (int i = 0; i < (readBatchSize * 2); i++) { + assertEquals(actualValues[(outputOffset * 2) + i], expectedValues.get((inputOffset * 2) + i)); + } + + inputOffset += readBatchSize; + outputOffset += readBatchSize; + + int skipBatchSize = min(skipSize, valueCount - inputOffset); + decoder.skip(skipBatchSize); + inputOffset += skipBatchSize; + } + } + private static void int32BatchReadWithSkipHelper(int batchSize, int skipSize, int valueCount, Int32ValuesDecoder decoder, List expectedValues) throws IOException { @@ -681,4 +718,66 @@ public void testInt64ShortDecimalRLE() int64ShortDecimalBatchReadWithSkipHelper(89, 29, valueCount, int64ShortDecimalRLE(dataPage, dictionarySize, longDictionary), expectedValues); int64ShortDecimalBatchReadWithSkipHelper(1024, 1024, valueCount, int64ShortDecimalRLE(dataPage, dictionarySize, longDictionary), expectedValues); } + + @Test + public void testUuidPlainPlain() + throws IOException + { + int valueCount = 2048; + List expectedValues = new ArrayList<>(); + + byte[] pageBytes = generatePlainValuesPage(valueCount, 128, new Random(83), expectedValues); + // page is read assuming in big endian, so we need to flip the bytes around when comparing read values + expectedValues = expectedValues.stream() + .map(Long.class::cast) + .mapToLong(Long::longValue) + .map(Long::reverseBytes) + .boxed() + .collect(toImmutableList()); + uuidBatchReadWithSkipHelper(valueCount, 0, valueCount, uuidPlain(pageBytes), expectedValues); + uuidBatchReadWithSkipHelper(29, 0, valueCount, uuidPlain(pageBytes), expectedValues); + uuidBatchReadWithSkipHelper(89, 0, valueCount, uuidPlain(pageBytes), expectedValues); + uuidBatchReadWithSkipHelper(1024, 0, valueCount, uuidPlain(pageBytes), expectedValues); + + uuidBatchReadWithSkipHelper(256, 29, valueCount, uuidPlain(pageBytes), expectedValues); + uuidBatchReadWithSkipHelper(89, 29, valueCount, uuidPlain(pageBytes), expectedValues); + uuidBatchReadWithSkipHelper(1024, 1024, valueCount, uuidPlain(pageBytes), expectedValues); + } + + @Test + public void testUuidRLEDictionary() + throws IOException + { + int valueCount = 2048; + Random random = new Random(83); + int dictionarySize = 29; + List dictionary = new ArrayList<>(); + List dictionaryIds = new ArrayList<>(); + + byte[] dictionaryPage = generatePlainValuesPage(dictionarySize, 128, random, dictionary); + byte[] dataPage = generateDictionaryIdPage2048(dictionarySize - 1, random, dictionaryIds); + + List expectedValues = new ArrayList<>(); + for (Integer dictionaryId : dictionaryIds) { + expectedValues.add(dictionary.get(dictionaryId * 2)); + expectedValues.add(dictionary.get((dictionaryId * 2) + 1)); + } + + expectedValues = expectedValues.stream() + .map(Long.class::cast) + .mapToLong(Long::longValue) + .map(Long::reverseBytes) + .boxed() + .collect(toImmutableList()); + + BinaryBatchDictionary binaryDictionary = new BinaryBatchDictionary(new DictionaryPage(Slices.wrappedBuffer(dictionaryPage), dictionarySize, PLAIN_DICTIONARY), 16); + uuidBatchReadWithSkipHelper(valueCount, 0, valueCount, uuidRle(dataPage, dictionarySize, binaryDictionary), expectedValues); + uuidBatchReadWithSkipHelper(29, 0, valueCount, uuidRle(dataPage, dictionarySize, binaryDictionary), expectedValues); + uuidBatchReadWithSkipHelper(89, 0, valueCount, uuidRle(dataPage, dictionarySize, binaryDictionary), expectedValues); + uuidBatchReadWithSkipHelper(1024, 0, valueCount, uuidRle(dataPage, dictionarySize, binaryDictionary), expectedValues); + + uuidBatchReadWithSkipHelper(256, 29, valueCount, uuidRle(dataPage, dictionarySize, binaryDictionary), expectedValues); + uuidBatchReadWithSkipHelper(89, 29, valueCount, uuidRle(dataPage, dictionarySize, binaryDictionary), expectedValues); + uuidBatchReadWithSkipHelper(1024, 1024, valueCount, uuidRle(dataPage, dictionarySize, binaryDictionary), expectedValues); + } } diff --git a/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/BenchmarkUuidColumnReader.java b/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/BenchmarkUuidColumnReader.java new file mode 100644 index 0000000000000..6e31031f8c818 --- /dev/null +++ b/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/BenchmarkUuidColumnReader.java @@ -0,0 +1,104 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.parquet.reader; + +import com.facebook.presto.common.type.UuidType; +import com.facebook.presto.parquet.PrimitiveField; +import com.facebook.presto.parquet.RichColumnDescriptor; +import io.airlift.slice.Slice; +import io.airlift.slice.Slices; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.values.ValuesWriter; +import org.apache.parquet.column.values.deltastrings.DeltaByteArrayWriter; +import org.apache.parquet.column.values.plain.FixedLenByteArrayPlainValuesWriter; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Types; + +import java.util.Random; + +import static com.facebook.presto.parquet.reader.TestData.randomBigInteger; +import static io.airlift.slice.SizeOf.SIZE_OF_LONG; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY; + +public class BenchmarkUuidColumnReader + extends AbstractColumnReaderBenchmark +{ + private static final int LENGTH = 2 * SIZE_OF_LONG; + + private static final Random random = new Random(1); + + @Override + protected PrimitiveField createPrimitiveField() + { + PrimitiveType parquetType = Types.optional(FIXED_LEN_BYTE_ARRAY) + .length(LENGTH) + .as(LogicalTypeAnnotation.uuidType()) + .named("name"); + + return new PrimitiveField( + UuidType.UUID, + -1, + -1, + true, + new RichColumnDescriptor(new ColumnDescriptor(new String[] {"test"}, parquetType, 0, 0), parquetType), + 0); + } + + @Override + protected ValuesWriter createValuesWriter(int bufferSize) + { + switch (parquetEncoding) { + case PLAIN: + return new FixedLenByteArrayPlainValuesWriter(LENGTH, bufferSize, bufferSize, HeapByteBufferAllocator.getInstance()); + case DELTA_BYTE_ARRAY: + return new DeltaByteArrayWriter(bufferSize, bufferSize, HeapByteBufferAllocator.getInstance()); + default: + throw new RuntimeException("Cannot parse parquetEncoding:" + parquetEncoding); + } + } + + @Override + protected void writeValue(ValuesWriter writer, long[] batch, int index) + { + Slice slice = Slices.wrappedLongArray(batch, index * 2, 2); + writer.writeBytes(Binary.fromConstantByteArray(slice.getBytes())); + } + + @Override + protected boolean getEnableOptimizedReader() + { + return enableOptimizedReader; + } + + @Override + protected long[] generateDataBatch(int size) + { + long[] batch = new long[size * 2]; + for (int i = 0; i < size; i++) { + Slice slice = randomBigInteger(random); + batch[i * 2] = slice.getLong(0); + batch[(i * 2) + 1] = slice.getLong(SIZE_OF_LONG); + } + return batch; + } + + public static void main(String[] args) + throws Exception + { + run(BenchmarkUuidColumnReader.class); + } +} diff --git a/presto-parquet/src/test/java/com/facebook/presto/parquet/writer/TestParquetWriter.java b/presto-parquet/src/test/java/com/facebook/presto/parquet/writer/TestParquetWriter.java index 270e1460c489b..b6c6249107d99 100644 --- a/presto-parquet/src/test/java/com/facebook/presto/parquet/writer/TestParquetWriter.java +++ b/presto-parquet/src/test/java/com/facebook/presto/parquet/writer/TestParquetWriter.java @@ -48,6 +48,7 @@ import static com.facebook.presto.common.type.SmallintType.SMALLINT; import static com.facebook.presto.common.type.TimestampType.TIMESTAMP; import static com.facebook.presto.common.type.TinyintType.TINYINT; +import static com.facebook.presto.common.type.UuidType.UUID; import static com.facebook.presto.common.type.VarcharType.VARCHAR; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.io.Files.createTempDir; @@ -127,6 +128,7 @@ public static Object[][] types() {DATE, LogicalTypeAnnotation.DateLogicalTypeAnnotation.class, "INT32"}, {TIMESTAMP, LogicalTypeAnnotation.TimestampLogicalTypeAnnotation.class, "INT64"}, {MAP, LogicalTypeAnnotation.MapLogicalTypeAnnotation.class, null}, + {UUID, LogicalTypeAnnotation.UUIDLogicalTypeAnnotation.class, "FIXED_LEN_BYTE_ARRAY"}, {ROW, null, null} }; } From 365b88e712bebb29c754def6bbe34a0d1a168895 Mon Sep 17 00:00:00 2001 From: Zac Blanco Date: Fri, 11 Oct 2024 13:43:20 -0700 Subject: [PATCH 3/3] Rename enabledOptimizedReader -> enableBatchReader --- .../parquet/reader/AbstractColumnReaderBenchmark.java | 6 +++--- .../parquet/reader/BenchmarkLongDecimalColumnReader.java | 4 ++-- .../parquet/reader/BenchmarkShortDecimalColumnReader.java | 4 ++-- .../presto/parquet/reader/BenchmarkUuidColumnReader.java | 4 ++-- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/AbstractColumnReaderBenchmark.java b/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/AbstractColumnReaderBenchmark.java index a3768d560ca37..e7f1aaefa439f 100644 --- a/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/AbstractColumnReaderBenchmark.java +++ b/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/AbstractColumnReaderBenchmark.java @@ -85,7 +85,7 @@ public abstract class AbstractColumnReaderBenchmark @Param({ "true", "false", }) - public boolean enableOptimizedReader; + public boolean batchReaderEnabled; protected abstract PrimitiveField createPrimitiveField(); @@ -95,7 +95,7 @@ public abstract class AbstractColumnReaderBenchmark protected abstract void writeValue(ValuesWriter writer, T batch, int index); - protected abstract boolean getEnableOptimizedReader(); + protected abstract boolean getBatchReaderEnabled(); @Setup public void setup() @@ -126,7 +126,7 @@ public void setup() public int read() throws IOException { - ColumnReader columnReader = ColumnReaderFactory.createReader(field.getDescriptor(), getEnableOptimizedReader()); + ColumnReader columnReader = ColumnReaderFactory.createReader(field.getDescriptor(), getBatchReaderEnabled()); columnReader.init(new PageReader(UNCOMPRESSED, new LinkedList<>(dataPages).listIterator(), MAX_VALUES, null, null, Optional.empty(), null, -1, -1), field, null); ColumnReader reader = null; diff --git a/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/BenchmarkLongDecimalColumnReader.java b/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/BenchmarkLongDecimalColumnReader.java index 32e87327e20fa..071c92cb4a0df 100644 --- a/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/BenchmarkLongDecimalColumnReader.java +++ b/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/BenchmarkLongDecimalColumnReader.java @@ -79,9 +79,9 @@ protected void writeValue(ValuesWriter writer, long[] batch, int index) } @Override - protected boolean getEnableOptimizedReader() + protected boolean getBatchReaderEnabled() { - return enableOptimizedReader; + return batchReaderEnabled; } @Override diff --git a/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/BenchmarkShortDecimalColumnReader.java b/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/BenchmarkShortDecimalColumnReader.java index 664f3e69fad5b..62d094a4bc4b6 100644 --- a/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/BenchmarkShortDecimalColumnReader.java +++ b/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/BenchmarkShortDecimalColumnReader.java @@ -78,9 +78,9 @@ protected long[] generateDataBatch(int size) } @Override - protected boolean getEnableOptimizedReader() + protected boolean getBatchReaderEnabled() { - return enableOptimizedReader; + return batchReaderEnabled; } @Override diff --git a/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/BenchmarkUuidColumnReader.java b/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/BenchmarkUuidColumnReader.java index 6e31031f8c818..94258521272d1 100644 --- a/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/BenchmarkUuidColumnReader.java +++ b/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/BenchmarkUuidColumnReader.java @@ -79,9 +79,9 @@ protected void writeValue(ValuesWriter writer, long[] batch, int index) } @Override - protected boolean getEnableOptimizedReader() + protected boolean getBatchReaderEnabled() { - return enableOptimizedReader; + return batchReaderEnabled; } @Override