diff --git a/presto-docs/src/main/sphinx/connector/iceberg.rst b/presto-docs/src/main/sphinx/connector/iceberg.rst index 731e1f602b36a..55eb20acf9281 100644 --- a/presto-docs/src/main/sphinx/connector/iceberg.rst +++ b/presto-docs/src/main/sphinx/connector/iceberg.rst @@ -1737,26 +1737,28 @@ Map of Iceberg types to the relevant PrestoDB types: - PrestoDB type * - ``BOOLEAN`` - ``BOOLEAN`` - * - ``BINARY``, ``FIXED`` - - ``VARBINARY`` - * - ``DATE`` - - ``DATE`` - * - ``DECIMAL`` - - ``DECIMAL`` - * - ``DOUBLE`` - - ``DOUBLE`` + * - ``INTEGER`` + - ``INTEGER`` * - ``LONG`` - ``BIGINT`` * - ``FLOAT`` - ``REAL`` - * - ``INTEGER`` - - ``INTEGER`` + * - ``DOUBLE`` + - ``DOUBLE`` + * - ``DECIMAL`` + - ``DECIMAL`` + * - ``STRING`` + - ``VARCHAR`` + * - ``BINARY``, ``FIXED`` + - ``VARBINARY`` + * - ``DATE`` + - ``DATE`` * - ``TIME`` - ``TIME`` * - ``TIMESTAMP`` - ``TIMESTAMP`` - * - ``STRING`` - - ``VARCHAR`` + * - ``UUID`` + - ``UUID`` * - ``LIST`` - ``ARRAY`` * - ``MAP`` @@ -1796,17 +1798,20 @@ Map of PrestoDB types to the relevant Iceberg types: - ``BINARY`` * - ``DATE`` - ``DATE`` - * - ``ROW`` - - ``STRUCT`` - * - ``ARRAY`` - - ``LIST`` - * - ``MAP`` - - ``MAP`` * - ``TIME`` - ``TIME`` * - ``TIMESTAMP`` - ``TIMESTAMP WITHOUT ZONE`` * - ``TIMESTAMP WITH TIMEZONE`` - ``TIMESTAMP WITH ZONE`` + * - ``UUID`` + - ``UUID`` + * - ``ARRAY`` + - ``LIST`` + * - ``MAP`` + - ``MAP`` + * - ``ROW`` + - ``STRUCT`` + No other types are supported. 
diff --git a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/HiveType.java b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/HiveType.java index 566fa5f3ac8c3..d6d641362bc93 100644 --- a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/HiveType.java +++ b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/HiveType.java @@ -20,6 +20,7 @@ import com.facebook.presto.common.type.TypeManager; import com.facebook.presto.common.type.TypeSignature; import com.facebook.presto.common.type.TypeSignatureParameter; +import com.facebook.presto.common.type.UuidType; import com.facebook.presto.spi.PrestoException; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonValue; @@ -47,6 +48,7 @@ import static com.facebook.presto.common.type.IntegerType.INTEGER; import static com.facebook.presto.common.type.RealType.REAL; import static com.facebook.presto.common.type.SmallintType.SMALLINT; +import static com.facebook.presto.common.type.StandardTypes.UUID; import static com.facebook.presto.common.type.TimestampType.TIMESTAMP; import static com.facebook.presto.common.type.TinyintType.TINYINT; import static com.facebook.presto.common.type.VarbinaryType.VARBINARY; @@ -58,6 +60,7 @@ import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.toList; import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; +import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category.PRIMITIVE; import static org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory.binaryTypeInfo; import static org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory.booleanTypeInfo; import static org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory.byteTypeInfo; @@ -87,6 +90,47 @@ public final class HiveType public static final HiveType HIVE_TIMESTAMP = new HiveType(timestampTypeInfo); public static final HiveType HIVE_DATE = new HiveType(dateTypeInfo); 
public static final HiveType HIVE_BINARY = new HiveType(binaryTypeInfo); + public static final HiveType HIVE_UUID = new HiveType(new TypeInfo() + { + @Override + public Category getCategory() + { + return PRIMITIVE; + } + + @Override + public String getTypeName() + { + return UUID; + } + + @Override + public boolean equals(Object other) + { + if (this == other) { + return true; + } + if (other == null || getClass() != other.getClass()) { + return false; + } + + TypeInfo ti = (TypeInfo) other; + + return UUID.equals(ti.getTypeName()); + } + + @Override + public int hashCode() + { + return UUID.hashCode(); + } + + @Override + public String toString() + { + return UUID; + } + }); private final HiveTypeName hiveTypeName; private final TypeInfo typeInfo; @@ -223,6 +267,9 @@ private static TypeSignature getTypeSignature(TypeInfo typeInfo) switch (typeInfo.getCategory()) { case PRIMITIVE: Type primitiveType = getPrimitiveType((PrimitiveTypeInfo) typeInfo); + if (primitiveType == null && typeInfo.getTypeName().equals(UUID)) { + return UuidType.UUID.getTypeSignature(); + } if (primitiveType == null) { break; } diff --git a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/HiveTypeTranslator.java b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/HiveTypeTranslator.java index 20a87260dce9b..c973f12330f4b 100644 --- a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/HiveTypeTranslator.java +++ b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/HiveTypeTranslator.java @@ -40,6 +40,7 @@ import static com.facebook.presto.common.type.SmallintType.SMALLINT; import static com.facebook.presto.common.type.TimestampType.TIMESTAMP; import static com.facebook.presto.common.type.TinyintType.TINYINT; +import static com.facebook.presto.common.type.UuidType.UUID; import static com.facebook.presto.common.type.VarbinaryType.VARBINARY; import static com.facebook.presto.hive.HiveType.HIVE_BINARY; import static 
com.facebook.presto.hive.HiveType.HIVE_BOOLEAN; @@ -52,6 +53,7 @@ import static com.facebook.presto.hive.HiveType.HIVE_SHORT; import static com.facebook.presto.hive.HiveType.HIVE_STRING; import static com.facebook.presto.hive.HiveType.HIVE_TIMESTAMP; +import static com.facebook.presto.hive.HiveType.HIVE_UUID; import static com.facebook.presto.hive.metastore.MetastoreUtil.isArrayType; import static com.facebook.presto.hive.metastore.MetastoreUtil.isMapType; import static com.facebook.presto.hive.metastore.MetastoreUtil.isRowType; @@ -91,6 +93,9 @@ public TypeInfo translate(Type type, Optional defaultHiveType) if (DOUBLE.equals(type)) { return HIVE_DOUBLE.getTypeInfo(); } + if (UUID.equals(type)) { + return HIVE_UUID.getTypeInfo(); + } if (type instanceof VarcharType) { VarcharType varcharType = (VarcharType) type; int varcharLength = varcharType.getLength(); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ExpressionConverter.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ExpressionConverter.java index a66707acbec82..3f05a48d1219a 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ExpressionConverter.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ExpressionConverter.java @@ -31,6 +31,7 @@ import com.facebook.presto.common.type.TimeType; import com.facebook.presto.common.type.TimestampType; import com.facebook.presto.common.type.Type; +import com.facebook.presto.common.type.UuidType; import com.facebook.presto.common.type.VarbinaryType; import com.facebook.presto.common.type.VarcharType; import com.google.common.base.VerifyException; @@ -41,6 +42,7 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.Map; +import java.util.UUID; import static com.facebook.presto.common.predicate.Marker.Bound.ABOVE; import static com.facebook.presto.common.predicate.Marker.Bound.BELOW; @@ -220,6 +222,12 @@ private static Object getIcebergLiteralValue(Type type, Marker marker) return new 
BigDecimal(Decimals.decodeUnscaledValue((Slice) value), decimalType.getScale()); } + if (type instanceof UuidType) { + UuidType uuidType = (UuidType) type; + return marker.getValueBlock() + .map(block -> UUID.fromString((String) uuidType.getObjectValue(null, block, 0))) + .orElseThrow(NullPointerException::new); + } return marker.getValue(); } } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/TypeConverter.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/TypeConverter.java index cd05290c12b35..b08f9c116e0c3 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/TypeConverter.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/TypeConverter.java @@ -33,6 +33,7 @@ import com.facebook.presto.common.type.TypeManager; import com.facebook.presto.common.type.TypeSignature; import com.facebook.presto.common.type.TypeSignatureParameter; +import com.facebook.presto.common.type.UuidType; import com.facebook.presto.common.type.VarbinaryType; import com.facebook.presto.common.type.VarcharType; import com.facebook.presto.hive.HiveType; @@ -120,6 +121,8 @@ public static Type toPrestoType(org.apache.iceberg.types.Type type, TypeManager return TimestampType.TIMESTAMP; case STRING: return VarcharType.createUnboundedVarcharType(); + case UUID: + return UuidType.UUID; case LIST: Types.ListType listType = (Types.ListType) type; return new ArrayType(toPrestoType(listType.elementType(), typeManager)); @@ -203,6 +206,9 @@ public static org.apache.iceberg.types.Type toIcebergType(Type type) if (type instanceof TimestampWithTimeZoneType) { return Types.TimestampType.withZone(); } + if (type instanceof UuidType) { + return Types.UUIDType.get(); + } throw new PrestoException(NOT_SUPPORTED, "Type not supported for Iceberg: " + type.getDisplayName()); } diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java 
b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java index a9f17f3a0f167..a20bc099fb0cb 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java @@ -88,6 +88,7 @@ import java.io.File; import java.io.IOException; import java.lang.reflect.Field; +import java.nio.ByteBuffer; import java.nio.file.Path; import java.time.LocalDateTime; import java.time.LocalTime; @@ -95,6 +96,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -134,6 +136,7 @@ import static com.facebook.presto.tests.sql.TestTable.randomTableSuffix; import static java.lang.String.format; import static java.util.Objects.requireNonNull; +import static java.util.UUID.randomUUID; import static org.apache.iceberg.SnapshotSummary.TOTAL_DATA_FILES_PROP; import static org.apache.iceberg.SnapshotSummary.TOTAL_DELETE_FILES_PROP; import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0; @@ -1596,20 +1599,20 @@ public void testMetadataVersionsMaintainingProperties() } } - @DataProvider(name = "decimalVectorReader") - public Object[] decimalVectorReader() + @DataProvider(name = "batchReadEnabled") + public Object[] batchReadEnabledReader() { return new Object[] {true, false}; } - private Session decimalVectorReaderEnabledSession(boolean decimalVectorReaderEnabled) + private Session batchReadEnabledEnabledSession(boolean batchReadEnabled) { return Session.builder(getQueryRunner().getDefaultSession()) - .setCatalogSessionProperty(ICEBERG_CATALOG, PARQUET_BATCH_READ_OPTIMIZATION_ENABLED, String.valueOf(decimalVectorReaderEnabled)) + .setCatalogSessionProperty(ICEBERG_CATALOG, PARQUET_BATCH_READ_OPTIMIZATION_ENABLED, String.valueOf(batchReadEnabled)) .build(); } - @Test(dataProvider 
= "decimalVectorReader") + @Test(dataProvider = "batchReadEnabled") public void testDecimal(boolean decimalVectorReaderEnabled) { String tableName = "test_decimal_vector_reader"; @@ -1625,7 +1628,7 @@ public void testDecimal(boolean decimalVectorReaderEnabled) // Insert data to table assertUpdate("INSERT INTO " + tableName + values, 4); - Session session = decimalVectorReaderEnabledSession(decimalVectorReaderEnabled); + Session session = batchReadEnabledEnabledSession(decimalVectorReaderEnabled); assertQuery(session, "SELECT * FROM " + tableName, values); } finally { @@ -1701,6 +1704,7 @@ public void testAllIcebergType() " c_timestamp TIMESTAMP, " + " c_varchar VARCHAR, " + " c_varbinary VARBINARY, " + + " c_uuid UUID, " + " c_array ARRAY(BIGINT), " + " c_map MAP(VARCHAR, INT), " + " c_row ROW(a INT, b VARCHAR) " + @@ -1708,16 +1712,16 @@ public void testAllIcebergType() assertUpdate(format("" + "INSERT INTO %s " + - "SELECT c_boolean, c_int, c_bigint, c_double, c_real, c_date, c_timestamp, c_varchar, c_varbinary, c_array, c_map, c_row " + + "SELECT c_boolean, c_int, c_bigint, c_double, c_real, c_date, c_timestamp, c_varchar, c_varbinary, c_uuid, c_array, c_map, c_row " + "FROM ( " + " VALUES " + - " (null, null, null, null, null, null, null, null, null, null, null, null), " + - " (true, INT '1245', BIGINT '1', DOUBLE '2.2', REAL '-24.124', DATE '2024-07-29', TIMESTAMP '2012-08-08 01:00', CAST('abc1' AS VARCHAR), to_ieee754_64(1), sequence(0, 10), MAP(ARRAY['aaa', 'bbbb'], ARRAY[1, 2]), CAST(ROW(1, 'AAA') AS ROW(a INT, b VARCHAR)))," + - " (false, INT '-1245', BIGINT '-1', DOUBLE '2.3', REAL '243215.435', DATE '2024-07-29', TIMESTAMP '2012-09-09 00:00', CAST('cba2' AS VARCHAR), to_ieee754_64(4), sequence(30, 35), MAP(ARRAY['ccc', 'bbbb'], ARRAY[-1, -2]), CAST(ROW(-1, 'AAA') AS ROW(a INT, b VARCHAR))) " + - ") AS x (c_boolean, c_int, c_bigint, c_double, c_real, c_date, c_timestamp, c_varchar, c_varbinary, c_array, c_map, c_row)", tmpTableName), 3); + " (null, null, 
null, null, null, null, null, null, null, null, null, null, null), " + + " (true, INT '1245', BIGINT '1', DOUBLE '2.2', REAL '-24.124', DATE '2024-07-29', TIMESTAMP '2012-08-08 01:00', CAST('abc1' AS VARCHAR), to_ieee754_64(1), CAST('4ae71336-e44b-39bf-b9d2-752e234818a5' as UUID), sequence(0, 10), MAP(ARRAY['aaa', 'bbbb'], ARRAY[1, 2]), CAST(ROW(1, 'AAA') AS ROW(a INT, b VARCHAR)))," + + " (false, INT '-1245', BIGINT '-1', DOUBLE '2.3', REAL '243215.435', DATE '2024-07-29', TIMESTAMP '2012-09-09 00:00', CAST('cba2' AS VARCHAR), to_ieee754_64(4), CAST('4ae71336-e44b-39bf-b9d2-752e234818a5' as UUID), sequence(30, 35), MAP(ARRAY['ccc', 'bbbb'], ARRAY[-1, -2]), CAST(ROW(-1, 'AAA') AS ROW(a INT, b VARCHAR))) " + + ") AS x (c_boolean, c_int, c_bigint, c_double, c_real, c_date, c_timestamp, c_varchar, c_varbinary, c_uuid, c_array, c_map, c_row)", tmpTableName), 3); - Session decimalVectorReaderEnabled = decimalVectorReaderEnabledSession(true); - Session decimalVectorReaderDisable = decimalVectorReaderEnabledSession(false); + Session decimalVectorReaderEnabled = batchReadEnabledEnabledSession(true); + Session decimalVectorReaderDisable = batchReadEnabledEnabledSession(false); assertQueryWithSameQueryRunner(decimalVectorReaderEnabled, "SELECT * FROM " + tmpTableName, decimalVectorReaderDisable); } finally { @@ -1886,6 +1890,64 @@ public void testStatisticsFileCache() getQueryRunner().execute("DROP TABLE test_statistics_file_cache"); } + @Test(dataProvider = "batchReadEnabled") + public void testUuidRoundTrip(boolean batchReadEnabled) + { + Session session = batchReadEnabledEnabledSession(batchReadEnabled); + try { + assertQuerySucceeds("CREATE TABLE uuid_roundtrip(u uuid)"); + UUID uuid = UUID.fromString("11111111-2222-3333-4444-555555555555"); + assertUpdate(format("INSERT INTO uuid_roundtrip VALUES CAST('%s' as uuid)", uuid), 1); + assertQuery(session, "SELECT CAST(u as varchar) FROM uuid_roundtrip", format("VALUES '%s'", uuid)); + } + finally { + 
assertQuerySucceeds("DROP TABLE uuid_roundtrip"); + } + } + + @Test(dataProvider = "batchReadEnabled") + public void testUuidFilters(boolean batchReadEnabled) + { + Session session = batchReadEnabledEnabledSession(batchReadEnabled); + try { + int uuidCount = 100; + assertQuerySucceeds("CREATE TABLE uuid_filters(u uuid)"); + List uuids = IntStream.range(0, uuidCount) + .mapToObj(idx -> { + ByteBuffer buf = ByteBuffer.allocate(16); + if (idx % 2 == 0) { + buf.putLong(0L); + buf.putLong(idx); + } + else { + buf.putLong(idx); + buf.putLong(0L); + } + buf.flip(); + return new UUID(buf.getLong(), buf.getLong()); + }) + .collect(Collectors.toList()); + // shuffle to make sure parquet metadata stats are updated properly even with + // out-of-order values + Collections.shuffle(uuids); + assertUpdate(format("INSERT INTO uuid_filters VALUES %s", + Joiner.on(", ").join(uuids.stream().map(uuid -> format("CAST('%s' as uuid)", uuid)).iterator())), 100); + assertQuery(session, format("SELECT CAST(u as varchar) FROM uuid_filters WHERE u = CAST('%s' as uuid)", uuids.get(0)), format("VALUES '%s'", uuids.get(0))); + + // sort so we can easily get lowest and highest + uuids.sort(Comparator.naturalOrder()); + assertQuery(session, format("SELECT COUNT(*) FROM uuid_filters WHERE u >= CAST('%s' as uuid)", uuids.get(0)), format("VALUES %d", uuidCount)); + assertQuery(session, format("SELECT COUNT(*) FROM uuid_filters WHERE u > CAST('%s' as uuid)", uuids.get(0)), format("VALUES %d", uuidCount - 1)); + assertQuery(session, format("SELECT COUNT(*) FROM uuid_filters WHERE u <= CAST('%s' as uuid)", uuids.get(uuidCount - 1)), format("VALUES %d", uuidCount)); + assertQuery(session, format("SELECT COUNT(*) FROM uuid_filters WHERE u < CAST('%s' as uuid)", uuids.get(uuidCount - 1)), format("VALUES %d", uuidCount - 1)); + assertQuery(session, format("SELECT COUNT(*) FROM uuid_filters WHERE u < CAST('%s' as uuid)", uuids.get(50)), format("VALUES %d", 50)); + assertQuery(session, format("SELECT COUNT(*) 
FROM uuid_filters WHERE u <= CAST('%s' as uuid)", uuids.get(50)), format("VALUES %d", 51)); + } + finally { + assertQuerySucceeds("DROP TABLE uuid_filters"); + } + } + @DataProvider(name = "parquetVersions") public Object[][] parquetVersionsDataProvider() { @@ -1939,7 +2001,7 @@ private void writePositionDeleteToNationTable(Table icebergTable, String dataFil Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory(); File metastoreDir = getIcebergDataDirectoryPath(dataDirectory, catalogType.name(), new IcebergConfig().getFileFormat(), false).toFile(); org.apache.hadoop.fs.Path metadataDir = new org.apache.hadoop.fs.Path(metastoreDir.toURI()); - String deleteFileName = "delete_file_" + UUID.randomUUID(); + String deleteFileName = "delete_file_" + randomUUID(); FileSystem fs = getHdfsEnvironment().getFileSystem(new HdfsContext(SESSION), metadataDir); org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(metadataDir, deleteFileName); PositionDeleteWriter writer = Parquet.writeDeletes(HadoopOutputFile.fromPath(path, fs)) @@ -1971,7 +2033,7 @@ private void writeEqualityDeleteToNationTable(Table icebergTable, Map shaded.parquet.it.unimi.dsi.fastutil.* module-info - com.facebook.presto.parquet.batchreader.*BatchReader + com.facebook.presto.parquet.* + org.apache.parquet.io.* + org.apache.parquet.crypto.* + + org.apache.maven.plugins + maven-checkstyle-plugin + + **/com/facebook/presto/parquet/batchreader/*FlatBatchReader.java,**/com/facebook/presto/parquet/batchreader/*NestedBatchReader.java + + @@ -245,5 +254,37 @@ + + generateParquetReaders + + + + com.googlecode.fmpp-maven-plugin + fmpp-maven-plugin + 1.0 + + + net.sourceforge.fmpp + fmpp + 0.9.15 + + + + ${project.basedir}/src/main/resources/freemarker/config.fmpp + ${project.build.sourceDirectory} + ${project.basedir}/src/main/resources/freemarker/templates + + + + generate-sources + + generate + + + + + + + diff --git 
a/presto-parquet/src/main/java/com/facebook/presto/parquet/ColumnReaderFactory.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/ColumnReaderFactory.java index eba17a1d3d2bd..2520bf241ddce 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/ColumnReaderFactory.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/ColumnReaderFactory.java @@ -28,6 +28,7 @@ import com.facebook.presto.parquet.batchreader.ShortDecimalFlatBatchReader; import com.facebook.presto.parquet.batchreader.TimestampFlatBatchReader; import com.facebook.presto.parquet.batchreader.TimestampNestedBatchReader; +import com.facebook.presto.parquet.batchreader.UuidFlatBatchReader; import com.facebook.presto.parquet.reader.AbstractColumnReader; import com.facebook.presto.parquet.reader.BinaryColumnReader; import com.facebook.presto.parquet.reader.BooleanColumnReader; @@ -49,11 +50,13 @@ import static com.facebook.presto.parquet.ParquetTypeUtils.isShortDecimalType; import static com.facebook.presto.parquet.ParquetTypeUtils.isTimeMicrosType; import static com.facebook.presto.parquet.ParquetTypeUtils.isTimeStampMicrosType; +import static com.facebook.presto.parquet.ParquetTypeUtils.isUuidType; import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; public class ColumnReaderFactory { private static final Logger log = Logger.get(ColumnReaderFactory.class); + private ColumnReaderFactory() { } @@ -96,6 +99,10 @@ public static ColumnReader createReader(RichColumnDescriptor descriptor, boolean return isNested ? 
new BinaryNestedBatchReader(descriptor) : new BinaryFlatBatchReader(descriptor); case FIXED_LEN_BYTE_ARRAY: if (!isNested) { + if (isUuidType(descriptor)) { + return new UuidFlatBatchReader(descriptor); + } + decimalBatchColumnReader = createDecimalBatchColumnReader(descriptor); if (decimalBatchColumnReader.isPresent()) { return decimalBatchColumnReader.get(); @@ -126,10 +133,13 @@ public static ColumnReader createReader(RichColumnDescriptor descriptor, boolean case BINARY: return createDecimalColumnReader(descriptor).orElse(new BinaryColumnReader(descriptor)); case FIXED_LEN_BYTE_ARRAY: + if (isUuidType(descriptor)) { + return new BinaryColumnReader(descriptor); + } return createDecimalColumnReader(descriptor) .orElseThrow(() -> new PrestoException(NOT_SUPPORTED, " type FIXED_LEN_BYTE_ARRAY supported as DECIMAL; got " + descriptor.getPrimitiveType().getOriginalType())); default: - throw new PrestoException(NOT_SUPPORTED, "Unsupported parquet type: " + descriptor.getType()); + throw new PrestoException(NOT_SUPPORTED, "Unsupported parquet type: " + descriptor.getPrimitiveType().getPrimitiveTypeName()); } } diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/ParquetTypeUtils.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/ParquetTypeUtils.java index c522663f4d8dc..0ca54e572e1f1 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/ParquetTypeUtils.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/ParquetTypeUtils.java @@ -40,6 +40,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.Iterables.getOnlyElement; import static java.util.stream.Collectors.joining; +import static org.apache.parquet.schema.LogicalTypeAnnotation.uuidType; import static org.apache.parquet.schema.OriginalType.DECIMAL; import static org.apache.parquet.schema.OriginalType.TIMESTAMP_MICROS; import static org.apache.parquet.schema.OriginalType.TIME_MICROS; @@ -353,6 +354,11 
@@ public static boolean isShortDecimalType(ColumnDescriptor descriptor) return decimalLogicalTypeAnnotation.getPrecision() <= MAX_SHORT_PRECISION; } + public static boolean isUuidType(ColumnDescriptor columnDescriptor) + { + return uuidType().equals(columnDescriptor.getPrimitiveType().getLogicalTypeAnnotation()); + } + public static boolean isDecimalType(ColumnDescriptor columnDescriptor) { return columnDescriptor.getPrimitiveType().getOriginalType() == DECIMAL; diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/BooleanFlatBatchReader.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/BooleanFlatBatchReader.java index a16af1f2cc1e6..73dedeeeba719 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/BooleanFlatBatchReader.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/BooleanFlatBatchReader.java @@ -255,3 +255,4 @@ private void seek() } } } + diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/BooleanNestedBatchReader.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/BooleanNestedBatchReader.java index f89ea02b21894..53f6e6620b4ff 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/BooleanNestedBatchReader.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/BooleanNestedBatchReader.java @@ -142,3 +142,4 @@ protected void seek() } } } + diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/BytesUtils.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/BytesUtils.java index 480f0892fbd77..9ad8a5b64af4e 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/BytesUtils.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/BytesUtils.java @@ -29,6 +29,13 @@ public static int getInt(byte[] byteBuffer, int offset) return (ch3 << 24) + (ch2 
<< 16) + (ch1 << 8) + ch0; } + /** + * Convert a little-endian formatted value in a bytearray to a long. + * + * @param byteBuffer source bytes + * @param offset offset in byte array to start decoding + * @return long value + */ public static long getLong(byte[] byteBuffer, int offset) { int ch0 = byteBuffer[offset + 0]; @@ -50,6 +57,34 @@ public static long getLong(byte[] byteBuffer, int offset) ((long) (ch0 & 255) << 0); } + /** + * Convert a big-endian formatted value in a bytearray to a long. + * + * @param byteBuffer source bytes + * @param offset offset in byte array to start decoding + * @return long value + */ + public static long getLongBigEndian(byte[] byteBuffer, int offset) + { + byte ch0 = byteBuffer[offset + 0]; + byte ch1 = byteBuffer[offset + 1]; + byte ch2 = byteBuffer[offset + 2]; + byte ch3 = byteBuffer[offset + 3]; + byte ch4 = byteBuffer[offset + 4]; + byte ch5 = byteBuffer[offset + 5]; + byte ch6 = byteBuffer[offset + 6]; + byte ch7 = byteBuffer[offset + 7]; + + return ((long) (ch0 & 255) << 56) + + ((long) (ch1 & 255) << 48) + + ((long) (ch2 & 255) << 40) + + ((long) (ch3 & 255) << 32) + + ((long) (ch4 & 255) << 24) + + ((long) (ch5 & 255) << 16) + + ((long) (ch6 & 255) << 8) + + ((long) (ch7 & 255) << 0); + } + public static void unpack8Values(byte inByte, byte[] out, int outPos) { out[0 + outPos] = (byte) (inByte & 1); diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/Int64FlatBatchReader.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/Int64FlatBatchReader.java index 2c97056ddda94..cba2bd2c1297a 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/Int64FlatBatchReader.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/Int64FlatBatchReader.java @@ -255,3 +255,4 @@ private void seek() } } } + diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/Int64NestedBatchReader.java 
b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/Int64NestedBatchReader.java index fea6186ad6126..9d69d34a30e37 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/Int64NestedBatchReader.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/Int64NestedBatchReader.java @@ -142,3 +142,4 @@ protected void seek() } } } + diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/Int64TimeAndTimestampMicrosFlatBatchReader.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/Int64TimeAndTimestampMicrosFlatBatchReader.java index 758df93fa44f8..ccedd8af82645 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/Int64TimeAndTimestampMicrosFlatBatchReader.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/Int64TimeAndTimestampMicrosFlatBatchReader.java @@ -255,3 +255,4 @@ private void seek() } } } + diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/Int64TimeAndTimestampMicrosNestedBatchReader.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/Int64TimeAndTimestampMicrosNestedBatchReader.java index c9a96ef85b7d2..652915453b4a3 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/Int64TimeAndTimestampMicrosNestedBatchReader.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/Int64TimeAndTimestampMicrosNestedBatchReader.java @@ -142,3 +142,4 @@ protected void seek() } } } + diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/LongDecimalFlatBatchReader.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/LongDecimalFlatBatchReader.java index 478dee49e0e49..90a13544b3186 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/LongDecimalFlatBatchReader.java +++ 
b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/LongDecimalFlatBatchReader.java @@ -256,3 +256,4 @@ private void seek() } } } + diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/ShortDecimalFlatBatchReader.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/ShortDecimalFlatBatchReader.java index db3dff192a240..235d7f79cc580 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/ShortDecimalFlatBatchReader.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/ShortDecimalFlatBatchReader.java @@ -255,3 +255,4 @@ private void seek() } } } + diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/TimestampFlatBatchReader.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/TimestampFlatBatchReader.java index 7ac7bef95043d..0d52fa272e455 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/TimestampFlatBatchReader.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/TimestampFlatBatchReader.java @@ -255,3 +255,4 @@ private void seek() } } } + diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/TimestampNestedBatchReader.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/TimestampNestedBatchReader.java index e5c867a4d9893..a7a62ebc7ac01 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/TimestampNestedBatchReader.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/TimestampNestedBatchReader.java @@ -142,3 +142,4 @@ protected void seek() } } } + diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/UuidFlatBatchReader.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/UuidFlatBatchReader.java new file mode 100644 index 0000000000000..bf3cdf864bb54 --- /dev/null +++ 
b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/UuidFlatBatchReader.java @@ -0,0 +1,260 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.parquet.batchreader; + +import com.facebook.presto.common.block.Block; +import com.facebook.presto.common.block.Int128ArrayBlock; +import com.facebook.presto.common.block.RunLengthEncodedBlock; +import com.facebook.presto.parquet.ColumnReader; +import com.facebook.presto.parquet.DataPage; +import com.facebook.presto.parquet.DictionaryPage; +import com.facebook.presto.parquet.Field; +import com.facebook.presto.parquet.RichColumnDescriptor; +import com.facebook.presto.parquet.batchreader.decoders.Decoders.FlatDecoders; +import com.facebook.presto.parquet.batchreader.decoders.FlatDefinitionLevelDecoder; +import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.UuidValuesDecoder; +import com.facebook.presto.parquet.batchreader.dictionary.Dictionaries; +import com.facebook.presto.parquet.dictionary.Dictionary; +import com.facebook.presto.parquet.reader.ColumnChunk; +import com.facebook.presto.parquet.reader.PageReader; +import com.facebook.presto.spi.PrestoException; +import org.apache.parquet.internal.filter2.columnindex.RowRanges; +import org.apache.parquet.io.ParquetDecodingException; +import org.openjdk.jol.info.ClassLayout; + +import java.io.IOException; +import java.util.Optional; + +import static 
com.facebook.presto.parquet.ParquetErrorCode.PARQUET_IO_READ_ERROR;
+import static com.facebook.presto.parquet.batchreader.decoders.Decoders.readFlatPage;
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.lang.Math.min;
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * {@link ColumnReader} that decodes a UUID column in batches using the flat-page decoders.
+ * Every value occupies two consecutive longs of the output array, and a batch is returned
+ * as an {@link Int128ArrayBlock}, or as a run-length encoded null block when the whole
+ * batch is null.
+ */
+public class UuidFlatBatchReader
+        implements ColumnReader
+{
+    private static final int INSTANCE_SIZE = ClassLayout.parseClass(UuidFlatBatchReader.class).instanceSize();
+
+    private final RichColumnDescriptor columnDescriptor;
+
+    protected Field field;
+    // Number of values the next readNext() call has to produce
+    protected int nextBatchSize;
+    protected FlatDefinitionLevelDecoder definitionLevelDecoder;
+    protected UuidValuesDecoder valuesDecoder;
+    // Values of the current data page that have been neither read nor skipped yet
+    protected int remainingCountInPage;
+
+    // Stays null when the column chunk has no dictionary page
+    private Dictionary dictionary;
+    // Values to skip before the next read; accumulated by prepareNextRead() and consumed by seek()
+    private int readOffset;
+    private PageReader pageReader;
+
+    public UuidFlatBatchReader(RichColumnDescriptor columnDescriptor)
+    {
+        this.columnDescriptor = requireNonNull(columnDescriptor, "columnDescriptor is null");
+    }
+
+    /**
+     * @return true once {@link #init} has stored both the page reader and the field
+     */
+    @Override
+    public boolean isInitialized()
+    {
+        return pageReader != null && field != null;
+    }
+
+    /**
+     * Binds this reader to a column chunk and loads its dictionary page, if there is one.
+     *
+     * @param pageReader source of the dictionary and data pages; must report at least one value
+     * @param field the field being read
+     * @param rowRanges not used by this reader
+     * @throws IllegalArgumentException if the reader is already initialized or the column chunk is empty
+     */
+    @Override
+    public void init(PageReader pageReader, Field field, RowRanges rowRanges)
+    {
+        checkArgument(!isInitialized(), "Parquet batch reader already initialized");
+        this.pageReader = requireNonNull(pageReader, "pageReader is null");
+        checkArgument(pageReader.getValueCountInColumnChunk() > 0, "page is empty");
+        this.field = requireNonNull(field, "field is null");
+
+        DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
+        if (dictionaryPage != null) {
+            dictionary = Dictionaries.createDictionary(columnDescriptor, dictionaryPage);
+        }
+    }
+
+    /**
+     * Registers the size of the next batch. A previously prepared batch that was never read
+     * is added to the skip offset so that {@link #readNext} jumps over it.
+     */
+    @Override
+    public void prepareNextRead(int batchSize)
+    {
+        readOffset = readOffset + nextBatchSize;
+        nextBatchSize = batchSize;
+    }
+
+    /**
+     * Skips the pending offset, then reads {@code nextBatchSize} values and resets both counters.
+     *
+     * @throws PrestoException with {@code PARQUET_IO_READ_ERROR} when decoding raises an {@link IOException}
+     */
+    @Override
+    public ColumnChunk readNext()
+    {
+        ColumnChunk columnChunk = null;
+        try {
+            seek();
+            // Required fields carry no nulls, so definition levels do not have to be decoded
+            if (field.isRequired()) {
+                columnChunk = readWithoutNull();
+            }
+            else {
+                columnChunk = readWithNull();
+            }
+        }
+        catch (IOException exception) {
+            throw new PrestoException(PARQUET_IO_READ_ERROR, "Error reading Parquet column " + columnDescriptor, exception);
+        }
+
+        readOffset = 0;
+        nextBatchSize = 0;
+        return columnChunk;
+    }
+
+    /**
+     * @return the instance size plus the retained size of every decoder, dictionary and page reader currently held
+     */
+    @Override
+    public long getRetainedSizeInBytes()
+    {
+        return INSTANCE_SIZE +
+                (definitionLevelDecoder == null ? 0 : definitionLevelDecoder.getRetainedSizeInBytes()) +
+                (valuesDecoder == null ? 0 : valuesDecoder.getRetainedSizeInBytes()) +
+                (dictionary == null ? 0 : dictionary.getRetainedSizeInBytes()) +
+                (pageReader == null ? 0 : pageReader.getRetainedSizeInBytes());
+    }
+
+    /**
+     * Drops the current decoders and installs the ones for the next data page.
+     *
+     * @return false when the page reader has no further page, true otherwise
+     */
+    protected boolean readNextPage()
+    {
+        definitionLevelDecoder = null;
+        valuesDecoder = null;
+        remainingCountInPage = 0;
+
+        DataPage page = pageReader.readPage();
+        if (page == null) {
+            return false;
+        }
+
+        FlatDecoders flatDecoders = readFlatPage(page, columnDescriptor, dictionary);
+        definitionLevelDecoder = flatDecoders.getDefinitionLevelDecoder();
+        valuesDecoder = (UuidValuesDecoder) flatDecoders.getValuesDecoder();
+
+        remainingCountInPage = page.getValueCount();
+        return true;
+    }
+
+    /**
+     * Reads one batch of a column that may contain nulls.
+     *
+     * @throws ParquetDecodingException if the pages run out before the batch is complete
+     */
+    private ColumnChunk readWithNull()
+            throws IOException
+    {
+        // Two longs per value
+        long[] values = new long[nextBatchSize * 2];
+        boolean[] isNull = new boolean[nextBatchSize];
+
+        int totalNonNullCount = 0;
+        int remainingInBatch = nextBatchSize;
+        int startOffset = 0;
+        while (remainingInBatch > 0) {
+            if (remainingCountInPage == 0) {
+                if (!readNextPage()) {
+                    break;
+                }
+            }
+
+            int chunkSize = min(remainingCountInPage, remainingInBatch);
+            int nonNullCount = definitionLevelDecoder.readNext(isNull, startOffset, chunkSize);
+            totalNonNullCount += nonNullCount;
+
+            if (nonNullCount > 0) {
+                // The decoder packs the non-null values densely, starting at startOffset
+                valuesDecoder.readNext(values, startOffset, nonNullCount);
+
+                int valueDestinationIndex = startOffset + chunkSize - 1;
+                int valueSourceIndex = startOffset + nonNullCount - 1;
+
+                // Spread the packed values to their final positions, walking backwards so that
+                // a value is never overwritten before it has been moved
+                while (valueDestinationIndex >= startOffset) {
+                    if (!isNull[valueDestinationIndex]) {
+                        values[valueDestinationIndex * 2 + 1] = values[valueSourceIndex * 2 + 1];
+                        values[valueDestinationIndex * 2] = values[valueSourceIndex * 2];
+                        valueSourceIndex--;
+                    }
+                    valueDestinationIndex--;
+                }
+            }
+
+            startOffset += chunkSize;
+            remainingInBatch -= chunkSize;
+            remainingCountInPage -= chunkSize;
+        }
+
+        if (remainingInBatch != 0) {
+            throw new ParquetDecodingException("Still remaining to be read in current batch.");
+        }
+
+        // An all-null batch is returned as a single run-length encoded null
+        if (totalNonNullCount == 0) {
+            Block block = RunLengthEncodedBlock.create(field.getType(), null, nextBatchSize);
+            return new ColumnChunk(block, new int[0], new int[0]);
+        }
+
+        // The null mask is only attached when at least one position is null
+        boolean hasNoNull = totalNonNullCount == nextBatchSize;
+        Block block = new Int128ArrayBlock(nextBatchSize, hasNoNull ? Optional.empty() : Optional.of(isNull), values);
+        return new ColumnChunk(block, new int[0], new int[0]);
+    }
+
+    /**
+     * Reads one batch of a required column; every position holds a value.
+     *
+     * @throws ParquetDecodingException if the pages run out before the batch is complete
+     */
+    private ColumnChunk readWithoutNull()
+            throws IOException
+    {
+        // Two longs per value
+        long[] values = new long[nextBatchSize * 2];
+        int remainingInBatch = nextBatchSize;
+        int startOffset = 0;
+        while (remainingInBatch > 0) {
+            if (remainingCountInPage == 0) {
+                if (!readNextPage()) {
+                    break;
+                }
+            }
+
+            int chunkSize = min(remainingCountInPage, remainingInBatch);
+
+            valuesDecoder.readNext(values, startOffset, chunkSize);
+            startOffset += chunkSize;
+            remainingInBatch -= chunkSize;
+            remainingCountInPage -= chunkSize;
+        }
+
+        if (remainingInBatch != 0) {
+            throw new ParquetDecodingException(format("Corrupted Parquet file: extra %d values to be consumed when scanning current batch", remainingInBatch));
+        }
+
+        Block block = new Int128ArrayBlock(nextBatchSize, Optional.empty(), values);
+        return new ColumnChunk(block, new int[0], new int[0]);
+    }
+
+    /**
+     * Skips the {@code readOffset} values left over from batches that were prepared but never read.
+     */
+    private void seek()
+            throws IOException
+    {
+        if (readOffset == 0) {
+            return;
+        }
+
+        int remainingInBatch = readOffset;
+        int startOffset = 0;
+        while (remainingInBatch > 0) {
+            if (remainingCountInPage == 0) {
+                // NOTE(review): running out of pages ends the skip silently here, unlike the read paths which throw
+                if (!readNextPage()) {
+                    break;
+                }
+            }
+
+            int chunkSize = min(remainingCountInPage, remainingInBatch);
+            int skipSize = chunkSize;
+            if (!columnDescriptor.isRequired()) {
+                // Scratch buffer: only the non-null count is needed, because nulls have no encoded value to skip
+                boolean[] isNull = new boolean[readOffset];
+                int nonNullCount = definitionLevelDecoder.readNext(isNull, startOffset, chunkSize);
+                skipSize = nonNullCount;
+                startOffset += chunkSize;
+            }
+            valuesDecoder.skip(skipSize);
+            remainingInBatch -= chunkSize;
+            remainingCountInPage -= chunkSize;
+        }
+    }
+}
+
diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/Decoders.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/Decoders.java
index f2fc89c300093..e51cf0c00fdea 100644
--- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/Decoders.java
+++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/Decoders.java
@@ -23,6 +23,7 @@
 import com.facebook.presto.parquet.batchreader.decoders.delta.BinaryShortDecimalDeltaValuesDecoder;
 import com.facebook.presto.parquet.batchreader.decoders.delta.FixedLenByteArrayLongDecimalDeltaValueDecoder;
 import com.facebook.presto.parquet.batchreader.decoders.delta.FixedLenByteArrayShortDecimalDeltaValueDecoder;
+import com.facebook.presto.parquet.batchreader.decoders.delta.FixedLenByteArrayUuidDeltaValuesDecoder;
 import com.facebook.presto.parquet.batchreader.decoders.delta.Int32DeltaBinaryPackedValuesDecoder;
 import com.facebook.presto.parquet.batchreader.decoders.delta.Int32ShortDecimalDeltaValuesDecoder;
 import com.facebook.presto.parquet.batchreader.decoders.delta.Int64DeltaBinaryPackedValuesDecoder;
@@ -34,6 +35,7 @@
 import com.facebook.presto.parquet.batchreader.decoders.plain.BooleanPlainValuesDecoder;
 import com.facebook.presto.parquet.batchreader.decoders.plain.FixedLenByteArrayLongDecimalPlainValuesDecoder;
 import com.facebook.presto.parquet.batchreader.decoders.plain.FixedLenByteArrayShortDecimalPlainValuesDecoder;
+import
com.facebook.presto.parquet.batchreader.decoders.plain.FixedLenByteArrayUuidPlainValuesDecoder; import com.facebook.presto.parquet.batchreader.decoders.plain.Int32PlainValuesDecoder; import com.facebook.presto.parquet.batchreader.decoders.plain.Int32ShortDecimalPlainValuesDecoder; import com.facebook.presto.parquet.batchreader.decoders.plain.Int64PlainValuesDecoder; @@ -49,6 +51,7 @@ import com.facebook.presto.parquet.batchreader.decoders.rle.LongDecimalRLEDictionaryValuesDecoder; import com.facebook.presto.parquet.batchreader.decoders.rle.ShortDecimalRLEDictionaryValuesDecoder; import com.facebook.presto.parquet.batchreader.decoders.rle.TimestampRLEDictionaryValuesDecoder; +import com.facebook.presto.parquet.batchreader.decoders.rle.UuidRLEDictionaryValuesDecoder; import com.facebook.presto.parquet.batchreader.dictionary.BinaryBatchDictionary; import com.facebook.presto.parquet.batchreader.dictionary.TimestampDictionary; import com.facebook.presto.parquet.dictionary.Dictionary; @@ -81,6 +84,7 @@ import static com.facebook.presto.parquet.ParquetTypeUtils.isShortDecimalType; import static com.facebook.presto.parquet.ParquetTypeUtils.isTimeMicrosType; import static com.facebook.presto.parquet.ParquetTypeUtils.isTimeStampMicrosType; +import static com.facebook.presto.parquet.ParquetTypeUtils.isUuidType; import static com.facebook.presto.parquet.ValuesType.VALUES; import static com.google.common.base.Preconditions.checkArgument; import static java.lang.String.format; @@ -153,6 +157,10 @@ private static ValuesDecoder createValuesDecoder(ColumnDescriptor columnDescript int typeLength = columnDescriptor.getPrimitiveType().getTypeLength(); return new FixedLenByteArrayLongDecimalPlainValuesDecoder(typeLength, buffer, offset, length); } + else if (isUuidType(columnDescriptor)) { + int typeLength = columnDescriptor.getPrimitiveType().getTypeLength(); + return new FixedLenByteArrayUuidPlainValuesDecoder(typeLength, buffer, offset, length); + } default: throw new 
PrestoException(PARQUET_UNSUPPORTED_COLUMN_TYPE, format("Column: %s, Encoding: %s", columnDescriptor, encoding));
             }
@@ -199,6 +207,9 @@ private static ValuesDecoder createValuesDecoder(ColumnDescriptor columnDescript
                     }
                     return new LongDecimalRLEDictionaryValuesDecoder(bitWidth, inputStream, (BinaryBatchDictionary) dictionary);
                 }
+                else if (isUuidType(columnDescriptor)) {
+                    return new UuidRLEDictionaryValuesDecoder(bitWidth, inputStream, (BinaryBatchDictionary) dictionary);
+                }
                 default:
                     throw new PrestoException(PARQUET_UNSUPPORTED_COLUMN_TYPE, format("Column: %s, Encoding: %s", columnDescriptor, encoding));
             }
@@ -256,6 +267,11 @@ private static ValuesDecoder createValuesDecoder(ColumnDescriptor columnDescript
                     return new FixedLenByteArrayLongDecimalDeltaValueDecoder(parquetReader);
                 }
+                else if (isUuidType(columnDescriptor)) {
+                    ByteBufferInputStream inputStream = ByteBufferInputStream.wrap(ByteBuffer.wrap(buffer, offset, length));
+                    ValuesReader parquetReader = getParquetReader(encoding, columnDescriptor, valueCount, inputStream);
+                    return new FixedLenByteArrayUuidDeltaValuesDecoder(parquetReader);
+                }
         }
         throw new PrestoException(PARQUET_UNSUPPORTED_ENCODING, format("Column: %s, Encoding: %s", columnDescriptor, encoding));
diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/ValuesDecoder.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/ValuesDecoder.java
index b5468c82a98f0..b3944cfe9cd76 100644
--- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/ValuesDecoder.java
+++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/ValuesDecoder.java
@@ -102,5 +102,15 @@ void skip(int length)
                 throws IOException;
     }
+    /**
+     * Decoder for UUID values. Implementations store each value as two consecutive longs,
+     * so value {@code i} of a call lands in {@code values[(offset + i) * 2]} and
+     * {@code values[(offset + i) * 2 + 1]}.
+     */
+    interface UuidValuesDecoder
+            extends ValuesDecoder
+    {
+        /**
+         * Decodes the next {@code length} values into {@code values}.
+         *
+         * @param values destination array; needs room for two longs per value
+         * @param offset index of the first value to write, counted in values rather than in longs
+         * @param length number of values to decode
+         */
+        void readNext(long[] values, int offset, int length)
+                throws IOException;
+
+        /**
+         * Advances past the next {@code length} values without decoding them.
+         */
+        void skip(int length)
+                throws IOException;
+    }
+
     public long getRetainedSizeInBytes();
 }
diff
--git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/delta/FixedLenByteArrayUuidDeltaValuesDecoder.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/delta/FixedLenByteArrayUuidDeltaValuesDecoder.java new file mode 100644 index 0000000000000..84180bb455f22 --- /dev/null +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/delta/FixedLenByteArrayUuidDeltaValuesDecoder.java @@ -0,0 +1,62 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.parquet.batchreader.decoders.delta; + +import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.UuidValuesDecoder; +import org.apache.parquet.column.values.ValuesReader; +import org.openjdk.jol.info.ClassLayout; + +import static com.facebook.presto.parquet.batchreader.BytesUtils.getLongBigEndian; +import static java.util.Objects.requireNonNull; + +/** + * Note: this is not an optimized values decoder. It makes use of the existing Parquet decoder. Given that this type encoding + * is not a common one, just use the existing one provided by Parquet library and add a wrapper around it that satisfies the + * {@link UuidValuesDecoder} interface. 
+ */
+public class FixedLenByteArrayUuidDeltaValuesDecoder
+        implements UuidValuesDecoder
+{
+    private static final int INSTANCE_SIZE = ClassLayout.parseClass(FixedLenByteArrayUuidDeltaValuesDecoder.class).instanceSize();
+
+    private final ValuesReader delegate;
+
+    /**
+     * @param delegate Parquet values reader that supplies the raw bytes of every value; must not be null
+     */
+    public FixedLenByteArrayUuidDeltaValuesDecoder(ValuesReader delegate)
+    {
+        this.delegate = requireNonNull(delegate, "delegate is null");
+    }
+
+    /**
+     * Pulls {@code length} values from the delegate, one at a time. Value {@code i} is converted
+     * with {@code getLongBigEndian} into two longs stored at {@code values[(offset + i) * 2]}
+     * and {@code values[(offset + i) * 2 + 1]}.
+     */
+    @Override
+    public void readNext(long[] values, int offset, int length)
+    {
+        for (int position = 0; position < length; position++) {
+            byte[] uuidBytes = delegate.readBytes().getBytes();
+            int target = (offset + position) * 2;
+            values[target] = getLongBigEndian(uuidBytes, 0);
+            values[target + 1] = getLongBigEndian(uuidBytes, Long.BYTES);
+        }
+    }
+
+    /**
+     * Skipping is handed straight to the wrapped reader.
+     */
+    @Override
+    public void skip(int length)
+    {
+        delegate.skip(length);
+    }
+
+    /**
+     * Only the shallow instance size is reported; the delegate is not included.
+     */
+    @Override
+    public long getRetainedSizeInBytes()
+    {
+        return INSTANCE_SIZE;
+    }
+}
diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/plain/FixedLenByteArrayUuidPlainValuesDecoder.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/plain/FixedLenByteArrayUuidPlainValuesDecoder.java
new file mode 100644
index 0000000000000..296021588e442
--- /dev/null
+++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/plain/FixedLenByteArrayUuidPlainValuesDecoder.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.parquet.batchreader.decoders.plain;
+
+import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.UuidValuesDecoder;
+import org.openjdk.jol.info.ClassLayout;
+
+import java.nio.ByteBuffer;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static io.airlift.slice.SizeOf.sizeOf;
+import static java.nio.ByteOrder.BIG_ENDIAN;
+
+/**
+ * Decoder for plain-encoded UUID values stored as 16-byte fixed-length byte arrays.
+ * Each value is read as two big-endian longs from the region
+ * {@code [baseOffset, baseOffset + length)} of the backing array.
+ */
+public class FixedLenByteArrayUuidPlainValuesDecoder
+        implements UuidValuesDecoder
+{
+    private static final int INSTANCE_SIZE = ClassLayout.parseClass(FixedLenByteArrayUuidPlainValuesDecoder.class).instanceSize();
+
+    private final int typeLength;
+    // Exclusive end of the readable region; the ByteBuffer itself spans the whole backing array
+    private final int bufferEnd;
+    private final ByteBuffer buffer;
+
+    /**
+     * @param typeLength declared length of the fixed-length type; must be 16
+     * @param buffer backing array that contains the encoded values
+     * @param baseOffset index in {@code buffer} of the first encoded value
+     * @param length number of readable bytes starting at {@code baseOffset}
+     * @throws IllegalArgumentException if {@code typeLength} is not 16
+     */
+    public FixedLenByteArrayUuidPlainValuesDecoder(int typeLength, byte[] buffer, int baseOffset, int length)
+    {
+        checkArgument(typeLength == 16, "typeLength %s should be 16 for a UUID", typeLength);
+        this.typeLength = typeLength;
+        this.buffer = ByteBuffer.wrap(buffer).order(BIG_ENDIAN);
+        this.buffer.position(baseOffset);
+        this.bufferEnd = baseOffset + length;
+    }
+
+    /**
+     * Decodes the next {@code length} values, two longs per value.
+     *
+     * @param values array containing decoded values
+     * @param offset offset to start writing in the values argument, counted in values rather than in longs
+     * @param length number of values to write
+     * @throws IllegalArgumentException if fewer than {@code length} values remain in the readable region
+     */
+    @Override
+    public void readNext(long[] values, int offset, int length)
+    {
+        // The ByteBuffer limit is the end of the whole array, so without this check an oversized
+        // request would silently decode bytes that lie beyond this decoder's region
+        checkArgument(buffer.position() + (length * typeLength) <= bufferEnd, "End of stream: invalid read request");
+
+        int readEndOffset = (offset + length) * 2;
+
+        for (int currentOutputOffset = offset * 2; currentOutputOffset < readEndOffset; currentOutputOffset += 2) {
+            values[currentOutputOffset] = buffer.getLong();
+            values[currentOutputOffset + 1] = buffer.getLong();
+        }
+    }
+
+    /**
+     * Advances past the next {@code length} values without decoding them.
+     *
+     * @throws IllegalArgumentException if {@code length} is negative or exceeds the values remaining
+     */
+    @Override
+    public void skip(int length)
+    {
+        checkArgument(length >= 0, "invalid length %s", length);
+        checkArgument(buffer.position() + (length * typeLength) <= bufferEnd, "End of stream: invalid read request");
+        buffer.position(buffer.position() + (length * typeLength));
+    }
+
+    /**
+     * @return the instance size plus the size of the whole backing array
+     */
+    @Override
+    public long getRetainedSizeInBytes()
+    {
+        return INSTANCE_SIZE + sizeOf(buffer.array());
+    }
+}
diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/UuidRLEDictionaryValuesDecoder.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/UuidRLEDictionaryValuesDecoder.java
new file mode 100644
index 0000000000000..30fdc0e93d77a
--- /dev/null
+++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/UuidRLEDictionaryValuesDecoder.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.parquet.batchreader.decoders.rle;
+
+import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.BinaryValuesDecoder.ValueBuffer;
+import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.UuidValuesDecoder;
+import com.facebook.presto.parquet.batchreader.dictionary.BinaryBatchDictionary;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import static com.facebook.presto.parquet.batchreader.BytesUtils.getLongBigEndian;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * UUID decoder for dictionary-encoded pages. The dictionary lookup is done by a wrapped
+ * {@link BinaryRLEDictionaryValuesDecoder}; this class converts the bytes of every value
+ * into two longs.
+ */
+public class UuidRLEDictionaryValuesDecoder
+        extends BaseRLEBitPackedDecoder
+        implements UuidValuesDecoder
+{
+    private final BinaryRLEDictionaryValuesDecoder delegate;
+
+    /**
+     * @param bitWidth bit width passed to both the base class and the wrapped binary decoder
+     * @param inputStream stream passed to both the base class and the wrapped binary decoder
+     * @param dictionary dictionary holding the value bytes; must not be null
+     */
+    public UuidRLEDictionaryValuesDecoder(int bitWidth, InputStream inputStream, BinaryBatchDictionary dictionary)
+    {
+        super(Integer.MAX_VALUE, bitWidth, inputStream);
+        requireNonNull(dictionary, "dictionary is null");
+        delegate = new BinaryRLEDictionaryValuesDecoder(bitWidth, inputStream, dictionary);
+    }
+
+    /**
+     * Reads {@code length} values through the wrapped decoder into a temporary byte array, then
+     * converts each one with {@code getLongBigEndian} into two longs written from
+     * {@code values[offset * 2]} onwards.
+     */
+    @Override
+    public void readNext(long[] values, int offset, int length)
+            throws IOException
+    {
+        ValueBuffer valueBuffer = delegate.readNext(length);
+        byte[] uuidBytes = new byte[valueBuffer.getBufferSize()];
+        int[] valueOffsets = new int[length + 1];
+        delegate.readIntoBuffer(uuidBytes, 0, valueOffsets, 0, valueBuffer);
+
+        // Running index of the first of the two longs that belong to the current value
+        int target = offset * 2;
+        for (int position = 0; position < length; position++) {
+            int source = valueOffsets[position];
+            values[target] = getLongBigEndian(uuidBytes, source);
+            values[target + 1] = getLongBigEndian(uuidBytes, source + Long.BYTES);
+            target += 2;
+        }
+    }
+
+    /**
+     * Skipping is handed straight to the wrapped binary decoder.
+     */
+    @Override
+    public void skip(int length)
+            throws IOException
+    {
+        delegate.skip(length);
+    }
+}
diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/cache/MetadataReader.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/cache/MetadataReader.java
index 8af920d74d7b8..828fe24441430 100644
---
a/presto-parquet/src/main/java/com/facebook/presto/parquet/cache/MetadataReader.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/cache/MetadataReader.java @@ -373,14 +373,15 @@ private static void readTypeSchema(Types.GroupBuilder builder, Iterator annotationWithParams = getLogicalTypeAnnotationWithParameters(type); - annotationWithParams.ifPresent(typeBuilder::as); + getLogicalTypeAnnotationWithParameters(element.getLogicalType()) + .ifPresent(typeBuilder::as); } + else if (element.isSetConverted_type()) { + typeBuilder.as(getOriginalType(element.converted_type)); + } + if (element.isSetField_id()) { typeBuilder.id(element.field_id); } diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/reader/BinaryColumnReader.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/reader/BinaryColumnReader.java index abbd835a40ca9..7d18a7ffdea25 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/reader/BinaryColumnReader.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/reader/BinaryColumnReader.java @@ -15,6 +15,7 @@ import com.facebook.presto.common.block.BlockBuilder; import com.facebook.presto.common.type.Type; +import com.facebook.presto.common.type.UuidType; import com.facebook.presto.parquet.RichColumnDescriptor; import io.airlift.slice.Slice; import org.apache.parquet.io.api.Binary; @@ -23,6 +24,7 @@ import static com.facebook.presto.common.type.Chars.truncateToLengthAndTrimSpaces; import static com.facebook.presto.common.type.Varchars.isVarcharType; import static com.facebook.presto.common.type.Varchars.truncateToLength; +import static com.facebook.presto.parquet.batchreader.BytesUtils.getLongBigEndian; import static io.airlift.slice.Slices.EMPTY_SLICE; import static io.airlift.slice.Slices.wrappedBuffer; @@ -40,6 +42,15 @@ protected void readValue(BlockBuilder blockBuilder, Type type) if (definitionLevel == columnDescriptor.getMaxDefinitionLevel()) { Binary binary = 
valuesReader.readBytes(); Slice value; + + if (type instanceof UuidType) { + byte[] src = binary.getBytes(); + blockBuilder.writeLong(getLongBigEndian(src, 0)); + blockBuilder.writeLong(getLongBigEndian(src, Long.BYTES)); + blockBuilder.closeEntry(); + return; + } + if (binary.length() == 0) { value = EMPTY_SLICE; } diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/writer/ParquetSchemaConverter.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/writer/ParquetSchemaConverter.java index 5c846552d43ee..f46f9de26201b 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/writer/ParquetSchemaConverter.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/writer/ParquetSchemaConverter.java @@ -20,6 +20,7 @@ import com.facebook.presto.common.type.RealType; import com.facebook.presto.common.type.RowType; import com.facebook.presto.common.type.Type; +import com.facebook.presto.common.type.UuidType; import com.facebook.presto.common.type.VarbinaryType; import com.facebook.presto.common.type.VarcharType; import com.facebook.presto.spi.PrestoException; @@ -145,6 +146,13 @@ else if (decimalType.isShort()) { if (type instanceof VarcharType || type instanceof CharType || type instanceof VarbinaryType) { return Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, repetition).named(name); } + if (type instanceof UuidType) { + LogicalTypeAnnotation logicalTypeAnnotation = LogicalTypeAnnotation.uuidType(); + Types.PrimitiveBuilder builder = Types.primitive(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, repetition) + .length(16); + builder = builder.as(logicalTypeAnnotation); + return builder.named(name); + } throw new PrestoException(NOT_SUPPORTED, format("Unsupported primitive type: %s", type)); } diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/writer/ParquetWriters.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/writer/ParquetWriters.java index 318f7b68bc6d2..a6b152381622c 
100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/writer/ParquetWriters.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/writer/ParquetWriters.java @@ -29,6 +29,7 @@ import com.facebook.presto.parquet.writer.valuewriter.RealValueWriter; import com.facebook.presto.parquet.writer.valuewriter.TimeValueWriter; import com.facebook.presto.parquet.writer.valuewriter.TimestampValueWriter; +import com.facebook.presto.parquet.writer.valuewriter.UuidValuesWriter; import com.facebook.presto.spi.PrestoException; import com.google.common.collect.ImmutableList; import org.apache.parquet.column.ColumnDescriptor; @@ -57,6 +58,7 @@ import static com.facebook.presto.common.type.TimeType.TIME; import static com.facebook.presto.common.type.TimestampType.TIMESTAMP; import static com.facebook.presto.common.type.TinyintType.TINYINT; +import static com.facebook.presto.common.type.UuidType.UUID; import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; import static java.lang.String.format; import static java.util.Objects.requireNonNull; @@ -212,6 +214,9 @@ private static PrimitiveValueWriter getValueWriter(ValuesWriter valuesWriter, Ty if (TIME.equals(type)) { return new TimeValueWriter(valuesWriter, type, parquetType); } + if (UUID.equals(type)) { + return new UuidValuesWriter(valuesWriter, parquetType); + } if (type instanceof VarcharType || type instanceof CharType || type instanceof VarbinaryType) { return new CharValueWriter(valuesWriter, type, parquetType); } diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/writer/valuewriter/UuidValuesWriter.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/writer/valuewriter/UuidValuesWriter.java new file mode 100644 index 0000000000000..eefe981d9281f --- /dev/null +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/writer/valuewriter/UuidValuesWriter.java @@ -0,0 +1,55 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you 
may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.parquet.writer.valuewriter;
+
+import com.facebook.presto.common.block.Block;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.PrimitiveType;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static io.airlift.slice.SizeOf.SIZE_OF_LONG;
+
+/**
+ * Value writer for UUID columns. Every non-null position is serialized as 16 bytes,
+ * its two longs in big-endian order, and handed to the Parquet values writer.
+ */
+@NotThreadSafe
+public class UuidValuesWriter
+        extends PrimitiveValueWriter
+{
+    // Scratch buffer reused for every value, which is why instances are not thread safe
+    private final ByteBuffer writeBuffer = ByteBuffer.allocate(2 * SIZE_OF_LONG)
+            .order(ByteOrder.BIG_ENDIAN);
+
+    public UuidValuesWriter(ValuesWriter valuesWriter, PrimitiveType parquetType)
+    {
+        super(parquetType, valuesWriter);
+    }
+
+    /**
+     * Writes every non-null position of {@code block} and updates the column statistics
+     * with each written value. Null positions are skipped.
+     */
+    @Override
+    public void write(Block block)
+    {
+        int positionCount = block.getPositionCount();
+        for (int position = 0; position < positionCount; position++) {
+            if (block.isNull(position)) {
+                continue;
+            }
+
+            writeBuffer.clear();
+            writeBuffer.putLong(block.getLong(position, 0))
+                    .putLong(block.getLong(position, SIZE_OF_LONG))
+                    .flip();
+
+            Binary data = Binary.fromReusedByteBuffer(writeBuffer);
+            getValueWriter().writeBytes(data);
+            getStatistics().updateStats(data);
+        }
+    }
+}
diff --git a/presto-parquet/src/main/resources/freemarker/data/ParquetTypes.tdd b/presto-parquet/src/main/resources/freemarker/data/ParquetTypes.tdd
index 7eadd8be3f710..a72c9e8a54349 100644
--- a/presto-parquet/src/main/resources/freemarker/data/ParquetTypes.tdd
+++ b/presto-parquet/src/main/resources/freemarker/data/ParquetTypes.tdd
@@ -36,5
+36,11 @@ valuesDecoder: "LongDecimalValuesDecoder", primitiveType: "long" }, + { + classNamePrefix: "Uuid", + blockType: "Int128ArrayBlock", + valuesDecoder: "UuidValuesDecoder", + primitiveType: "long" + }, ] } diff --git a/presto-parquet/src/main/resources/freemarker/templates/ParquetFlatColumnReaderGenerator.tdd b/presto-parquet/src/main/resources/freemarker/templates/ParquetFlatColumnReaderGenerator.tdd index 0b11669d8026b..0bbef5592ba7e 100644 --- a/presto-parquet/src/main/resources/freemarker/templates/ParquetFlatColumnReaderGenerator.tdd +++ b/presto-parquet/src/main/resources/freemarker/templates/ParquetFlatColumnReaderGenerator.tdd @@ -4,7 +4,7 @@ <#assign updatedTemplate = updatedTemplate?replace("com.facebook.presto.common.block.IntArrayBlock", "com.facebook.presto.common.block.${type.blockType}")> <#assign updatedTemplate = updatedTemplate?replace("IntArrayBlock", "${type.blockType}")> <#assign updatedTemplate = updatedTemplate?replace("Int32ValuesDecoder", "${type.valuesDecoder}")> -<#if !type.classNamePrefix?starts_with("LongDecimal")> +<#if !type.classNamePrefix?starts_with("LongDecimal") && !type.classNamePrefix?starts_with("Uuid")> <#assign updatedTemplate = updatedTemplate?replace("int[] values = new int[nextBatchSize]", "${type.primitiveType}[] values = new ${type.primitiveType}[nextBatchSize]")> <#else> <#assign updatedTemplate = updatedTemplate?replace("int[] values = new int[nextBatchSize]", "${type.primitiveType}[] values = new ${type.primitiveType}[nextBatchSize * 2]")> diff --git a/presto-parquet/src/main/resources/freemarker/templates/ParquetNestedColumnReaderGenerator.tdd b/presto-parquet/src/main/resources/freemarker/templates/ParquetNestedColumnReaderGenerator.tdd index f19a2140d7d45..6ef04c4ec4981 100644 --- a/presto-parquet/src/main/resources/freemarker/templates/ParquetNestedColumnReaderGenerator.tdd +++ b/presto-parquet/src/main/resources/freemarker/templates/ParquetNestedColumnReaderGenerator.tdd @@ -1,5 +1,5 @@ <#list 
parquetTypes.flatTypes as type> -<#if !type.classNamePrefix?ends_with("Decimal")> +<#if !type.classNamePrefix?ends_with("Decimal") && !type.classNamePrefix?starts_with("Uuid")> <@pp.changeOutputFile name="/com/facebook/presto/parquet/batchreader/${type.classNamePrefix}NestedBatchReader.java" /> <#assign updatedTemplate = nestedTypeTemplate?replace("Int32NestedBatchReader", "${type.classNamePrefix}NestedBatchReader")> <#assign updatedTemplate = updatedTemplate?replace("com.facebook.presto.common.block.IntArrayBlock", "com.facebook.presto.common.block.${type.blockType}")> diff --git a/presto-parquet/src/test/java/com/facebook/presto/parquet/batchreader/decoders/TestParquetUtils.java b/presto-parquet/src/test/java/com/facebook/presto/parquet/batchreader/decoders/TestParquetUtils.java index cea13b18a9da4..9168934f92133 100644 --- a/presto-parquet/src/test/java/com/facebook/presto/parquet/batchreader/decoders/TestParquetUtils.java +++ b/presto-parquet/src/test/java/com/facebook/presto/parquet/batchreader/decoders/TestParquetUtils.java @@ -144,6 +144,16 @@ public static byte[] generatePlainValuesPage(int valueCount, int valueSizeBits, } break; } + case 128: + for (int i = 0; i < valueCount; i++) { + long value = random.nextLong(); + writer.writeLong(value); + addedValues.add(value); + value = random.nextLong(); + writer.writeLong(value); + addedValues.add(value); + } + break; default: throw new IllegalArgumentException("invalid value size (expected: 4, 8 or 12)"); } diff --git a/presto-parquet/src/test/java/com/facebook/presto/parquet/batchreader/decoders/TestValuesDecoders.java b/presto-parquet/src/test/java/com/facebook/presto/parquet/batchreader/decoders/TestValuesDecoders.java index 35f356f6010c1..c94a68e8d19e3 100644 --- a/presto-parquet/src/test/java/com/facebook/presto/parquet/batchreader/decoders/TestValuesDecoders.java +++ b/presto-parquet/src/test/java/com/facebook/presto/parquet/batchreader/decoders/TestValuesDecoders.java @@ -21,8 +21,10 @@ import 
com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.Int64ValuesDecoder; import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.ShortDecimalValuesDecoder; import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.TimestampValuesDecoder; +import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.UuidValuesDecoder; import com.facebook.presto.parquet.batchreader.decoders.plain.BinaryPlainValuesDecoder; import com.facebook.presto.parquet.batchreader.decoders.plain.BooleanPlainValuesDecoder; +import com.facebook.presto.parquet.batchreader.decoders.plain.FixedLenByteArrayUuidPlainValuesDecoder; import com.facebook.presto.parquet.batchreader.decoders.plain.Int32PlainValuesDecoder; import com.facebook.presto.parquet.batchreader.decoders.plain.Int32ShortDecimalPlainValuesDecoder; import com.facebook.presto.parquet.batchreader.decoders.plain.Int64PlainValuesDecoder; @@ -36,6 +38,7 @@ import com.facebook.presto.parquet.batchreader.decoders.rle.Int64RLEDictionaryValuesDecoder; import com.facebook.presto.parquet.batchreader.decoders.rle.Int64TimeAndTimestampMicrosRLEDictionaryValuesDecoder; import com.facebook.presto.parquet.batchreader.decoders.rle.TimestampRLEDictionaryValuesDecoder; +import com.facebook.presto.parquet.batchreader.decoders.rle.UuidRLEDictionaryValuesDecoder; import com.facebook.presto.parquet.batchreader.dictionary.BinaryBatchDictionary; import com.facebook.presto.parquet.batchreader.dictionary.TimestampDictionary; import com.facebook.presto.parquet.dictionary.IntegerDictionary; @@ -55,6 +58,7 @@ import static com.facebook.presto.parquet.ParquetEncoding.PLAIN_DICTIONARY; import static com.facebook.presto.parquet.batchreader.decoders.TestParquetUtils.generateDictionaryIdPage2048; import static com.facebook.presto.parquet.batchreader.decoders.TestParquetUtils.generatePlainValuesPage; +import static com.google.common.collect.ImmutableList.toImmutableList; import static java.lang.Math.min; import static 
org.apache.parquet.bytes.BytesUtils.UTF8; import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt; @@ -142,6 +146,39 @@ private static ShortDecimalValuesDecoder int64ShortDecimalRLE(byte[] pageBytes, return new Int64RLEDictionaryValuesDecoder(getWidthFromMaxInt(dictionarySize), new ByteArrayInputStream(pageBytes), dictionary); } + private static UuidValuesDecoder uuidPlain(byte[] pageBytes) + { + return new FixedLenByteArrayUuidPlainValuesDecoder(16, pageBytes, 0, pageBytes.length); + } + + private static UuidValuesDecoder uuidRle(byte[] pageBytes, int dictionarySize, BinaryBatchDictionary dictionary) + { + return new UuidRLEDictionaryValuesDecoder(getWidthFromMaxInt(dictionarySize), new ByteArrayInputStream(pageBytes), dictionary); + } + + private static void uuidBatchReadWithSkipHelper(int batchSize, int skipSize, int valueCount, UuidValuesDecoder decoder, List expectedValues) + throws IOException + { + long[] actualValues = new long[valueCount * 2]; + int inputOffset = 0; + int outputOffset = 0; + while (inputOffset < valueCount) { + int readBatchSize = min(batchSize, valueCount - inputOffset); + decoder.readNext(actualValues, outputOffset, readBatchSize); + + for (int i = 0; i < (readBatchSize * 2); i++) { + assertEquals(actualValues[(outputOffset * 2) + i], expectedValues.get((inputOffset * 2) + i)); + } + + inputOffset += readBatchSize; + outputOffset += readBatchSize; + + int skipBatchSize = min(skipSize, valueCount - inputOffset); + decoder.skip(skipBatchSize); + inputOffset += skipBatchSize; + } + } + private static void int32BatchReadWithSkipHelper(int batchSize, int skipSize, int valueCount, Int32ValuesDecoder decoder, List expectedValues) throws IOException { @@ -681,4 +718,66 @@ public void testInt64ShortDecimalRLE() int64ShortDecimalBatchReadWithSkipHelper(89, 29, valueCount, int64ShortDecimalRLE(dataPage, dictionarySize, longDictionary), expectedValues); int64ShortDecimalBatchReadWithSkipHelper(1024, 1024, valueCount, 
int64ShortDecimalRLE(dataPage, dictionarySize, longDictionary), expectedValues); } + + @Test + public void testUuidPlainPlain() + throws IOException + { + int valueCount = 2048; + List expectedValues = new ArrayList<>(); + + byte[] pageBytes = generatePlainValuesPage(valueCount, 128, new Random(83), expectedValues); + // the page is read as big-endian, so the expected values must be byte-reversed before comparing them with the values read + expectedValues = expectedValues.stream() + .map(Long.class::cast) + .mapToLong(Long::longValue) + .map(Long::reverseBytes) + .boxed() + .collect(toImmutableList()); + uuidBatchReadWithSkipHelper(valueCount, 0, valueCount, uuidPlain(pageBytes), expectedValues); + uuidBatchReadWithSkipHelper(29, 0, valueCount, uuidPlain(pageBytes), expectedValues); + uuidBatchReadWithSkipHelper(89, 0, valueCount, uuidPlain(pageBytes), expectedValues); + uuidBatchReadWithSkipHelper(1024, 0, valueCount, uuidPlain(pageBytes), expectedValues); + + uuidBatchReadWithSkipHelper(256, 29, valueCount, uuidPlain(pageBytes), expectedValues); + uuidBatchReadWithSkipHelper(89, 29, valueCount, uuidPlain(pageBytes), expectedValues); + uuidBatchReadWithSkipHelper(1024, 1024, valueCount, uuidPlain(pageBytes), expectedValues); + } + + @Test + public void testUuidRLEDictionary() + throws IOException + { + int valueCount = 2048; + Random random = new Random(83); + int dictionarySize = 29; + List dictionary = new ArrayList<>(); + List dictionaryIds = new ArrayList<>(); + + byte[] dictionaryPage = generatePlainValuesPage(dictionarySize, 128, random, dictionary); + byte[] dataPage = generateDictionaryIdPage2048(dictionarySize - 1, random, dictionaryIds); + + List expectedValues = new ArrayList<>(); + for (Integer dictionaryId : dictionaryIds) { + expectedValues.add(dictionary.get(dictionaryId * 2)); + expectedValues.add(dictionary.get((dictionaryId * 2) + 1)); + } + + expectedValues = expectedValues.stream() + .map(Long.class::cast) + .mapToLong(Long::longValue) + .map(Long::reverseBytes) +
.boxed() + .collect(toImmutableList()); + + BinaryBatchDictionary binaryDictionary = new BinaryBatchDictionary(new DictionaryPage(Slices.wrappedBuffer(dictionaryPage), dictionarySize, PLAIN_DICTIONARY), 16); + uuidBatchReadWithSkipHelper(valueCount, 0, valueCount, uuidRle(dataPage, dictionarySize, binaryDictionary), expectedValues); + uuidBatchReadWithSkipHelper(29, 0, valueCount, uuidRle(dataPage, dictionarySize, binaryDictionary), expectedValues); + uuidBatchReadWithSkipHelper(89, 0, valueCount, uuidRle(dataPage, dictionarySize, binaryDictionary), expectedValues); + uuidBatchReadWithSkipHelper(1024, 0, valueCount, uuidRle(dataPage, dictionarySize, binaryDictionary), expectedValues); + + uuidBatchReadWithSkipHelper(256, 29, valueCount, uuidRle(dataPage, dictionarySize, binaryDictionary), expectedValues); + uuidBatchReadWithSkipHelper(89, 29, valueCount, uuidRle(dataPage, dictionarySize, binaryDictionary), expectedValues); + uuidBatchReadWithSkipHelper(1024, 1024, valueCount, uuidRle(dataPage, dictionarySize, binaryDictionary), expectedValues); + } } diff --git a/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/AbstractColumnReaderBenchmark.java b/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/AbstractColumnReaderBenchmark.java index a3768d560ca37..e7f1aaefa439f 100644 --- a/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/AbstractColumnReaderBenchmark.java +++ b/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/AbstractColumnReaderBenchmark.java @@ -85,7 +85,7 @@ public abstract class AbstractColumnReaderBenchmark @Param({ "true", "false", }) - public boolean enableOptimizedReader; + public boolean batchReaderEnabled; protected abstract PrimitiveField createPrimitiveField(); @@ -95,7 +95,7 @@ public abstract class AbstractColumnReaderBenchmark protected abstract void writeValue(ValuesWriter writer, T batch, int index); - protected abstract boolean getEnableOptimizedReader(); + protected abstract boolean 
getBatchReaderEnabled(); @Setup public void setup() @@ -126,7 +126,7 @@ public void setup() public int read() throws IOException { - ColumnReader columnReader = ColumnReaderFactory.createReader(field.getDescriptor(), getEnableOptimizedReader()); + ColumnReader columnReader = ColumnReaderFactory.createReader(field.getDescriptor(), getBatchReaderEnabled()); columnReader.init(new PageReader(UNCOMPRESSED, new LinkedList<>(dataPages).listIterator(), MAX_VALUES, null, null, Optional.empty(), null, -1, -1), field, null); ColumnReader reader = null; diff --git a/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/BenchmarkLongDecimalColumnReader.java b/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/BenchmarkLongDecimalColumnReader.java index 32e87327e20fa..071c92cb4a0df 100644 --- a/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/BenchmarkLongDecimalColumnReader.java +++ b/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/BenchmarkLongDecimalColumnReader.java @@ -79,9 +79,9 @@ protected void writeValue(ValuesWriter writer, long[] batch, int index) } @Override - protected boolean getEnableOptimizedReader() + protected boolean getBatchReaderEnabled() { - return enableOptimizedReader; + return batchReaderEnabled; } @Override diff --git a/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/BenchmarkShortDecimalColumnReader.java b/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/BenchmarkShortDecimalColumnReader.java index 664f3e69fad5b..62d094a4bc4b6 100644 --- a/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/BenchmarkShortDecimalColumnReader.java +++ b/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/BenchmarkShortDecimalColumnReader.java @@ -78,9 +78,9 @@ protected long[] generateDataBatch(int size) } @Override - protected boolean getEnableOptimizedReader() + protected boolean getBatchReaderEnabled() { - return enableOptimizedReader; + return 
batchReaderEnabled; } @Override diff --git a/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/BenchmarkUuidColumnReader.java b/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/BenchmarkUuidColumnReader.java new file mode 100644 index 0000000000000..94258521272d1 --- /dev/null +++ b/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/BenchmarkUuidColumnReader.java @@ -0,0 +1,104 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.parquet.reader; + +import com.facebook.presto.common.type.UuidType; +import com.facebook.presto.parquet.PrimitiveField; +import com.facebook.presto.parquet.RichColumnDescriptor; +import io.airlift.slice.Slice; +import io.airlift.slice.Slices; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.values.ValuesWriter; +import org.apache.parquet.column.values.deltastrings.DeltaByteArrayWriter; +import org.apache.parquet.column.values.plain.FixedLenByteArrayPlainValuesWriter; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Types; + +import java.util.Random; + +import static com.facebook.presto.parquet.reader.TestData.randomBigInteger; +import static io.airlift.slice.SizeOf.SIZE_OF_LONG; +import static 
org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY; + +public class BenchmarkUuidColumnReader + extends AbstractColumnReaderBenchmark +{ + private static final int LENGTH = 2 * SIZE_OF_LONG; + + private static final Random random = new Random(1); + + @Override + protected PrimitiveField createPrimitiveField() + { + PrimitiveType parquetType = Types.optional(FIXED_LEN_BYTE_ARRAY) + .length(LENGTH) + .as(LogicalTypeAnnotation.uuidType()) + .named("name"); + + return new PrimitiveField( + UuidType.UUID, + -1, + -1, + true, + new RichColumnDescriptor(new ColumnDescriptor(new String[] {"test"}, parquetType, 0, 0), parquetType), + 0); + } + + @Override + protected ValuesWriter createValuesWriter(int bufferSize) + { + switch (parquetEncoding) { + case PLAIN: + return new FixedLenByteArrayPlainValuesWriter(LENGTH, bufferSize, bufferSize, HeapByteBufferAllocator.getInstance()); + case DELTA_BYTE_ARRAY: + return new DeltaByteArrayWriter(bufferSize, bufferSize, HeapByteBufferAllocator.getInstance()); + default: + throw new RuntimeException("Cannot parse parquetEncoding:" + parquetEncoding); + } + } + + @Override + protected void writeValue(ValuesWriter writer, long[] batch, int index) + { + Slice slice = Slices.wrappedLongArray(batch, index * 2, 2); + writer.writeBytes(Binary.fromConstantByteArray(slice.getBytes())); + } + + @Override + protected boolean getBatchReaderEnabled() + { + return batchReaderEnabled; + } + + @Override + protected long[] generateDataBatch(int size) + { + long[] batch = new long[size * 2]; + for (int i = 0; i < size; i++) { + Slice slice = randomBigInteger(random); + batch[i * 2] = slice.getLong(0); + batch[(i * 2) + 1] = slice.getLong(SIZE_OF_LONG); + } + return batch; + } + + public static void main(String[] args) + throws Exception + { + run(BenchmarkUuidColumnReader.class); + } +} diff --git a/presto-parquet/src/test/java/com/facebook/presto/parquet/writer/TestParquetWriter.java 
b/presto-parquet/src/test/java/com/facebook/presto/parquet/writer/TestParquetWriter.java index 270e1460c489b..b6c6249107d99 100644 --- a/presto-parquet/src/test/java/com/facebook/presto/parquet/writer/TestParquetWriter.java +++ b/presto-parquet/src/test/java/com/facebook/presto/parquet/writer/TestParquetWriter.java @@ -48,6 +48,7 @@ import static com.facebook.presto.common.type.SmallintType.SMALLINT; import static com.facebook.presto.common.type.TimestampType.TIMESTAMP; import static com.facebook.presto.common.type.TinyintType.TINYINT; +import static com.facebook.presto.common.type.UuidType.UUID; import static com.facebook.presto.common.type.VarcharType.VARCHAR; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.io.Files.createTempDir; @@ -127,6 +128,7 @@ public static Object[][] types() {DATE, LogicalTypeAnnotation.DateLogicalTypeAnnotation.class, "INT32"}, {TIMESTAMP, LogicalTypeAnnotation.TimestampLogicalTypeAnnotation.class, "INT64"}, {MAP, LogicalTypeAnnotation.MapLogicalTypeAnnotation.class, null}, + {UUID, LogicalTypeAnnotation.UUIDLogicalTypeAnnotation.class, "FIXED_LEN_BYTE_ARRAY"}, {ROW, null, null} }; }