-
Notifications
You must be signed in to change notification settings - Fork 3.6k
Test all file formats in TestSparkCompatibility #6699
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from all commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
2 changes: 2 additions & 0 deletions
2
.../docker/presto-product-tests/conf/environment/singlenode-spark-iceberg/iceberg.properties
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,2 +1,4 @@ | ||
| connector.name=iceberg | ||
| hive.metastore.uri=thrift://hadoop-master:9083 | ||
| # TODO: Remove this config to test default read behavior once Spark writer is fixed. See https://github.com/trinodb/trino/issues/6369 for details | ||
| iceberg.use-file-size-from-metadata=false | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -15,6 +15,7 @@ | |||||
|
|
||||||
| import io.trino.tempto.ProductTest; | ||||||
| import io.trino.tempto.query.QueryResult; | ||||||
| import org.testng.annotations.DataProvider; | ||||||
| import org.testng.annotations.Test; | ||||||
|
|
||||||
| import java.sql.Date; | ||||||
|
|
@@ -42,10 +43,16 @@ public class TestSparkCompatibility | |||||
| private static final String SPARK_CATALOG = "iceberg_test"; | ||||||
| private static final String PRESTO_CATALOG = "iceberg"; | ||||||
|
|
||||||
| @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) | ||||||
| public void testPrestoReadingSparkData() | ||||||
| @DataProvider(name = "storage_formats") | ||||||
| public static Object[][] storageFormats() | ||||||
| { | ||||||
| String baseTableName = "test_presto_reading_primitive_types"; | ||||||
| return new String[][] {{"ORC"}, {"PARQUET"}}; | ||||||
| } | ||||||
|
|
||||||
| @Test(dataProvider = "storage_formats", groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) | ||||||
| public void testPrestoReadingSparkData(String format) | ||||||
| { | ||||||
| String baseTableName = "test_presto_reading_primitive_types_" + format; | ||||||
| String sparkTableName = sparkTableName(baseTableName); | ||||||
|
|
||||||
| String sparkTableDefinition = | ||||||
|
|
@@ -58,7 +65,7 @@ public void testPrestoReadingSparkData() | |||||
| ", _boolean BOOLEAN" + | ||||||
| ", _timestamp TIMESTAMP" + | ||||||
| ", _date DATE" + | ||||||
| ") USING ICEBERG"; | ||||||
| ") USING ICEBERG TBLPROPERTIES ('write.format.default' = '" + format + "')"; | ||||||
| onSpark().executeQuery(format(sparkTableDefinition, sparkTableName)); | ||||||
|
|
||||||
| String values = "VALUES (" + | ||||||
|
|
@@ -94,10 +101,10 @@ public void testPrestoReadingSparkData() | |||||
| onSpark().executeQuery("DROP TABLE " + sparkTableName); | ||||||
| } | ||||||
|
|
||||||
| @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) | ||||||
| public void testSparkReadingPrestoData() | ||||||
| @Test(dataProvider = "storage_formats", groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) | ||||||
| public void testSparkReadingPrestoData(String format) | ||||||
| { | ||||||
| String baseTableName = "test_spark_reading_primitive_types"; | ||||||
| String baseTableName = "test_spark_reading_primitive_types_" + format; | ||||||
| String prestoTableName = prestoTableName(baseTableName); | ||||||
|
|
||||||
| String prestoTableDefinition = | ||||||
|
|
@@ -110,7 +117,7 @@ public void testSparkReadingPrestoData() | |||||
| ", _boolean BOOLEAN" + | ||||||
| //", _timestamp TIMESTAMP" + | ||||||
| ", _date DATE" + | ||||||
| ") WITH (format = 'ORC')"; | ||||||
| ") WITH (format = '" + format + "')"; | ||||||
| onPresto().executeQuery(format(prestoTableDefinition, prestoTableName)); | ||||||
|
|
||||||
| String values = "VALUES (" + | ||||||
|
|
@@ -161,12 +168,12 @@ public void testPrestoCreatesSparkDrops() | |||||
| onSpark().executeQuery("DROP TABLE " + sparkTableName(baseTableName)); | ||||||
| } | ||||||
|
|
||||||
| @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) | ||||||
| public void testSparkReadsPrestoPartitionedTable() | ||||||
| @Test(dataProvider = "storage_formats", groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) | ||||||
| public void testSparkReadsPrestoPartitionedTable(String format) | ||||||
| { | ||||||
| String baseTableName = "test_spark_reads_presto_partitioned_table"; | ||||||
| String baseTableName = "test_spark_reads_presto_partitioned_table_" + format; | ||||||
| String prestoTableName = prestoTableName(baseTableName); | ||||||
| onPresto().executeQuery(format("CREATE TABLE %s (_string VARCHAR, _bigint BIGINT) WITH (partitioning = ARRAY['_string'])", prestoTableName)); | ||||||
| onPresto().executeQuery(format("CREATE TABLE %s (_string VARCHAR, _bigint BIGINT) WITH (partitioning = ARRAY['_string'], format = '" + format + "')", prestoTableName)); | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Suggested change
Sorry if it was confusing — I meant a change like the one shown above, as described in #6699 (comment). |
||||||
| onPresto().executeQuery(format("INSERT INTO %s VALUES ('a', 1001), ('b', 1002), ('c', 1003)", prestoTableName)); | ||||||
|
|
||||||
| Row row = row("b", 1002); | ||||||
|
|
@@ -178,12 +185,13 @@ public void testSparkReadsPrestoPartitionedTable() | |||||
| onPresto().executeQuery("DROP TABLE " + prestoTableName); | ||||||
| } | ||||||
|
|
||||||
| @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) | ||||||
| public void testPrestoReadsSparkPartitionedTable() | ||||||
| @Test(dataProvider = "storage_formats", groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) | ||||||
| public void testPrestoReadsSparkPartitionedTable(String format) | ||||||
| { | ||||||
| String baseTableName = "test_spark_reads_presto_partitioned_table"; | ||||||
| String baseTableName = "test_presto_reads_spark_partitioned_table_" + format; | ||||||
| String sparkTableName = sparkTableName(baseTableName); | ||||||
| onSpark().executeQuery(format("CREATE TABLE %s (_string STRING, _bigint BIGINT) USING ICEBERG PARTITIONED BY (_string)", sparkTableName)); | ||||||
| onSpark().executeQuery(format("CREATE TABLE %s (_string STRING, _bigint BIGINT) USING ICEBERG PARTITIONED BY (_string)" + | ||||||
| "TBLPROPERTIES ('write.format.default' = '" + format + "')", sparkTableName)); | ||||||
| onSpark().executeQuery(format("INSERT INTO %s VALUES ('a', 1001), ('b', 1002), ('c', 1003)", sparkTableName)); | ||||||
|
|
||||||
| Row row = row("b", 1002); | ||||||
|
|
@@ -196,10 +204,10 @@ public void testPrestoReadsSparkPartitionedTable() | |||||
| onSpark().executeQuery("DROP TABLE " + sparkTableName); | ||||||
| } | ||||||
|
|
||||||
| @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) | ||||||
| public void testPrestoReadingCompositeSparkData() | ||||||
| @Test(dataProvider = "storage_formats", groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) | ||||||
| public void testPrestoReadingCompositeSparkData(String format) | ||||||
| { | ||||||
| String baseTableName = "test_presto_reading_spark_composites"; | ||||||
| String baseTableName = "test_presto_reading_spark_composites_" + format; | ||||||
| String sparkTableName = sparkTableName(baseTableName); | ||||||
|
|
||||||
| String sparkTableDefinition = "" + | ||||||
|
|
@@ -208,7 +216,8 @@ public void testPrestoReadingCompositeSparkData() | |||||
| " info MAP<STRING, INT>,\n" + | ||||||
| " pets ARRAY<STRING>,\n" + | ||||||
| " user_info STRUCT<name:STRING, surname:STRING, age:INT, gender:STRING>)" + | ||||||
| " USING ICEBERG"; | ||||||
| " USING ICEBERG" + | ||||||
| " TBLPROPERTIES ('write.format.default' = '" + format + "')"; | ||||||
| onSpark().executeQuery(format(sparkTableDefinition, sparkTableName)); | ||||||
|
|
||||||
| String insert = "" + | ||||||
|
|
@@ -222,20 +231,23 @@ public void testPrestoReadingCompositeSparkData() | |||||
| QueryResult prestoResult = onPresto().executeQuery(prestoSelect); | ||||||
| Row row = row("Doc213", 28, "Cat", "Claus"); | ||||||
| assertThat(prestoResult).containsOnly(row); | ||||||
|
|
||||||
| onSpark().executeQuery("DROP TABLE " + sparkTableName); | ||||||
| } | ||||||
|
|
||||||
| @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) | ||||||
| public void testSparkReadingCompositePrestoData() | ||||||
| @Test(dataProvider = "storage_formats", groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) | ||||||
| public void testSparkReadingCompositePrestoData(String format) | ||||||
| { | ||||||
| String baseTableName = "test_spark_reading_presto_composites"; | ||||||
| String baseTableName = "test_spark_reading_presto_composites_" + format; | ||||||
| String prestoTableName = prestoTableName(baseTableName); | ||||||
|
|
||||||
| String prestoTableDefinition = "" + | ||||||
| "CREATE TABLE %s (" + | ||||||
| " doc_id VARCHAR,\n" + | ||||||
| " info MAP(VARCHAR, INTEGER),\n" + | ||||||
| " pets ARRAY(VARCHAR),\n" + | ||||||
| " user_info ROW(name VARCHAR, surname VARCHAR, age INTEGER, gender VARCHAR))"; | ||||||
| " user_info ROW(name VARCHAR, surname VARCHAR, age INTEGER, gender VARCHAR))" + | ||||||
| " WITH (format = '" + format + "')"; | ||||||
| onPresto().executeQuery(format(prestoTableDefinition, prestoTableName)); | ||||||
|
|
||||||
| String insert = "INSERT INTO %s VALUES('Doc213', MAP(ARRAY['age', 'children'], ARRAY[28, 3]), ARRAY['Dog', 'Cat', 'Pig'], ROW('Santa', 'Claus', 1000, 'MALE'))"; | ||||||
|
|
@@ -246,12 +258,14 @@ public void testSparkReadingCompositePrestoData() | |||||
| QueryResult sparkResult = onSpark().executeQuery(sparkSelect); | ||||||
| Row row = row("Doc213", 28, "Cat", "Claus"); | ||||||
| assertThat(sparkResult).containsOnly(row); | ||||||
|
|
||||||
| onPresto().executeQuery("DROP TABLE " + prestoTableName); | ||||||
| } | ||||||
|
|
||||||
| @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) | ||||||
| public void testPrestoReadingNestedSparkData() | ||||||
| @Test(dataProvider = "storage_formats", groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) | ||||||
| public void testPrestoReadingNestedSparkData(String format) | ||||||
| { | ||||||
| String baseTableName = "test_presto_reading_nested_spark_data"; | ||||||
| String baseTableName = "test_presto_reading_nested_spark_data_" + format; | ||||||
| String sparkTableName = sparkTableName(baseTableName); | ||||||
|
|
||||||
| String sparkTableDefinition = "" + | ||||||
|
|
@@ -260,7 +274,8 @@ public void testPrestoReadingNestedSparkData() | |||||
| ", nested_map MAP<STRING, ARRAY<STRUCT<sname: STRING, snumber: INT>>>\n" + | ||||||
| ", nested_array ARRAY<MAP<STRING, ARRAY<STRUCT<mname: STRING, mnumber: INT>>>>\n" + | ||||||
| ", nested_struct STRUCT<name:STRING, complicated: ARRAY<MAP<STRING, ARRAY<STRUCT<mname: STRING, mnumber: INT>>>>>)\n" + | ||||||
| " USING ICEBERG"; | ||||||
| " USING ICEBERG" + | ||||||
| " TBLPROPERTIES ('write.format.default' = '" + format + "')"; | ||||||
| onSpark().executeQuery(format(sparkTableDefinition, sparkTableName)); | ||||||
|
|
||||||
| String insert = "" + | ||||||
|
|
@@ -302,20 +317,23 @@ public void testPrestoReadingNestedSparkData() | |||||
|
|
||||||
| QueryResult prestoResult = onPresto().executeQuery(prestoSelect + prestoTableName); | ||||||
| assertThat(prestoResult).containsOnly(row); | ||||||
|
|
||||||
| onSpark().executeQuery("DROP TABLE " + sparkTableName); | ||||||
| } | ||||||
|
|
||||||
| @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) | ||||||
| public void testSparkReadingNestedPrestoData() | ||||||
| @Test(dataProvider = "storage_formats", groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) | ||||||
| public void testSparkReadingNestedPrestoData(String format) | ||||||
| { | ||||||
| String baseTableName = "test_spark_reading_nested_presto_data"; | ||||||
| String baseTableName = "test_spark_reading_nested_presto_data_" + format; | ||||||
| String prestoTableName = prestoTableName(baseTableName); | ||||||
|
|
||||||
| String prestoTableDefinition = "" + | ||||||
| "CREATE TABLE %s (\n" + | ||||||
| " doc_id VARCHAR\n" + | ||||||
| ", nested_map MAP(VARCHAR, ARRAY(ROW(sname VARCHAR, snumber INT)))\n" + | ||||||
| ", nested_array ARRAY(MAP(VARCHAR, ARRAY(ROW(mname VARCHAR, mnumber INT))))\n" + | ||||||
| ", nested_struct ROW(name VARCHAR, complicated ARRAY(MAP(VARCHAR, ARRAY(ROW(mname VARCHAR, mnumber INT))))))"; | ||||||
| ", nested_struct ROW(name VARCHAR, complicated ARRAY(MAP(VARCHAR, ARRAY(ROW(mname VARCHAR, mnumber INT))))))" + | ||||||
| " WITH (format = '" + format + "')"; | ||||||
| onPresto().executeQuery(format(prestoTableDefinition, prestoTableName)); | ||||||
|
|
||||||
| String insert = "" + | ||||||
|
|
@@ -357,6 +375,8 @@ public void testSparkReadingNestedPrestoData() | |||||
|
|
||||||
| QueryResult sparkResult = onSpark().executeQuery(sparkSelect + sparkTableName); | ||||||
| assertThat(sparkResult).containsOnly(row); | ||||||
|
|
||||||
| onPresto().executeQuery("DROP TABLE " + prestoTableName); | ||||||
| } | ||||||
|
|
||||||
| private static String sparkTableName(String tableName) | ||||||
|
|
||||||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.