Enable writer scaling by default #10614
Conversation
Force-pushed 6fa2dc1 to d068d3c
Force-pushed d068d3c to fd86491
Force-pushed 463d3e9 to 848ff39
arhimondr
left a comment
Here is my understanding of the most recent changes:
- supportsReportingWrittenBytes replaces a connector capabilities flag. We chose this approach to allow a connector to return different results for different tables.
- supportsReportingWrittenBytes has to accept TableMetadata, as a TableHandle is not available for tables that are about to be created (e.g. a CREATE TABLE AS SELECT statement).
- TableMetadata has to be included in WriterTarget, which must be JSON serializable. The TableMetadata object therefore has to be JSON serializable itself.
- TableMetadata includes a property map that is currently excluded from serialization. The property map is technically designed to store information such as the table format. If this information is lost during serialization, supportsReportingWrittenBytes may not function as intended. However, at this moment supportsReportingWrittenBytes is invoked before the serialization round trip.
Relying on the fact that TableMetadata is not serialized / deserialized before the supportsReportingWrittenBytes call is made seems a little fragile. To make it less fragile we have to make sure the table properties stored in the TableMetadata are JSON serializable.
@radek-starburst, @martint How strongly do we feel about being able to return different results for different tables? Is there a real precedent where an existing connector would want to return different results? I'm just wondering whether it is worth messing with the table properties serialization to provide this capability.
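A minimal sketch of the per-table capability check being discussed. The type names stub out the Trino SPI for illustration (they are not the actual SPI signatures), and the "format" property key is a hypothetical example of data a connector might read from the JSON-serializable property map to answer differently per table:

```java
import java.util.Map;

// Stub of the capability-check shape: the decision takes table-level
// information instead of a single connector-wide capabilities flag.
interface ConnectorMetadataSketch
{
    boolean supportsReportingWrittenBytes(Map<String, Object> tableProperties);
}

class ExampleConnectorMetadata implements ConnectorMetadataSketch
{
    // Hypothetical: a connector opts out for tables stored in a
    // "legacy" format recorded in the table property map
    @Override
    public boolean supportsReportingWrittenBytes(Map<String, Object> tableProperties)
    {
        return !"legacy".equals(tableProperties.get("format"));
    }
}
```

If the property map were lost in a serialization round trip, the "format" key above would read as null and the check would silently return true, which is the fragility the comment points out.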
core/trino-main/src/main/java/io/trino/metadata/TableMetadata.java
core/trino-main/src/main/java/io/trino/sql/planner/QueryPlanner.java
core/trino-main/src/main/java/io/trino/sql/planner/QueryPlanner.java
core/trino-spi/src/main/java/io/trino/spi/connector/ColumnMetadata.java
core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorTableMetadata.java
Thanks @arhimondr for listing the issues with metadata / TableMetadata / serialization here. Writer scaling is somewhat similar to (and currently mutually exclusive with) the write layout.
@radek-starburst did you consider such an approach?
Firstly, thanks for your review :) Yes
Yes, exactly
Yes
No, it is not invoked before the serialization round trip (not in tests). This object is serializable but not completely serializable, which means that some fields will be null.
Yes, it is not so easy to make it completely serializable because of
For me, I cannot imagine a situation where we would have different results for different tables.
Force-pushed 848ff39 to d13e1b4
@findepi, I did not. I could try to implement it as you suggest, but first we had better wait for everyone to agree on a common approach.
Force-pushed d13e1b4 to dd68992
Force-pushed dd68992 to 2ef4539
TableMetadata is a container object that describes the shape of a table (columns, properties, etc). Using it to identify a table is not appropriate, especially given that a caller can make up any TableMetadata it wants.
Tables should be identified either by name (catalog/schema/name) or by handle (i.e., an opaque identifier).
@arhimondr @radek-starburst @martint
I happened to discuss this with @radek-starburst today:
- for existing tables (INSERT) the ConnectorTableHandle should be presented to the ConnectorMetadata to drive the decision. ConnectorTableHandle is the best info we can provide, so let's use it,
- for new tables (CTAS) the ConnectorTableMetadata should be presented to the connector. It contains the best information possible. Showing only the properties is not sufficient -- it allows the connector to inspect e.g. partitioning, but doesn't allow it to inspect types,
- for UPDATE and DELETE nothing is needed, since writer scaling is not applicable for these operations -- the operations happen within a source split,
- for TABLE EXECUTE -- ideally we should provide ConnectorTableHandle + ConnectorTableExecuteHandle, since the connector's decision may depend on the actual operation being executed. However, since we don't have such a use case today and it is easy to add such an API in a backwards-compatible way in the future, let's provide ConnectorTableHandle only, as in the INSERT case.
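The proposed split above can be sketched as two overloads on the connector metadata interface. This is a hedged, self-contained sketch with stubbed types, not the actual Trino SPI declaration:

```java
// Opaque identifier for an existing table (stub)
interface ConnectorTableHandle { }

// Description of a table that does not exist yet (stub)
interface ConnectorTableMetadata { }

interface ConnectorMetadata
{
    // INSERT (and, for now, TABLE EXECUTE): existing table, identified
    // by its handle -- the best information available
    default boolean supportsReportingWrittenBytes(ConnectorTableHandle tableHandle)
    {
        return false; // conservative default: writer scaling stays off
    }

    // CTAS: no handle exists yet, so pass the full table metadata
    // (properties alone would hide e.g. column types from the connector)
    default boolean supportsReportingWrittenBytes(ConnectorTableMetadata tableMetadata)
    {
        return false;
    }
}
```

Defaulting both overloads to false keeps the change backwards-compatible: connectors that do not override them simply never get scaled writers.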
Force-pushed 2b925fd to 923eb04
arhimondr
left a comment
LGTM % nits
We usually try to avoid (but do not always manage to) having commits like Implement class A. Generally the goal should be for every commit to be self-sufficient and represent a meaningful, self-contained change.
I would recommend squashing the commits into a single one with a simple title: Enable scaled_writers by default for supported connectors
core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddExchanges.java
core/trino-main/src/main/java/io/trino/sql/planner/plan/TableWriterNode.java
core/trino-main/src/main/java/io/trino/sql/planner/plan/TableWriterNode.java
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
Force-pushed 209079b to 732a85d
Do you mean to squash all commits into one?
Force-pushed 732a85d to 8f5bd98
Force-pushed 1e0ae04 to f355e69
arhimondr
left a comment
Do you mean to squash all commits into one?
Yes, they are all part of the same logical change
core/trino-main/src/main/java/io/trino/sql/planner/plan/TableWriterNode.java
core/trino-main/src/main/java/io/trino/sql/planner/plan/TableWriterNode.java
core/trino-main/src/main/java/io/trino/sql/planner/plan/TableWriterNode.java
Force-pushed b43d561 to 87a6587
Force-pushed 87a6587 to 8032524
This PR enables scaled writers by default, adds a supportsReportingWrittenBytes method to Metadata, and implements the supportsReportingWrittenBytes method for the Iceberg, Delta Lake and Hive connectors. Additionally, a validator, ValidateScaledWritersUsage, was added to verify whether the SCALED_WRITER_DISTRIBUTION partitioning scheme should actually be used. Tests for this feature were added and some other tests fixed; a test for the validator was added too.
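The validator's core check can be sketched as follows. This is a minimal, self-contained illustration of the invariant ValidateScaledWritersUsage enforces; the Partitioning enum and WriterTarget record here are simplified stand-ins for the actual planner types:

```java
// Simplified stand-ins for the planner's partitioning handle and write target
enum Partitioning { SINGLE_DISTRIBUTION, SCALED_WRITER_DISTRIBUTION }

record WriterTarget(boolean supportsReportingWrittenBytes) { }

final class ValidateScaledWritersUsage
{
    // Scaled writers rebalance based on bytes written, so the plan is
    // invalid if the target cannot report written bytes
    void validate(Partitioning partitioning, WriterTarget target)
    {
        if (partitioning == Partitioning.SCALED_WRITER_DISTRIBUTION
                && !target.supportsReportingWrittenBytes()) {
            throw new IllegalStateException(
                    "Scaled writers require a write target that reports written bytes");
        }
    }
}
```

Running such a check as a plan sanity validator catches misuse of SCALED_WRITER_DISTRIBUTION at planning time rather than failing (or silently misbehaving) at execution time.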
Force-pushed 8032524 to 2e96713
I think we need a release notes entry for this. Any suggestions?
First cut of docs update .. #12261
.setSystemProperty(TASK_WRITER_COUNT, "2")
.build();
getQueryRunner().execute(session, format("CREATE TABLE IF NOT EXISTS %s AS SELECT * FROM %s", "linetime_multiple_file_backed", "tpch.tiny.lineitem")).getMaterializedRows();
getQueryRunner().execute(session, format("CREATE TABLE IF NOT EXISTS %s AS SELECT * FROM %s", "orders_multiple_file_backed", "tpch.tiny.orders")).getMaterializedRows();

// We need to prepare tables for this test. The test is required to use tables that are backed by at least two files
Session session = Session.builder(getSession())
    .setSystemProperty(TASK_WRITER_COUNT, "2")
I don't think writer count >= 2 guarantees the number of files >= 2.
It still may be possible that all data ends up on a single machine and a single thread (just not very likely).
Explicit partitioning, or a small target file size, can guarantee 2+ files.
Also, if the test relies on the number of files, it should explicitly validate the state of the table, e.g. via SELECT count(DISTINCT "$path").
cc @alexjo2144
I did not know that. I was thinking that it depends on the number of writers. I am going to analyze it more deeply.
- supportsReportingWrittenBytes method added to the Metadata interface
- supportsReportingWrittenBytes method implemented for the Iceberg and Hive connectors