Support id column mapping for Delta Lake#13678
Conversation
e26384a to
f9ec2dc
Compare
f9ec2dc to
6c5a125
Compare
...elta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java
Outdated
Show resolved
Hide resolved
...elta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java
Outdated
Show resolved
Hide resolved
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
Outdated
Show resolved
Hide resolved
...elta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java
Outdated
Show resolved
Hide resolved
...uct-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeColumnMappingMode.java
Outdated
Show resolved
Hide resolved
plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java
Outdated
Show resolved
Hide resolved
|
@findinpath Addressed comments. |
plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java
Outdated
Show resolved
Hide resolved
plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java
Outdated
Show resolved
Hide resolved
...elta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java
Outdated
Show resolved
Hide resolved
...uct-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeColumnMappingMode.java
Outdated
Show resolved
Hide resolved
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeColumnHandle.java
Outdated
Show resolved
Hide resolved
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeColumnHandle.java
Outdated
Show resolved
Hide resolved
...elta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java
Outdated
Show resolved
Hide resolved
plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java
Outdated
Show resolved
Hide resolved
plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java
Outdated
Show resolved
Hide resolved
041a562 to
f3171df
Compare
The job is suspended #13703
f3171df to
0443023
Compare
alexjo2144
left a comment
There was a problem hiding this comment.
Just skimming so far, but had one high level question.
Here we're passing the idea of column ids all the way through to the parquet reader. Would it be easier to leave the reader alone and use the existing index based approach? We could read the schema when setting up the reader and use the ids there to figure out the indexes in the Parquet file.
Does that make sense?
lib/trino-parquet/src/main/java/io/trino/parquet/ParquetTypeUtils.java
Outdated
Show resolved
Hide resolved
...n/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeUpdatablePageSource.java
Outdated
Show resolved
Hide resolved
Let me try this approach. |
0443023 to
6011e46
Compare
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeColumnHandle.java
Outdated
Show resolved
Hide resolved
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeColumnHandle.java
Outdated
Show resolved
Hide resolved
...in/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java
Outdated
Show resolved
Hide resolved
6011e46 to
bc162ab
Compare
|
Addressed comments. |
bc162ab to
b8a5b64
Compare
|
One last thing, but besides that looks good to me. You don't have to store the column mapping mode in every split, since it is a table level property it can't change from file to file. Instead, the PageSourceProvider has access to the DeltaLakeTableHandle, you can get it from there |
b8a5b64 to
8fc14ab
Compare
|
Thanks, updated. |
...in/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java
Outdated
Show resolved
Hide resolved
2494f72 to
95aa880
Compare
...in/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java
Outdated
Show resolved
Hide resolved
| switch (columnMapping) { | ||
| case ID: | ||
| Integer fieldId = deltaLakeColumnHandle.getFieldId().orElseThrow(() -> new IllegalArgumentException("Field ID must exist")); | ||
| String fieldName = requireNonNull(fieldIdToName.get(fieldId), "Field name is null"); |
There was a problem hiding this comment.
@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS})
public void testColumnMappingModeNameIdAddColumn()
{
String tableName = "test_dl_column_mapping_mode_name_" + randomTableSuffix();
onDelta().executeQuery("" +
"CREATE TABLE default." + tableName +
" (a_number INT, a_varchar STRING)" +
" USING delta " +
" LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" +
" TBLPROPERTIES (" +
" 'delta.columnMapping.mode'='id'," +
" 'delta.minReaderVersion'='2'," +
" 'delta.minWriterVersion'='5')");
try {
onDelta().executeQuery("" +
"INSERT INTO default." + tableName + " VALUES " +
"(1, 'ala'), " +
"(2, 'bala')");
List<Row> expectedRows = ImmutableList.of(
row(1, "ala"),
row(2, "bala"));
assertThat(onDelta().executeQuery("SELECT a_number, a_varchar FROM default." + tableName))
.containsOnly(expectedRows);
assertThat(onTrino().executeQuery("SELECT a_number, a_varchar FROM delta.default." + tableName))
.containsOnly(expectedRows);
// Verify the connector can read renamed columns correctly
//onDelta().executeQuery("ALTER TABLE default." + tableName + " RENAME COLUMN a_number TO new_a_number");
onDelta().executeQuery("ALTER TABLE default." + tableName + " ADD COLUMN another_varchar STRING");
assertThat(onTrino().executeQuery("DESCRIBE delta.default." + tableName))
.containsOnly(ImmutableList.of(
row("a_number", "integer", "", ""),
row("a_varchar", "varchar", "", ""),
row("another_varchar", "varchar", "", "")));
onDelta().executeQuery("INSERT INTO default." + tableName + "(a_number, a_varchar, another_varchar) VALUES (3, 'porto', 'cala')");
expectedRows = ImmutableList.of(
row(1, "ala", null),
row(2, "bala", null),
row(3, "porto", "cala"));
assertThat(onDelta().executeQuery("SELECT a_number, a_varchar, another_varchar FROM default." + tableName))
.containsOnly(expectedRows);
assertThat(onTrino().executeQuery("SELECT a_number, a_varchar, another_varchar FROM delta.default." + tableName))
.containsOnly(expectedRows);
}
finally {
onDelta().executeQuery("DROP TABLE default." + tableName);
}
}
This product test in TestDeltaLakeColumnMappingMode fails because the newly added column is not found in the parquetFieldIdToName map (because it didn't exist at the time when the file got created through the initial inserts).
findepi
left a comment
There was a problem hiding this comment.
Skimming.
Thank you @alexjo2144 @findinpath for your reviews!
95aa880 to
79703a5
Compare
|
CI hit #12818 |
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSource.java
Outdated
Show resolved
Hide resolved
| assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)) | ||
| .containsOnly(expectedRows); | ||
| assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)) | ||
| .containsOnly(expectedRows); |
There was a problem hiding this comment.
While testing, I added the following snippet on your test case:
onDelta().executeQuery("ALTER TABLE default." + tableName + " DROP COLUMN another_varchar");
assertThat(onTrino().executeQuery("DESCRIBE delta.default." + tableName))
.containsOnly(row("a_number", "integer", "", ""));
expectedRows = ImmutableList.of(row(1), row(2), row(3));
assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName))
.containsOnly(expectedRows);
assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName))
.containsOnly(expectedRows);
Since we are testing on how to the column mapping acts when adding a column we may as well verify how it acts when dropping a column.
I stumbled into an unrelated issue:
| 2022-09-07 20:21:44 INFO: FAILURE / io.trino.tests.product.deltalake.TestDeltaLakeColumnMappingMode.testColumnMappingModeIdAddColumn (Groups: profile_specific_tests, delta-lake-exclude-91, delta-lake-databricks, delta-lake-exclude-73) took 25.6 seconds
tests | 2022-09-07 20:21:44 SEVERE: Failure cause:
tests | io.trino.tempto.query.QueryExecutionException: java.sql.SQLException: [Databricks][DatabricksJDBCDriver](500051) ERROR processing query/statement. Error Code: 0, SQL state: org.apache.hive.service.cli.HiveSQLException: Error running query: java.lang.UnsupportedOperationException: Unrecognized column change class org.apache.spark.sql.connector.catalog.TableChange$DeleteColumn. You may be running an out of date Delta Lake version.
tests | at org.apache.spark.sql.hive.thriftserver.HiveThriftServerErrors$.runningQueryError(HiveThriftServerErrors.scala:53)
tests | at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:435)
tests | at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.$anonfun$run$2(SparkExecuteStatementOperation.scala:257)
tests | at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
tests | at org.apache.spark.sql.hive.thriftserver.ThriftLocalProperties.withLocalProperties(ThriftLocalProperties.scala:123)
tests | at org.apache.spark.sql.hive.thriftserver.ThriftLocalProperties.withLocalProperties$(ThriftLocalProperties.scala:48)
tests | at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.withLocalProperties(SparkExecuteStatementOperation.scala:52)
tests | at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:235)
tests | at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:220)
tests | at java.security.AccessController.doPrivileged(Native Method)
tests | at javax.security.auth.Subject.doAs(Subject.java:422)
tests | at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
tests | at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2.run(SparkExecuteStatementOperation.scala:269)
tests | at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
tests | at java.util.concurrent.FutureTask.run(FutureTask.java:266)
tests | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
tests | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
tests | at java.lang.Thread.run(Thread.java:748)
tests | Caused by: java.lang.UnsupportedOperationException: Unrecognized column change class org.apache.spark.sql.connector.catalog.TableChange$DeleteColumn. You may be running an out of date Delta Lake version.
Will ask about it on Databricks slack
https://delta-users.slack.com/archives/CGK79PLV6/p1662561719425569
There was a problem hiding this comment.
You need to be running Databricks 11 for DROP COLUMN https://docs.databricks.com/release-notes/runtime/11.0.html#support-for-dropping-columns-in-delta-tables-public-preview
There was a problem hiding this comment.
Indeed. Thanks @alexjo2144 . Testing with Databricks 11 the scenario evoked earlier proved to be successful.
|
|
||
| return new DeltaLakePageSource( | ||
| deltaLakeColumns, | ||
| nullColumnNames.build(), |
There was a problem hiding this comment.
I think you could get by without passing the null column names all the way through. The ParquetPageSource should already return nulls for columns whose name doesn't show up in the file. Maybe if the column is missing from the file you could give it a name like missing_column_<uuid>?
I'm not sure if this is cleaner though.
There was a problem hiding this comment.
Actually, I tried the same approach first and settled on the current change because I felt passing dummy names is redundant and the ParquetPageSource behavior may change in the future. I don't stick to the current approach though.
@findepi Do you have any opinion?
There was a problem hiding this comment.
I am not being able to fully weight the consequences, but i agree that dummy names look weird.
Would would be the benefit of doing so?
There was a problem hiding this comment.
It gets rid of the changes in DeltaLakePageSource. That's about it
There was a problem hiding this comment.
I don't really have a preference, happy to merge as is
79703a5 to
90b911e
Compare
|
Rebased on upstream to resolve conflicts. |
90b911e to
32bce58
Compare
|
Rebased on upstream to resolve conflicts. |
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSource.java
Outdated
Show resolved
Hide resolved
32bce58 to
2cf1a9d
Compare
|
Addressed all comments. |
|
CI hit #13199 in |
2cf1a9d to
16e0bcc
Compare
|
Rebased on upstream to resolve conflicts. |
Also, extract a method to verify supported column mapping and make DeltaLakePageSourceProvider.getParquetTupleDomain public.
16e0bcc to
834b469
Compare
|
|
||
| private final String name; | ||
| private final Type type; | ||
| private final OptionalInt fieldId; |
Description
Fixes #13629
Documentation
(x) No documentation is needed.
Release notes
(x) Release notes entries required with the following suggested text: