Persist proper logical type parameters in parquet files #23388

Merged
tdcmeehan merged 1 commit into prestodb:master from auden-woolfson:stop_normalizing_timestamps_in_iceberg_connecor
Sep 11, 2024

Conversation

@auden-woolfson
Contributor

@auden-woolfson auden-woolfson commented Aug 6, 2024

Description

When reading/writing Parquet files, logical type parameters are not persisted properly (see issue #23361). This was noticed with the timestamp type in Iceberg, where isAdjustedToUTC was always set to true. To fix this issue, the Parquet reading and writing functionality was updated, and a test was added to the Iceberg connector.

Similar issues were noticed in the Hive connector in #23454; these required some changes to the Parquet schema converter.
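For illustration, a minimal parquet-mr sketch (using the standard org.apache.parquet.schema builder; the column and class names are illustrative) of a timestamp column that declares isAdjustedToUTC = false, which is the flag that was being lost:

    import org.apache.parquet.schema.LogicalTypeAnnotation;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
    import org.apache.parquet.schema.Types;

    public class TimestampLogicalTypeSketch
    {
        public static void main(String[] args)
        {
            // A timestamp column whose logical type carries isAdjustedToUTC = false.
            // The bug above means this flag did not survive the round trip through
            // the Parquet footer, so readers always saw isAdjustedToUTC = true.
            MessageType schema = Types.buildMessage()
                    .optional(PrimitiveTypeName.INT64)
                    .as(LogicalTypeAnnotation.timestampType(false, LogicalTypeAnnotation.TimeUnit.MICROS))
                    .named("event_time")
                    .named("table");
            System.out.println(schema);
        }
    }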

== RELEASE NOTES ==

Presto Parquet Changes
* Fix bug so that proper logical type parameters are now read and written to Parquet files :pr:`23388`

Presto Iceberg Changes
* Add test for logical type storage in parquet files :pr:`23388`

@auden-woolfson auden-woolfson added the wip Work In Progress label Aug 6, 2024
@auden-woolfson auden-woolfson self-assigned this Aug 6, 2024
@linux-foundation-easycla

linux-foundation-easycla bot commented Aug 6, 2024

CLA Signed

The committers listed above are authorized under a signed CLA.

  • ✅ login: auden-woolfson / name: Auden Woolfson (e431b58)

Contributor

@ZacBlanco ZacBlanco left a comment

Please also add a unit test which verifies the behavior for basic timestamp types and for types with nested timestamps.
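A rough sketch of the kind of check such a test could make (illustrative only, and the class name is hypothetical; it exercises the Iceberg-to-Parquet schema conversion, while the test added in this PR goes through the writer and reads the file back):

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.parquet.ParquetSchemaUtil;
    import org.apache.iceberg.types.Types;
    import org.apache.parquet.schema.LogicalTypeAnnotation;
    import org.apache.parquet.schema.MessageType;
    import org.testng.annotations.Test;

    import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.MICROS;
    import static org.testng.Assert.assertEquals;

    public class TestTimestampLogicalTypeParameters
    {
        @Test
        public void testTimestampWithoutZoneKeepsUtcFlag()
        {
            // Top-level timestamp plus a struct containing a nested timestamp
            Schema icebergSchema = new Schema(
                    Types.NestedField.optional(1, "ts", Types.TimestampType.withoutZone()),
                    Types.NestedField.optional(2, "row", Types.StructType.of(
                            Types.NestedField.optional(3, "nested_ts", Types.TimestampType.withoutZone()))));

            MessageType parquetSchema = ParquetSchemaUtil.convert(icebergSchema, "table");

            // isAdjustedToUTC must remain false for timestamps without a zone
            assertEquals(
                    parquetSchema.getType("ts").asPrimitiveType().getLogicalTypeAnnotation(),
                    LogicalTypeAnnotation.timestampType(false, MICROS));
            assertEquals(
                    parquetSchema.getType("row", "nested_ts").asPrimitiveType().getLogicalTypeAnnotation(),
                    LogicalTypeAnnotation.timestampType(false, MICROS));
        }
    }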

@auden-woolfson auden-woolfson marked this pull request as draft August 6, 2024 20:45
Comment on lines 150 to 167

        List<Types.NestedField> updatedFields = icebergSchema.columns().stream()
                .map(field -> {
                    if (field.type() instanceof Types.TimestampType) {
                        return Types.NestedField.of(
                                field.fieldId(),
                                field.isOptional(),
                                field.name(),
                                Types.fromPrimitiveString(Types.TimestampType.withoutZone().toString()));
                    }

                    if (field.type().isNestedType()) {
                        return field;
                    }

                    return field;
                })
                .collect(Collectors.toList());
        icebergSchema = new Schema(updatedFields);
Member

As far as I can see, the icebergSchema has no need to be translated here. It seems the adjustedToUTC information is lost when flushing the parquet message type to the parquet file's footer, and likewise when reading the message type back from the footer metadata.

See MessageTypeConverter.toParquetSchema() and MetadataReader.readTypeSchema().

Contributor Author

@auden-woolfson auden-woolfson Aug 7, 2024

I see your point. I think this issue ultimately boils down to the Apache OriginalType class missing any parameter information, and the conversion methods basically just defaulting to isAdjustedToUTC. The PrimitiveType class we use in messageTypeConverter has a logical type annotation, but there is no way to directly build a LogicalType from this.

Within the Apache library there is an interface, LogicalTypeAnnotationVisitor, whose implementor LogicalTypeConverterVisitor does exactly what we need here; however, it is private to ParquetMetadataConverter.

This implementation can only be used through toParquetMetadata, which I'm not sure why we aren't currently making use of. Instead of setting the schema of our file metadata using a custom method that currently isn't working for logical types, we could just convert the entire FileMetaData object using the method that Apache already provides us.

The other solution would be to copy the implementation of 'LogicalTypeConverterVisitor' into our message type converter so we can get types from annotations.

TLDR: Apache doesn't expose a method for us to keep going with our custom converter implementation, we can either switch to using their metadata converter or copy some of their protected code.
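A minimal sketch of that root cause, assuming only parquet-mr's public API (the class name is illustrative): mapping an OriginalType back to a logical type has no isAdjustedToUTC parameter to work with, so the flag defaults to true.

    import org.apache.parquet.schema.LogicalTypeAnnotation;
    import org.apache.parquet.schema.OriginalType;

    public class OriginalTypeDefaultSketch
    {
        public static void main(String[] args)
        {
            // OriginalType.TIMESTAMP_MICROS carries no isAdjustedToUTC parameter, so the
            // reconstructed annotation defaults to isAdjustedToUTC = true.
            LogicalTypeAnnotation annotation =
                    LogicalTypeAnnotation.fromOriginalType(OriginalType.TIMESTAMP_MICROS, null);
            System.out.println(annotation); // prints a TIMESTAMP annotation with isAdjustedToUTC = true
        }
    }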

Member

Yes, you are right. Our own implementation omitted the logical type annotation. Besides the methods you mentioned above, which may introduce a lot of change, a shortcut would be to special-case primitive types carrying a TimestampLogicalTypeAnnotation in TypeVisitor.visit(primitiveType) and set the logical type on the resulting SchemaElement, then do the opposite in MetadataReader.readTypeSchema(...). This is not a universal approach, but it is feasible.

Contributor Author

I tried coding it up this way on a separate branch...

            public void visit(PrimitiveType primitiveType)
            {
                SchemaElement element = new SchemaElement(primitiveType.getName());
                element.setRepetition_type(toParquetRepetition(primitiveType.getRepetition()));
                element.setType(getType(primitiveType.getPrimitiveTypeName()));

                if (primitiveType.getOriginalType() != null) {
                    element.setConverted_type(getConvertedType(primitiveType.getOriginalType()));
                    LogicalTypeAnnotation annotation = primitiveType.getLogicalTypeAnnotation();

                    if (annotation instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) {
                        TimeUnit unit = TimeUnit.MICROS(new MicroSeconds());
                        switch(((LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) annotation).getUnit()) {
                            case NANOS:
                                unit = TimeUnit.NANOS(new NanoSeconds());
                                break;
                            case MILLIS:
                                unit = TimeUnit.MILLIS(new MilliSeconds());
                                break;
                        }

                        element.setLogicalType(LogicalType.TIMESTAMP(
                                new TimestampType(((LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) annotation).isAdjustedToUTC(), unit)));
                    }
                }

This could obviously be separated into another method if need be.

I think this will work, but wouldn't it be better to use the method Apache already provides? What if we run into a similar problem with another type in the future? As far as I can tell, we are maintaining a custom type converter that doesn't add any functionality beyond what the provided one offers.

Member

wouldn't it be better to use the method Apache already provides?

Of course, that would be better.

@tdcmeehan tdcmeehan self-assigned this Aug 7, 2024
.setRequiredConfigurationProperties(ImmutableMap.of())
.initialize();

this.icebergFileWriterFactory = injector.getInstance(IcebergFileWriterFactory.class);
Contributor Author

I've been trying to figure out the best way to instantiate this class in the tests here, but it has been a very convoluted process. Right now I have this injector implementation, but it is still giving me an explicit-bindings error for my Hive metastore. Does anybody know a better way to go about this?

Member

Maybe we can build IcebergFileWriterFactory directly. I think the following code could work; you can give it a try:

        Session session = getQueryRunner().getDefaultSession();
        ConnectorSession connectorSession = session.toConnectorSession(new ConnectorId("iceberg"));
        TypeManager typeManager = new TypeManager()
        {
            @Override
            public Type getType(TypeSignature signature)
            {
                for (Type type : getTypes()) {
                    if (signature.getBase().equals(type.getTypeSignature().getBase())) {
                        return type;
                    }
                }
                return null;
            }

            @Override
            public Type getParameterizedType(String baseTypeName, List<TypeSignatureParameter> typeParameters)
            {
                return getType(new TypeSignature(baseTypeName, typeParameters));
            }

            @Override
            public boolean canCoerce(Type actualType, Type expectedType)
            {
                throw new UnsupportedOperationException();
            }

            private List<Type> getTypes()
            {
                return ImmutableList.of(BOOLEAN, INTEGER, BIGINT, DOUBLE, VARCHAR, VARBINARY, TIMESTAMP, DATE, HYPER_LOG_LOG);
            }
        };
        HdfsContext hdfsContext = new HdfsContext(connectorSession);
        HdfsEnvironment hdfsEnvironment = IcebergDistributedTestBase.getHdfsEnvironment();
        IcebergFileWriterFactory icebergFileWriterFactory = new IcebergFileWriterFactory(hdfsEnvironment, typeManager,
                new FileFormatDataSourceStats(), new NodeVersion("test"), new OrcFileWriterConfig(), HiveDwrfEncryptionProvider.NO_ENCRYPTION);

        Path path = new Path(createTempDir().getAbsolutePath());
        Schema schema = toIcebergSchema(ImmutableList.of(new ColumnMetadata("a", VARCHAR), new ColumnMetadata("b", INTEGER), new ColumnMetadata("c", TIMESTAMP)));
        IcebergFileWriter icebergFileWriter = icebergFileWriterFactory.createFileWriter(path, schema, new JobConf(), connectorSession,
                hdfsContext, FileFormat.PARQUET, MetricsConfig.getDefault());

        List<Page> input = rowPagesBuilder(VARCHAR, BIGINT, TIMESTAMP)
                .addSequencePage(100, 0, 0, 123)
                .addSequencePage(100, 100, 100, 223)
                .addSequencePage(100, 200, 200, 323)
                .build();
        for (Page page : input) {
            icebergFileWriter.appendRows(page);
        }
        icebergFileWriter.commit();

Contributor Author

Thanks! I was able to add this to the test. It is currently failing, but I'm pretty sure this is due to the reader not being fixed yet. Will work on that this afternoon.

TimestampType timestamp = type.getTIMESTAMP();
typeBuilder.as(LogicalTypeAnnotation.timestampType(timestamp.isAdjustedToUTC, convertTimeUnit(timestamp.unit)));
}
}
Contributor Author

I only had to include these four because they are the ones that include logical type parameters that need to persist through the conversion process. This is an adaptation of the getLogicalTypeAnnotation method from the parquet metadata converter. The actual method does not work since it uses a switch statement on the wrong type, or a type that can't be used for some reason...
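For reference, a sketch of roughly what that adaptation could look like for the parameterized logical types (illustrative, not the PR's exact code; the thrift accessors follow the parquet-format generated classes, and the class/helper names are hypothetical):

    import org.apache.parquet.format.DecimalType;
    import org.apache.parquet.format.IntType;
    import org.apache.parquet.format.LogicalType;
    import org.apache.parquet.format.TimeType;
    import org.apache.parquet.format.TimeUnit;
    import org.apache.parquet.format.TimestampType;
    import org.apache.parquet.schema.LogicalTypeAnnotation;

    final class LogicalTypeParameters
    {
        private LogicalTypeParameters() {}

        // Only these logical types carry parameters that must survive the conversion
        static LogicalTypeAnnotation getLogicalTypeAnnotationWithParameters(LogicalType type)
        {
            if (type.isSetTIMESTAMP()) {
                TimestampType timestamp = type.getTIMESTAMP();
                return LogicalTypeAnnotation.timestampType(timestamp.isAdjustedToUTC, convertTimeUnit(timestamp.unit));
            }
            if (type.isSetTIME()) {
                TimeType time = type.getTIME();
                return LogicalTypeAnnotation.timeType(time.isAdjustedToUTC, convertTimeUnit(time.unit));
            }
            if (type.isSetDECIMAL()) {
                DecimalType decimal = type.getDECIMAL();
                return LogicalTypeAnnotation.decimalType(decimal.scale, decimal.precision);
            }
            if (type.isSetINTEGER()) {
                IntType integer = type.getINTEGER();
                return LogicalTypeAnnotation.intType(integer.bitWidth, integer.isSigned);
            }
            // other logical types have no extra parameters to carry over
            return null;
        }

        private static LogicalTypeAnnotation.TimeUnit convertTimeUnit(TimeUnit unit)
        {
            if (unit.isSetMILLIS()) {
                return LogicalTypeAnnotation.TimeUnit.MILLIS;
            }
            if (unit.isSetNANOS()) {
                return LogicalTypeAnnotation.TimeUnit.NANOS;
            }
            return LogicalTypeAnnotation.TimeUnit.MICROS;
        }
    }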

Contributor

Can you explain this a bit more? I don't understand why we need this corresponding reader change.

Contributor Author

This is because we were losing the logical type information when reading the parquet schema; the test will not pass without this logic in place. It's fine if we don't need logical type info for storage in certain databases/formats, but within the parquet module it is important to at least read it so that the choice can be made in other connectors.

Contributor

Do you feel that there are sufficient tests in place to ensure that the type conversion logic works with the Parquet built in converter?

Contributor Author

Which converter are you referring to? I'm not sure what tests Apache has in place for their libraries...

Contributor

Let's work on some of those tests; the scope of the change makes me nervous that the existing unit test coverage isn't sufficient.

Contributor

I don't think we should special-case for only certain types when reading logical types. If a logical type is set, we should call a method which does the logical type conversion, and then sets the logical type in the descriptor. If the logical type isn't known during conversion, then throw an exception.

See here for an example: https://github.com/trinodb/trino/blob/0d5576a67a855bf95123acaf3ea836d1963861ef/lib/trino-parquet/src/main/java/io/trino/parquet/reader/MetadataReader.java#L221-L227

Contributor Author

I don't think this is necessary; the logical type annotations are already of the proper type. I just added the date type, which isn't included in the logic here, to testWriteParquetFileWithLogicalTypes, and it is still passing. The only purpose of this logic is to make sure that the parameters are correct, so it only needs to check the logical types that take parameters. Adding conditions for each logical type would be overkill, and confusing, since most of the conditions wouldn't actually add any functionality.

That being said, I can extract it into a method named something like getLogicalTypeAnnotationWithParameters to make the intent a little clearer.

Contributor

For completeness' sake, I do think that we should exhaustively convert all logical types so we don't have to touch this code afterwards. I agree it doesn't actually make much difference except for parameterized types or other logical types which are stored in the same physical format (e.g. long decimal and UUID). I would prefer to do the exhaustive conversion.

On another note, is this the error you encountered with the other approach?

java.lang.NoSuchMethodError: org.apache.parquet.format.LogicalType.getSetField()Lcom/facebook/presto/hive/$internal/parquet/org/apache/thrift/TFieldIdEnum;
	at org.apache.parquet.format.converter.ParquetMetadataConverter.getLogicalTypeAnnotation(ParquetMetadataConverter.java:1084)
	at org.apache.parquet.format.converter.ParquetMetadataConverter.buildChildren(ParquetMetadataConverter.java:1715)
	at org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetSchema(ParquetMetadataConverter.java:1670)
	at org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetMetadata(ParquetMetadataConverter.java:1526)
	at org.apache.parquet.format.converter.ParquetMetadataConverter.readParquetMetadata(ParquetMetadataConverter.java:1490)
	at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:591)
	at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:799)
	at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:654)
	at org.apache.iceberg.parquet.ParquetUtil.fileMetrics(ParquetUtil.java:80)
	at org.apache.iceberg.parquet.ParquetUtil.fileMetrics(ParquetUtil.java:75)
	at com.facebook.presto.iceberg.IcebergParquetFileWriter.lambda$getMetrics$0(IcebergParquetFileWriter.java:77)
	at com.facebook.presto.hive.authentication.NoHdfsAuthentication.doAs(NoHdfsAuthentication.java:23)

Contributor Author

Yes, I believe that was the error I got; the getSetField method does not exist.

I can update it to an exhaustive list.

@auden-woolfson auden-woolfson marked this pull request as ready for review August 12, 2024 22:28
@auden-woolfson auden-woolfson force-pushed the stop_normalizing_timestamps_in_iceberg_connecor branch 2 times, most recently from 0645477 to 4f1ff5a Compare August 12, 2024 23:00
@steveburnett
Contributor

Suggest revising the release note entry for consistency with the Release Notes Guidelines. Maybe something like this:

== RELEASE NOTES ==

Iceberg Connector Changes
* Fix bug so that proper logical type parameters are now read and written to Parquet files :pr:`23388`

@auden-woolfson auden-woolfson changed the title Stop setting adjusted UTC as true in parquet files from iceberg connector Persist proper logical type parameters in parquet files Aug 13, 2024
@auden-woolfson auden-woolfson added parquet parquet file format related iceberg Apache Iceberg related and removed wip Work In Progress labels Aug 16, 2024
List<String> fileColumnNames = icebergSchema.columns().stream()
.map(Types.NestedField::name)
.collect(toImmutableList());

Contributor

Please revert this whitespace change

@auden-woolfson auden-woolfson force-pushed the stop_normalizing_timestamps_in_iceberg_connecor branch from 4f1ff5a to f357e82 Compare August 16, 2024 17:53
Session session = getQueryRunner().getDefaultSession();
ConnectorId connectorId = new ConnectorId("iceberg");
this.connectorSession = session.toConnectorSession(connectorId);
TypeManager typeManager = new TypeManager()
Contributor

Please move this into a separate class, like TestingTypeManager.

MessageType originalSchema = convert(icebergSchema, "table");
assertEquals(originalSchema, writtenSchema);
}
catch (IOException e) {
Contributor

Just have the test declare throws Exception.

import static org.testng.Assert.fail;

public class TestIcebergFileWriter
extends AbstractTestQueryFramework
Contributor

Does this need a QueryRunner? I can't tell where we actually run a full query.

Contributor Author

We aren't running a full query anywhere in the test, but the query runner is important for getting a valid connector session; there are a lot of variables that go into creating a connector session that actually works here. Maybe we don't need to extend the abstract class and can just instantiate the query runner in the before-class setup method.

Contributor

Could we use TestingSession to help? Here's an example from the codebase:

    public static final Session TEST_SESSION = testSessionBuilder()
            .setCatalog("tpch")
            .setSchema(TINY_SCHEMA_NAME)
            .build();

Contributor Author

I tried that, and it causes the test to fail due to missing session properties... I used "iceberg" as the catalog. There is a method to add properties; I can look into how to manually build a good test session for this use case, but so far adding the same properties as I do in the constructor for the query runner hasn't worked.

Contributor

This should work; let me know what failed and we can look into it.

fileMetaData.getCreated_by());

ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();
FileMetaData parquetMetaData = metadataConverter.toParquetMetadata(1, new ParquetMetadata(parquetMetaDataInput, ImmutableList.of()));
Contributor

I believe this method is stateless. If so, can you make metadataConverter a constant?

Comment on lines 270 to 278
org.apache.parquet.hadoop.metadata.FileMetaData parquetMetaDataInput = new org.apache.parquet.hadoop.metadata.FileMetaData(
messageType,
ImmutableMap.of(),
fileMetaData.getCreated_by());

ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();
FileMetaData parquetMetaData = metadataConverter.toParquetMetadata(1, new ParquetMetadata(parquetMetaDataInput, ImmutableList.of()));

fileMetaData.setSchema(parquetMetaData.getSchema());
Contributor

Can you extract a helper method to encapsulate the conversion logic to get the Parquet schema?
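One possible extraction, based on the lines shown above (the helper name is illustrative, and the surrounding class's existing imports of ParquetMetadataConverter, ParquetMetadata, and the Guava collections are assumed):

    private static List<SchemaElement> toParquetSchemaElements(MessageType messageType, String createdBy)
    {
        org.apache.parquet.hadoop.metadata.FileMetaData hadoopFileMetaData = new org.apache.parquet.hadoop.metadata.FileMetaData(
                messageType,
                ImmutableMap.of(),
                createdBy);
        // Let the Apache converter build the thrift schema elements, including logical type parameters
        FileMetaData parquetMetaData = new ParquetMetadataConverter()
                .toParquetMetadata(1, new ParquetMetadata(hadoopFileMetaData, ImmutableList.of()));
        return parquetMetaData.getSchema();
    }

The call site would then become fileMetaData.setSchema(toParquetSchemaElements(messageType, fileMetaData.getCreated_by())).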

}

@Test
public void testMetadataCreation()
Contributor

@auden-woolfson can you help me understand how these tests run before and after your main code changes?

@auden-woolfson auden-woolfson force-pushed the stop_normalizing_timestamps_in_iceberg_connecor branch from 6608044 to 6de9114 Compare August 27, 2024 17:04
Contributor

@tdcmeehan tdcmeehan left a comment

LGTM % nit about visibility of LongDecimalType

import static io.airlift.slice.SizeOf.SIZE_OF_LONG;

final class LongDecimalType
public final class LongDecimalType
Contributor

Why is this needed?

Contributor Author

It's not anymore; I initially added it to check which decimal type we had in testMetadataCreation. Good catch.

@tdcmeehan
Contributor

Please squash commits and we'll be good to merge.

@auden-woolfson auden-woolfson force-pushed the stop_normalizing_timestamps_in_iceberg_connecor branch 2 times, most recently from 4006f8a to 8989792 Compare September 9, 2024 20:50
Contributor

@ZacBlanco ZacBlanco left a comment

LGTM, but you'll need to squash the commits again. Glad we got those tests passing. You may want to look at the new pom changes, @hantangwangd. These fixed the NoSuchMethodError the tests were throwing before.

hantangwangd
hantangwangd previously approved these changes Sep 10, 2024
Member

@hantangwangd hantangwangd left a comment

Great, thanks for fixing this maven dependency issue @auden-woolfson. The whole change looks good to me.

@auden-woolfson auden-woolfson force-pushed the stop_normalizing_timestamps_in_iceberg_connecor branch from bebdcec to d1ff3c5 Compare September 10, 2024 16:17
@auden-woolfson
Contributor Author

Great, thanks for all the help + reviews @ZacBlanco @tdcmeehan @hantangwangd, rebased and squashed

tdcmeehan
tdcmeehan previously approved these changes Sep 10, 2024
ZacBlanco
ZacBlanco previously approved these changes Sep 10, 2024
@auden-woolfson auden-woolfson force-pushed the stop_normalizing_timestamps_in_iceberg_connecor branch 2 times, most recently from d038e5e to fe4b48a Compare September 10, 2024 21:59
The parquet writer, metadata reader, and schema converter were not
properly maintaining logical type parameters when processing types.
Explicit logic to do this had to be added to each of these modules.
@auden-woolfson auden-woolfson force-pushed the stop_normalizing_timestamps_in_iceberg_connecor branch from fe4b48a to e431b58 Compare September 11, 2024 00:40
@tdcmeehan tdcmeehan merged commit e4b8abb into prestodb:master Sep 11, 2024
@jaystarshot jaystarshot mentioned this pull request Nov 1, 2024
25 tasks
@tdcmeehan tdcmeehan added the from:IBM PR from IBM label Dec 13, 2024
@prestodb-ci prestodb-ci requested review from a team, ShahimSharafudeen and wanglinsong and removed request for a team December 13, 2024 15:23