diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveFileFormats.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveFileFormats.java index 9d7fe39b46482..8c117033a686b 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveFileFormats.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveFileFormats.java @@ -28,6 +28,7 @@ import com.facebook.presto.orc.StorageStripeMetadataSource; import com.facebook.presto.orc.StripeMetadataSourceFactory; import com.facebook.presto.orc.cache.StorageOrcFileTailSource; +import com.facebook.presto.parquet.FileParquetDataSource; import com.facebook.presto.parquet.cache.MetadataReader; import com.facebook.presto.spi.ConnectorPageSource; import com.facebook.presto.spi.ConnectorSession; @@ -56,6 +57,11 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo; import org.apache.hadoop.mapred.FileSplit; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Type; +import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; @@ -63,6 +69,7 @@ import java.io.File; import java.io.IOException; +import java.sql.Timestamp; import java.time.Instant; import java.util.Arrays; import java.util.List; @@ -106,6 +113,7 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.Iterables.filter; import static io.airlift.slice.Slices.utf8Slice; +import static java.io.File.createTempFile; import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.toList; import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.getStandardListObjectInspector; @@ -120,6 +128,7 @@ import static org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory.javaStringObjectInspector; import static org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory.javaTimestampObjectInspector; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -498,6 +507,52 @@ public void testParquetPageSourceSchemaEvolution(int rowCount) .isReadableByPageSource(new ParquetPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, FUNCTION_RESOLUTION, HDFS_ENVIRONMENT, STATS, METADATA_READER)); } + @Test + public void testParquetLogicalTypes() throws IOException + { + HiveFileWriterFactory parquetFileWriterFactory = new ParquetFileWriterFactory(HDFS_ENVIRONMENT, FUNCTION_AND_TYPE_MANAGER, new NodeVersion("test"), HIVE_STORAGE_TIME_ZONE); + + List> allSessionProperties = getAllSessionProperties( + new HiveClientConfig(), + new ParquetFileWriterConfig().setParquetOptimizedWriterEnabled(true), + createOrcHiveCommonClientConfig(true, 100.0)); + + TestingConnectorSession session = new TestingConnectorSession(allSessionProperties); + File file = createTempFile("logicaltest", ".parquet"); + long timestamp = new DateTime(2011, 5, 6, 7, 8, 9, 123).getMillis(); + + try { + createTestFile( + file.getAbsolutePath(), + PARQUET, + HiveCompressionCodec.NONE, + ImmutableList.of(new TestColumn("t_timestamp", javaTimestampObjectInspector, new Timestamp(timestamp), timestamp)), + session, + 3, + parquetFileWriterFactory); + + FileParquetDataSource dataSource = new FileParquetDataSource(file); + ParquetMetadata parquetMetadata = MetadataReader.readFooter( + dataSource, + file.length(), + Optional.empty(), + false).getParquetMetadata(); + + MessageType writtenSchema = parquetMetadata.getFileMetaData().getSchema(); + Type timestampType = writtenSchema.getType("t_timestamp"); + if (timestampType.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) { + LogicalTypeAnnotation.TimestampLogicalTypeAnnotation annotation = (LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) timestampType.getLogicalTypeAnnotation(); + assertFalse(annotation.isAdjustedToUTC()); + } + else { + fail("the logical type annotation saved was not of type TimestampLogicalTypeAnnotation"); + } + } + finally { + file.delete(); + } + } + private static List getTestColumnsSupportedByParquet() { // Write of complex hive data to Parquet is broken diff --git a/presto-iceberg/pom.xml b/presto-iceberg/pom.xml index 1550d47846ac1..6071bab59cbd4 100644 --- a/presto-iceberg/pom.xml +++ b/presto-iceberg/pom.xml @@ -38,18 +38,15 @@ runtime - - org.apache.parquet - parquet-format-structures - ${dep.parquet.version} - runtime - - org.apache.parquet parquet-common ${dep.parquet.version} + + org.apache.parquet + parquet-format-structures + org.apache.parquet parquet-format diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergFileWriter.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergFileWriter.java new file mode 100644 index 0000000000000..e96b0a501d788 --- /dev/null +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergFileWriter.java @@ -0,0 +1,185 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.iceberg; + +import com.facebook.presto.Session; +import com.facebook.presto.common.Page; +import com.facebook.presto.common.type.BooleanType; +import com.facebook.presto.common.type.DoubleType; +import com.facebook.presto.common.type.Type; +import com.facebook.presto.common.type.TypeManager; +import com.facebook.presto.common.type.TypeSignature; +import com.facebook.presto.common.type.TypeSignatureParameter; +import com.facebook.presto.hive.FileFormatDataSourceStats; +import com.facebook.presto.hive.HdfsContext; +import com.facebook.presto.hive.HdfsEnvironment; +import com.facebook.presto.hive.HiveCompressionCodec; +import com.facebook.presto.hive.HiveDwrfEncryptionProvider; +import com.facebook.presto.hive.NodeVersion; +import com.facebook.presto.hive.OrcFileWriterConfig; +import com.facebook.presto.metadata.SessionPropertyManager; +import com.facebook.presto.parquet.FileParquetDataSource; +import com.facebook.presto.parquet.cache.MetadataReader; +import com.facebook.presto.spi.ColumnMetadata; +import com.facebook.presto.spi.ConnectorId; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.session.PropertyMetadata; +import com.google.common.collect.ImmutableList; +import io.airlift.units.DataSize; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.Schema; +import org.apache.parquet.column.ParquetProperties; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.MessageType; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.File; +import java.util.List; +import java.util.Optional; + +import static com.facebook.presto.RowPagesBuilder.rowPagesBuilder; +import static com.facebook.presto.common.type.BigintType.BIGINT; +import static com.facebook.presto.common.type.DateType.DATE; +import static com.facebook.presto.common.type.HyperLogLogType.HYPER_LOG_LOG; +import static com.facebook.presto.common.type.IntegerType.INTEGER; +import static com.facebook.presto.common.type.TimestampType.TIMESTAMP; +import static com.facebook.presto.common.type.VarbinaryType.VARBINARY; +import static com.facebook.presto.common.type.VarcharType.VARCHAR; +import static com.facebook.presto.iceberg.IcebergAbstractMetadata.toIcebergSchema; +import static com.facebook.presto.iceberg.IcebergQueryRunner.ICEBERG_CATALOG; +import static com.facebook.presto.iceberg.IcebergSessionProperties.dataSizeSessionProperty; +import static com.facebook.presto.testing.TestingSession.testSessionBuilder; +import static com.google.common.io.Files.createTempDir; +import static org.apache.iceberg.parquet.ParquetSchemaUtil.convert; +import static org.testng.Assert.assertEquals; + +public class TestIcebergFileWriter +{ + private IcebergFileWriterFactory icebergFileWriterFactory; + private HdfsContext hdfsContext; + private ConnectorSession connectorSession; + + @BeforeClass + public void setup() throws Exception + { + ConnectorId connectorId = new ConnectorId("iceberg"); + SessionPropertyManager sessionPropertyManager = new SessionPropertyManager(); + + sessionPropertyManager.addConnectorSessionProperties( + connectorId, + ImmutableList.of( + dataSizeSessionProperty("parquet_writer_page_size", "Parquet: Writer page size", new DataSize(10, DataSize.Unit.KILOBYTE), false), + dataSizeSessionProperty("parquet_writer_block_size", "Parquet: Writer block size", new DataSize(10, DataSize.Unit.KILOBYTE), false), + new PropertyMetadata<>( + "parquet_writer_version", + "Parquet: Writer version", + VARCHAR, + ParquetProperties.WriterVersion.class, + ParquetProperties.WriterVersion.PARQUET_2_0, + false, + value -> ParquetProperties.WriterVersion.valueOf(((String) value).toUpperCase()), + ParquetProperties.WriterVersion::name), + new PropertyMetadata<>( + "compression_codec", + "The compression codec to use when writing files", + VARCHAR, + HiveCompressionCodec.class, + HiveCompressionCodec.NONE, + false, + value -> HiveCompressionCodec.valueOf(((String) value).toUpperCase()), + HiveCompressionCodec::name))); + + Session session = testSessionBuilder(sessionPropertyManager) + .setCatalog(ICEBERG_CATALOG) + .setSchema("tpch") + .build(); + + this.connectorSession = session.toConnectorSession(connectorId); + TypeManager typeManager = new TestingTypeManager(); + this.hdfsContext = new HdfsContext(connectorSession); + HdfsEnvironment hdfsEnvironment = IcebergDistributedTestBase.getHdfsEnvironment(); + this.icebergFileWriterFactory = new IcebergFileWriterFactory(hdfsEnvironment, typeManager, + new FileFormatDataSourceStats(), new NodeVersion("test"), new OrcFileWriterConfig(), HiveDwrfEncryptionProvider.NO_ENCRYPTION); + } + + @Test + public void testWriteParquetFileWithLogicalTypes() throws Exception + { + Path path = new Path(createTempDir().getAbsolutePath() + "/test.parquet"); + Schema icebergSchema = toIcebergSchema(ImmutableList.of( + new ColumnMetadata("a", VARCHAR), + new ColumnMetadata("b", INTEGER), + new ColumnMetadata("c", TIMESTAMP), + new ColumnMetadata("d", DATE))); + IcebergFileWriter icebergFileWriter = this.icebergFileWriterFactory.createFileWriter(path, icebergSchema, new JobConf(), connectorSession, + hdfsContext, FileFormat.PARQUET, MetricsConfig.getDefault()); + + List input = rowPagesBuilder(VARCHAR, BIGINT, TIMESTAMP, DATE) + .addSequencePage(100, 0, 0, 123, 100) + .addSequencePage(100, 100, 100, 223, 100) + .addSequencePage(100, 200, 200, 323, 100) + .build(); + for (Page page : input) { + icebergFileWriter.appendRows(page); + } + icebergFileWriter.commit(); + + File parquetFile = new File(path.toString()); + FileParquetDataSource dataSource = new FileParquetDataSource(parquetFile); + ParquetMetadata parquetMetadata = MetadataReader.readFooter( + dataSource, + parquetFile.length(), + Optional.empty(), + false).getParquetMetadata(); + MessageType writtenSchema = parquetMetadata.getFileMetaData().getSchema(); + MessageType originalSchema = convert(icebergSchema, "table"); + assertEquals(originalSchema, writtenSchema); + } + + private static class TestingTypeManager + implements TypeManager + { + @Override + public Type getType(TypeSignature signature) + { + for (Type type : getTypes()) { + if (signature.getBase().equals(type.getTypeSignature().getBase())) { + return type; + } + } + return null; + } + + @Override + public Type getParameterizedType(String baseTypeName, List typeParameters) + { + return getType(new TypeSignature(baseTypeName, typeParameters)); + } + + @Override + public boolean canCoerce(Type actualType, Type expectedType) + { + throw new UnsupportedOperationException(); + } + + private List getTypes() + { + return ImmutableList.of(BooleanType.BOOLEAN, INTEGER, BIGINT, DoubleType.DOUBLE, VARCHAR, VARBINARY, TIMESTAMP, DATE, HYPER_LOG_LOG); + } + } +} diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/cache/MetadataReader.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/cache/MetadataReader.java index 908f006378573..8af920d74d7b8 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/cache/MetadataReader.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/cache/MetadataReader.java @@ -32,14 +32,20 @@ import org.apache.parquet.format.ColumnCryptoMetaData; import org.apache.parquet.format.ColumnMetaData; import org.apache.parquet.format.ConvertedType; +import org.apache.parquet.format.DecimalType; import org.apache.parquet.format.Encoding; import org.apache.parquet.format.EncryptionWithColumnKey; import org.apache.parquet.format.FileCryptoMetaData; import org.apache.parquet.format.FileMetaData; +import org.apache.parquet.format.IntType; import org.apache.parquet.format.KeyValue; +import org.apache.parquet.format.LogicalType; import org.apache.parquet.format.RowGroup; import org.apache.parquet.format.SchemaElement; import org.apache.parquet.format.Statistics; +import org.apache.parquet.format.TimeType; +import org.apache.parquet.format.TimeUnit; +import org.apache.parquet.format.TimestampType; import org.apache.parquet.format.Type; import org.apache.parquet.format.Util; import org.apache.parquet.format.converter.ParquetMetadataConverter; @@ -49,6 +55,7 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.internal.hadoop.metadata.IndexReference; +import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.OriginalType; import org.apache.parquet.schema.PrimitiveType; @@ -366,10 +373,14 @@ private static void readTypeSchema(Types.GroupBuilder builder, Iterator annotationWithParams = getLogicalTypeAnnotationWithParameters(type); + annotationWithParams.ifPresent(typeBuilder::as); + } if (element.isSetField_id()) { typeBuilder.id(element.field_id); } @@ -377,6 +388,62 @@ private static void readTypeSchema(Types.GroupBuilder builder, Iterator getLogicalTypeAnnotationWithParameters(LogicalType type) + { + if (type.isSetTIME()) { + TimeType time = type.getTIME(); + return Optional.of(LogicalTypeAnnotation.timeType(time.isAdjustedToUTC, convertTimeUnit(time.unit))); + } + if (type.isSetDECIMAL()) { + DecimalType decimal = type.getDECIMAL(); + return Optional.of(LogicalTypeAnnotation.decimalType(decimal.scale, decimal.precision)); + } + if (type.isSetINTEGER()) { + IntType integer = type.getINTEGER(); + return Optional.of(LogicalTypeAnnotation.intType(integer.bitWidth, integer.isSigned)); + } + if (type.isSetTIMESTAMP()) { + TimestampType timestamp = type.getTIMESTAMP(); + return Optional.of(LogicalTypeAnnotation.timestampType(timestamp.isAdjustedToUTC, convertTimeUnit(timestamp.unit))); + } + if (type.isSetDATE()) { + return Optional.of(LogicalTypeAnnotation.dateType()); + } + if (type.isSetBSON()) { + return Optional.of(LogicalTypeAnnotation.bsonType()); + } + if (type.isSetJSON()) { + return Optional.of(LogicalTypeAnnotation.jsonType()); + } + if (type.isSetUUID()) { + return Optional.of(LogicalTypeAnnotation.uuidType()); + } + if (type.isSetENUM()) { + return Optional.of(LogicalTypeAnnotation.enumType()); + } + if (type.isSetLIST()) { + return Optional.of(LogicalTypeAnnotation.listType()); + } + if (type.isSetMAP()) { + return Optional.of(LogicalTypeAnnotation.mapType()); + } + if (type.isSetSTRING()) { + return Optional.of(LogicalTypeAnnotation.stringType()); + } + return Optional.empty(); + } + + private static LogicalTypeAnnotation.TimeUnit convertTimeUnit(TimeUnit unit) + { + if (unit.isSetMILLIS()) { + return LogicalTypeAnnotation.TimeUnit.MILLIS; + } + else if (unit.isSetNANOS()) { + return LogicalTypeAnnotation.TimeUnit.NANOS; + } + return LogicalTypeAnnotation.TimeUnit.MICROS; + } + public static org.apache.parquet.column.statistics.Statistics readStats(Statistics statistics, PrimitiveTypeName type) { org.apache.parquet.column.statistics.Statistics stats = org.apache.parquet.column.statistics.Statistics.getStatsBasedOnType(type); diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/writer/MessageTypeConverter.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/writer/MessageTypeConverter.java deleted file mode 100644 index 1e9788c547251..0000000000000 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/writer/MessageTypeConverter.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.facebook.presto.parquet.writer; - -import org.apache.parquet.format.ConvertedType; -import org.apache.parquet.format.FieldRepetitionType; -import org.apache.parquet.format.SchemaElement; -import org.apache.parquet.format.Type; -import org.apache.parquet.schema.GroupType; -import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.OriginalType; -import org.apache.parquet.schema.PrimitiveType; -import org.apache.parquet.schema.TypeVisitor; - -import java.util.ArrayList; -import java.util.List; - -class MessageTypeConverter -{ - private MessageTypeConverter() {} - - static List toParquetSchema(MessageType schema) - { - List result = new ArrayList<>(); - addToList(result, schema); - return result; - } - - private static void addToList(List result, org.apache.parquet.schema.Type field) - { - field.accept(new TypeVisitor() - { - @Override - public void visit(PrimitiveType primitiveType) - { - SchemaElement element = new SchemaElement(primitiveType.getName()); - element.setRepetition_type(toParquetRepetition(primitiveType.getRepetition())); - element.setType(getType(primitiveType.getPrimitiveTypeName())); - if (primitiveType.getOriginalType() != null) { - element.setConverted_type(getConvertedType(primitiveType.getOriginalType())); - } - if (primitiveType.getDecimalMetadata() != null) { - element.setPrecision(primitiveType.getDecimalMetadata().getPrecision()); - element.setScale(primitiveType.getDecimalMetadata().getScale()); - } - if (primitiveType.getTypeLength() > 0) { - element.setType_length(primitiveType.getTypeLength()); - } - if (primitiveType.getId() != null) { - element.setField_id(primitiveType.getId().intValue()); - } - result.add(element); - } - - @Override - public void visit(MessageType messageType) - { - SchemaElement element = new SchemaElement(messageType.getName()); - if (messageType.getId() != null) { - element.setField_id(messageType.getId().intValue()); - } - visitChildren(result, messageType.asGroupType(), element); - } - - @Override - public void visit(GroupType groupType) - { - SchemaElement element = new SchemaElement(groupType.getName()); - element.setRepetition_type(toParquetRepetition(groupType.getRepetition())); - if (groupType.getOriginalType() != null) { - element.setConverted_type(getConvertedType(groupType.getOriginalType())); - } - if (groupType.getId() != null) { - element.setField_id(groupType.getId().intValue()); - } - visitChildren(result, groupType, element); - } - - private void visitChildren(final List result, - GroupType groupType, SchemaElement element) - { - element.setNum_children(groupType.getFieldCount()); - result.add(element); - for (org.apache.parquet.schema.Type field : groupType.getFields()) { - addToList(result, field); - } - } - }); - } - - private static FieldRepetitionType toParquetRepetition(org.apache.parquet.schema.Type.Repetition repetition) - { - return FieldRepetitionType.valueOf(repetition.name()); - } - - private static org.apache.parquet.format.Type getType(PrimitiveType.PrimitiveTypeName type) - { - switch (type) { - case INT64: - return Type.INT64; - case INT32: - return Type.INT32; - case BOOLEAN: - return Type.BOOLEAN; - case BINARY: - return Type.BYTE_ARRAY; - case FLOAT: - return Type.FLOAT; - case DOUBLE: - return Type.DOUBLE; - case INT96: - return Type.INT96; - case FIXED_LEN_BYTE_ARRAY: - return Type.FIXED_LEN_BYTE_ARRAY; - default: - throw new RuntimeException("Unknown primitive type " + type); - } - } - - private static ConvertedType getConvertedType(OriginalType type) - { - switch (type) { - case UTF8: - return ConvertedType.UTF8; - case MAP: - return ConvertedType.MAP; - case MAP_KEY_VALUE: - return ConvertedType.MAP_KEY_VALUE; - case LIST: - return ConvertedType.LIST; - case ENUM: - return ConvertedType.ENUM; - case DECIMAL: - return ConvertedType.DECIMAL; - case DATE: - return ConvertedType.DATE; - case TIME_MILLIS: - return ConvertedType.TIME_MILLIS; - case TIME_MICROS: - return ConvertedType.TIME_MICROS; - case TIMESTAMP_MILLIS: - return ConvertedType.TIMESTAMP_MILLIS; - case TIMESTAMP_MICROS: - return ConvertedType.TIMESTAMP_MICROS; - case INTERVAL: - return ConvertedType.INTERVAL; - case INT_8: - return ConvertedType.INT_8; - case INT_16: - return ConvertedType.INT_16; - case INT_32: - return ConvertedType.INT_32; - case INT_64: - return ConvertedType.INT_64; - case UINT_8: - return ConvertedType.UINT_8; - case UINT_16: - return ConvertedType.UINT_16; - case UINT_32: - return ConvertedType.UINT_32; - case UINT_64: - return ConvertedType.UINT_64; - case JSON: - return ConvertedType.JSON; - case BSON: - return ConvertedType.BSON; - default: - throw new RuntimeException("Unknown original type " + type); - } - } -} diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/writer/ParquetSchemaConverter.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/writer/ParquetSchemaConverter.java index 22a5396120723..5c846552d43ee 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/writer/ParquetSchemaConverter.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/writer/ParquetSchemaConverter.java @@ -25,6 +25,7 @@ import com.facebook.presto.spi.PrestoException; import com.google.common.collect.ImmutableList; import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.OriginalType; import org.apache.parquet.schema.PrimitiveType; @@ -126,9 +127,15 @@ else if (decimalType.isShort()) { if (DATE.equals(type)) { return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition).as(OriginalType.DATE).named(name); } - if (BIGINT.equals(type) || TIMESTAMP.equals(type)) { + if (BIGINT.equals(type)) { return Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition).named(name); } + if (TIMESTAMP.equals(type)) { + Types.PrimitiveBuilder parquetTypeBuilder = Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition); + LogicalTypeAnnotation annotation = LogicalTypeAnnotation.timestampType(false, LogicalTypeAnnotation.TimeUnit.MILLIS); + parquetTypeBuilder.as(annotation); + return parquetTypeBuilder.named(name); + } if (DOUBLE.equals(type)) { return Types.primitive(PrimitiveType.PrimitiveTypeName.DOUBLE, repetition).named(name); } diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/writer/ParquetWriter.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/writer/ParquetWriter.java index ca06ab41497f2..c70e312e85bab 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/writer/ParquetWriter.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/writer/ParquetWriter.java @@ -17,6 +17,7 @@ import com.facebook.presto.common.type.Type; import com.facebook.presto.parquet.writer.ColumnWriter.BufferData; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import io.airlift.slice.DynamicSliceOutput; import io.airlift.slice.OutputStreamSliceOutput; import io.airlift.slice.Slice; @@ -27,8 +28,11 @@ import org.apache.parquet.format.ColumnMetaData; import org.apache.parquet.format.FileMetaData; import org.apache.parquet.format.RowGroup; +import org.apache.parquet.format.SchemaElement; import org.apache.parquet.format.Util; +import org.apache.parquet.format.converter.ParquetMetadataConverter; import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.schema.MessageType; import org.openjdk.jol.info.ClassLayout; @@ -62,6 +66,7 @@ public class ParquetWriter implements Closeable { private static final int INSTANCE_SIZE = ClassLayout.parseClass(ParquetWriter.class).instanceSize(); + private static final ParquetMetadataConverter METADATA_CONVERTER = new ParquetMetadataConverter(); private static final int CHUNK_MAX_BYTES = toIntExact(DataSize.valueOf("128MB").toBytes()); private static final int DEFAULT_ROW_GROUP_MAX_ROW_COUNT = 10_000; @@ -259,16 +264,28 @@ static Slice getFooter(List rowGroups, MessageType messageType) { FileMetaData fileMetaData = new FileMetaData(); fileMetaData.setVersion(1); - fileMetaData.setSchema(MessageTypeConverter.toParquetSchema(messageType)); long totalRows = rowGroups.stream().mapToLong(RowGroup::getNum_rows).sum(); fileMetaData.setNum_rows(totalRows); fileMetaData.setRow_groups(ImmutableList.copyOf(rowGroups)); + fileMetaData.setSchema(getParquetSchema(fileMetaData.getCreated_by(), messageType)); + DynamicSliceOutput dynamicSliceOutput = new DynamicSliceOutput(40); Util.writeFileMetaData(fileMetaData, dynamicSliceOutput); return dynamicSliceOutput.slice(); } + private static List getParquetSchema(String createdBy, MessageType messageType) + { + org.apache.parquet.hadoop.metadata.FileMetaData parquetMetaDataInput = new org.apache.parquet.hadoop.metadata.FileMetaData( + messageType, + ImmutableMap.of(), + createdBy); + + FileMetaData parquetMetaData = METADATA_CONVERTER.toParquetMetadata(1, new ParquetMetadata(parquetMetaDataInput, ImmutableList.of())); + return parquetMetaData.getSchema(); + } + private void updateRowGroups(List columnMetaData) { // TODO Avoid writing empty row group diff --git a/presto-parquet/src/test/java/com/facebook/presto/parquet/writer/TestParquetWriter.java b/presto-parquet/src/test/java/com/facebook/presto/parquet/writer/TestParquetWriter.java index a6217388b1ecc..270e1460c489b 100644 --- a/presto-parquet/src/test/java/com/facebook/presto/parquet/writer/TestParquetWriter.java +++ b/presto-parquet/src/test/java/com/facebook/presto/parquet/writer/TestParquetWriter.java @@ -14,36 +14,69 @@ package com.facebook.presto.parquet.writer; import com.facebook.presto.common.PageBuilder; +import com.facebook.presto.common.type.DecimalType; +import com.facebook.presto.common.type.MapType; +import com.facebook.presto.common.type.RowType; +import com.facebook.presto.common.type.TimestampType; import com.facebook.presto.common.type.Type; +import com.facebook.presto.parquet.FileParquetDataSource; +import com.facebook.presto.parquet.cache.MetadataReader; import com.google.common.collect.ImmutableList; import io.airlift.units.DataSize; import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; import org.testng.annotations.AfterClass; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; +import java.nio.file.Files; import java.util.List; +import java.util.Optional; import java.util.Random; -import java.util.UUID; +import static com.facebook.presto.common.block.MethodHandleUtil.nativeValueGetter; import static com.facebook.presto.common.type.BigintType.BIGINT; import static com.facebook.presto.common.type.BooleanType.BOOLEAN; +import static com.facebook.presto.common.type.DateType.DATE; +import static com.facebook.presto.common.type.DoubleType.DOUBLE; import static com.facebook.presto.common.type.IntegerType.INTEGER; +import static com.facebook.presto.common.type.SmallintType.SMALLINT; +import static com.facebook.presto.common.type.TimestampType.TIMESTAMP; +import static com.facebook.presto.common.type.TinyintType.TINYINT; import static com.facebook.presto.common.type.VarcharType.VARCHAR; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.io.Files.createTempDir; import static com.google.common.io.MoreFiles.deleteRecursively; import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; import static java.util.UUID.randomUUID; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertThrows; +import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +@Test(singleThreaded = true) public class TestParquetWriter { private File temporaryDirectory; private File parquetFile; + private static final DecimalType VERY_SHORT_DECIMAL = DecimalType.createDecimalType(1); + private static final DecimalType SHORT_DECIMAL = DecimalType.createDecimalType(18); + private static final DecimalType LONG_DECIMAL = DecimalType.createDecimalType(38); + private static final Type MAP = new MapType( + VARCHAR, + VARCHAR, + nativeValueGetter(VARCHAR), + nativeValueGetter(VARCHAR)); + private static final Type ROW = RowType.from(ImmutableList.of(RowType.field("varchar", VARCHAR))); + @Test public void testRowGroupFlushInterleavedColumnWriterFallbacks() { @@ -65,7 +98,7 @@ public void testRowGroupFlushInterleavedColumnWriterFallbacks() // maintain col_1's dictionary size approximately half of raw data BIGINT.writeLong(pageBuilder.getBlockBuilder(0), pageIdx * 100 + rand.nextInt(50)); INTEGER.writeLong(pageBuilder.getBlockBuilder(1), rand.nextInt(100000000)); - VARCHAR.writeString(pageBuilder.getBlockBuilder(2), UUID.randomUUID().toString()); + VARCHAR.writeString(pageBuilder.getBlockBuilder(2), randomUUID().toString()); BOOLEAN.writeBoolean(pageBuilder.getBlockBuilder(3), rand.nextBoolean()); pageBuilder.declarePosition(); } @@ -77,8 +110,95 @@ public void testRowGroupFlushInterleavedColumnWriterFallbacks() } } + @DataProvider(name = "testMetadataCreation") + public static Object[][] types() + { + return new Object[][] { + {TINYINT, null, "INT32"}, + {SMALLINT, null, "INT32"}, + {BIGINT, null, "INT64"}, + {INTEGER, null, "INT32"}, + {DOUBLE, null, "DOUBLE"}, + {VARCHAR, null, "BINARY"}, + {BOOLEAN, null, "BOOLEAN"}, + {VERY_SHORT_DECIMAL, LogicalTypeAnnotation.DecimalLogicalTypeAnnotation.class, "INT32"}, + {SHORT_DECIMAL, LogicalTypeAnnotation.DecimalLogicalTypeAnnotation.class, "INT64"}, + {LONG_DECIMAL, LogicalTypeAnnotation.DecimalLogicalTypeAnnotation.class, "FIXED_LEN_BYTE_ARRAY"}, + {DATE, LogicalTypeAnnotation.DateLogicalTypeAnnotation.class, "INT32"}, + {TIMESTAMP, LogicalTypeAnnotation.TimestampLogicalTypeAnnotation.class, "INT64"}, + {MAP, LogicalTypeAnnotation.MapLogicalTypeAnnotation.class, null}, + {ROW, null, null} + }; + } + + @Test(dataProvider = "testMetadataCreation") + public void testMetadataCreation(Type type, Class annotationType, String primitiveName) + throws Exception + { + temporaryDirectory = createTempDir(); + parquetFile = new File(temporaryDirectory, randomUUID() + ".parquet"); + + List names = ImmutableList.of("col_1"); + List types = ImmutableList.of(type); + + ParquetWriterOptions parquetWriterOptions = ParquetWriterOptions.builder() + .setMaxPageSize(DataSize.succinctBytes(1000)) + .setMaxBlockSize(DataSize.succinctBytes(15000)) + .setMaxDictionaryPageSize(DataSize.succinctBytes(1000)) + .build(); + + try (ParquetWriter parquetWriter = createParquetWriter(parquetFile, types, names, parquetWriterOptions, CompressionCodecName.UNCOMPRESSED)) { + PageBuilder pageBuilder = new PageBuilder(0, types); + pageBuilder.getBlockBuilder(0).appendNull(); + pageBuilder.declarePosition(); + parquetWriter.write(pageBuilder.build()); + } + FileParquetDataSource dataSource = new FileParquetDataSource(parquetFile); + ParquetMetadata parquetMetadata = MetadataReader.readFooter( + dataSource, + parquetFile.length(), + Optional.empty(), + false).getParquetMetadata(); + MessageType parquetSchema = parquetMetadata.getFileMetaData().getSchema(); + List parquetTypes = parquetSchema.getFields(); + + checkTypes(parquetTypes.get(0), annotationType, primitiveName); + + if (type instanceof DecimalType) { + org.apache.parquet.schema.Type decimalType = parquetTypes.get(0); + LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalTypeAnnotation = (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) decimalType.getLogicalTypeAnnotation(); + assertEquals(decimalTypeAnnotation.getScale(), ((DecimalType) type).getScale()); + assertEquals(decimalTypeAnnotation.getPrecision(), ((DecimalType) type).getPrecision()); + } + else if (type instanceof TimestampType) { + org.apache.parquet.schema.Type timestampType = parquetTypes.get(0); + LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timeAnnotation = (LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) timestampType.getLogicalTypeAnnotation(); + assertEquals(timeAnnotation.getUnit(), LogicalTypeAnnotation.TimeUnit.MILLIS); + assertFalse(timeAnnotation.isAdjustedToUTC()); + } + } + + private static void checkTypes(org.apache.parquet.schema.Type type, Class expectedAnnotationType, String expectedPrimitiveTypeName) + { + if (expectedPrimitiveTypeName != null) { + PrimitiveType primitiveType = type.asPrimitiveType(); + assertEquals(primitiveType.getPrimitiveTypeName().name(), expectedPrimitiveTypeName); + } + else { + assertThrows(type::asPrimitiveType); + } + + LogicalTypeAnnotation annotation = type.getLogicalTypeAnnotation(); + if (expectedAnnotationType != null) { + assertTrue(expectedAnnotationType.isInstance(annotation)); + } + else { + assertNull(annotation); + } + } + public static ParquetWriter createParquetWriter(File outputFile, List types, List columnNames, - ParquetWriterOptions parquetWriterOptions, CompressionCodecName compressionCodecName) + ParquetWriterOptions parquetWriterOptions, CompressionCodecName compressionCodecName) throws Exception { checkArgument(types.size() == columnNames.size()); @@ -86,7 +206,7 @@ public static ParquetWriter createParquetWriter(File outputFile, List type types, columnNames); return new ParquetWriter( - new FileOutputStream(outputFile), + Files.newOutputStream(outputFile.toPath()), schemaConverter.getMessageType(), schemaConverter.getPrimitiveTypes(), columnNames,