diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestCreateActions.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestCreateActions.java
index 0b60d1a2b613..1c3828e7daa7 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestCreateActions.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestCreateActions.java
@@ -136,6 +136,7 @@ public void before() {
     spark.conf().set("hive.exec.dynamic.partition", "true");
     spark.conf().set("hive.exec.dynamic.partition.mode", "nonstrict");
+    spark.conf().set("spark.sql.parquet.writeLegacyFormat", false);
     spark.sql(String.format("DROP TABLE IF EXISTS %s", baseTableName));
     List<SimpleRecord> expected = Lists.newArrayList(
@@ -570,6 +571,137 @@ public void schemaEvolutionTestWithSparkSQL() throws Exception {
     assertEquals("Output must match", expectedAfterAddColumn, afterMigarteAfterAddResults);
   }
 
+  @Test
+  public void testHiveStyleThreeLevelList() throws Exception {
+    threeLevelList(true);
+  }
+
+  @Test
+  public void testThreeLevelList() throws Exception {
+    threeLevelList(false);
+  }
+
+  @Test
+  public void testHiveStyleThreeLevelListWithNestedStruct() throws Exception {
+    threeLevelListWithNestedStruct(true);
+  }
+
+  @Test
+  public void testThreeLevelListWithNestedStruct() throws Exception {
+    threeLevelListWithNestedStruct(false);
+  }
+
+  @Test
+  public void testHiveStyleThreeLevelLists() throws Exception {
+    threeLevelLists(true);
+  }
+
+  @Test
+  public void testThreeLevelLists() throws Exception {
+    threeLevelLists(false);
+  }
+
+  @Test
+  public void testHiveStyleStructOfThreeLevelLists() throws Exception {
+    structOfThreeLevelLists(true);
+  }
+
+  @Test
+  public void testStructOfThreeLevelLists() throws Exception {
+    structOfThreeLevelLists(false);
+  }
+
+  private void threeLevelList(boolean useLegacyMode) throws Exception {
+    spark.conf().set("spark.sql.parquet.writeLegacyFormat", useLegacyMode);
+
+    String tableName = sourceName(String.format("threeLevelList_%s", useLegacyMode));
+    File location = temp.newFolder();
+    sql("CREATE TABLE %s (col1 ARRAY<STRUCT<col2 INT>>)" +
+        " STORED AS parquet" +
+        " LOCATION '%s'", tableName, location);
+
+    int testValue = 12345;
+    sql("INSERT INTO %s VALUES (ARRAY(STRUCT(%s)))", tableName, testValue);
+    List<Object[]> expected = sql(String.format("SELECT * FROM %s", tableName));
+
+    // migrate table
+    SparkActions.get().migrateTable(tableName).execute();
+
+    // check migrated table is returning expected result
+    List<Object[]> results = sql("SELECT * FROM %s", tableName);
+    Assert.assertTrue(results.size() > 0);
+    assertEquals("Output must match", expected, results);
+  }
+
+  private void threeLevelListWithNestedStruct(boolean useLegacyMode) throws Exception {
+    spark.conf().set("spark.sql.parquet.writeLegacyFormat", useLegacyMode);
+
+    String tableName = sourceName(String.format("threeLevelListWithNestedStruct_%s", useLegacyMode));
+    File location = temp.newFolder();
+    sql("CREATE TABLE %s (col1 ARRAY<STRUCT<col2 STRUCT<col3 INT>>>)" +
+        " STORED AS parquet" +
+        " LOCATION '%s'", tableName, location);
+
+    int testValue = 12345;
+    sql("INSERT INTO %s VALUES (ARRAY(STRUCT(STRUCT(%s))))", tableName, testValue);
+    List<Object[]> expected = sql(String.format("SELECT * FROM %s", tableName));
+
+    // migrate table
+    SparkActions.get().migrateTable(tableName).execute();
+
+    // check migrated table is returning expected result
+    List<Object[]> results = sql("SELECT * FROM %s", tableName);
+    Assert.assertTrue(results.size() > 0);
+    assertEquals("Output must match", expected, results);
+  }
+
+  private void threeLevelLists(boolean useLegacyMode) throws Exception {
+    spark.conf().set("spark.sql.parquet.writeLegacyFormat", useLegacyMode);
+
+    String tableName = sourceName(String.format("threeLevelLists_%s", useLegacyMode));
+    File location = temp.newFolder();
+    sql("CREATE TABLE %s (col1 ARRAY<STRUCT<col2 INT>>, col3 ARRAY<STRUCT<col4 INT>>)" +
+        " STORED AS parquet" +
+        " LOCATION '%s'", tableName, location);
+
+    int testValue1 = 12345;
+    int testValue2 = 987654;
+    sql("INSERT INTO %s VALUES (ARRAY(STRUCT(%s)), ARRAY(STRUCT(%s)))",
+        tableName, testValue1, testValue2);
+    List<Object[]> expected = sql(String.format("SELECT * FROM %s", tableName));
+
+    // migrate table
+    SparkActions.get().migrateTable(tableName).execute();
+
+    // check migrated table is returning expected result
+    List<Object[]> results = sql("SELECT * FROM %s", tableName);
+    Assert.assertTrue(results.size() > 0);
+    assertEquals("Output must match", expected, results);
+  }
+
+  private void structOfThreeLevelLists(boolean useLegacyMode) throws Exception {
+    spark.conf().set("spark.sql.parquet.writeLegacyFormat", useLegacyMode);
+
+    String tableName = sourceName(String.format("structOfThreeLevelLists_%s", useLegacyMode));
+    File location = temp.newFolder();
+    sql("CREATE TABLE %s (col1 STRUCT<col2 ARRAY<STRUCT<col3 INT>>>)" +
+        " STORED AS parquet" +
+        " LOCATION '%s'", tableName, location);
+
+    int testValue1 = 12345;
+    sql("INSERT INTO %s VALUES (STRUCT(STRUCT(ARRAY(STRUCT(%s)))))",
+        tableName, testValue1);
+    List<Object[]> expected = sql(String.format("SELECT * FROM %s", tableName));
+
+    // migrate table
+    SparkActions.get().migrateTable(tableName).execute();
+
+    // check migrated table is returning expected result
+    List<Object[]> results = sql("SELECT * FROM %s", tableName);
+    Assert.assertTrue(results.size() > 0);
+    assertEquals("Output must match", expected, results);
+  }
+
   private SparkTable loadTable(String name) throws NoSuchTableException, ParseException {
     return (SparkTable) catalog.loadTable(Spark3Util.catalogAndIdentifier(spark, name).identifier());
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestCreateActions.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestCreateActions.java
index 0b60d1a2b613..1c3828e7daa7 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestCreateActions.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestCreateActions.java
@@ -136,6 +136,7 @@ public void before() {
     spark.conf().set("hive.exec.dynamic.partition", "true");
     spark.conf().set("hive.exec.dynamic.partition.mode", "nonstrict");
+    spark.conf().set("spark.sql.parquet.writeLegacyFormat", false);
     spark.sql(String.format("DROP TABLE IF EXISTS %s", baseTableName));
     List<SimpleRecord> expected = Lists.newArrayList(
@@ -570,6 +571,137 @@ public void schemaEvolutionTestWithSparkSQL() throws Exception {
     assertEquals("Output must match", expectedAfterAddColumn, afterMigarteAfterAddResults);
   }
 
+  @Test
+  public void testHiveStyleThreeLevelList() throws Exception {
+    threeLevelList(true);
+  }
+
+  @Test
+  public void testThreeLevelList() throws Exception {
+    threeLevelList(false);
+  }
+
+  @Test
+  public void testHiveStyleThreeLevelListWithNestedStruct() throws Exception {
+    threeLevelListWithNestedStruct(true);
+  }
+
+  @Test
+  public void testThreeLevelListWithNestedStruct() throws Exception {
+    threeLevelListWithNestedStruct(false);
+  }
+
+  @Test
+  public void testHiveStyleThreeLevelLists() throws Exception {
+    threeLevelLists(true);
+  }
+
+  @Test
+  public void testThreeLevelLists() throws Exception {
+    threeLevelLists(false);
+  }
+
+  @Test
+  public void testHiveStyleStructOfThreeLevelLists() throws Exception {
+    structOfThreeLevelLists(true);
+  }
+
+  @Test
+  public void testStructOfThreeLevelLists() throws Exception {
+    structOfThreeLevelLists(false);
+  }
+
+  private void threeLevelList(boolean useLegacyMode) throws Exception {
+    spark.conf().set("spark.sql.parquet.writeLegacyFormat", useLegacyMode);
+
+    String tableName = sourceName(String.format("threeLevelList_%s", useLegacyMode));
+    File location = temp.newFolder();
+    sql("CREATE TABLE %s (col1 ARRAY<STRUCT<col2 INT>>)" +
+        " STORED AS parquet" +
+        " LOCATION '%s'", tableName, location);
+
+    int testValue = 12345;
+    sql("INSERT INTO %s VALUES (ARRAY(STRUCT(%s)))", tableName, testValue);
+    List<Object[]> expected = sql(String.format("SELECT * FROM %s", tableName));
+
+    // migrate table
+    SparkActions.get().migrateTable(tableName).execute();
+
+    // check migrated table is returning expected result
+    List<Object[]> results = sql("SELECT * FROM %s", tableName);
+    Assert.assertTrue(results.size() > 0);
+    assertEquals("Output must match", expected, results);
+  }
+
+  private void threeLevelListWithNestedStruct(boolean useLegacyMode) throws Exception {
+    spark.conf().set("spark.sql.parquet.writeLegacyFormat", useLegacyMode);
+
+    String tableName = sourceName(String.format("threeLevelListWithNestedStruct_%s", useLegacyMode));
+    File location = temp.newFolder();
+    sql("CREATE TABLE %s (col1 ARRAY<STRUCT<col2 STRUCT<col3 INT>>>)" +
+        " STORED AS parquet" +
+        " LOCATION '%s'", tableName, location);
+
+    int testValue = 12345;
+    sql("INSERT INTO %s VALUES (ARRAY(STRUCT(STRUCT(%s))))", tableName, testValue);
+    List<Object[]> expected = sql(String.format("SELECT * FROM %s", tableName));
+
+    // migrate table
+    SparkActions.get().migrateTable(tableName).execute();
+
+    // check migrated table is returning expected result
+    List<Object[]> results = sql("SELECT * FROM %s", tableName);
+    Assert.assertTrue(results.size() > 0);
+    assertEquals("Output must match", expected, results);
+  }
+
+  private void threeLevelLists(boolean useLegacyMode) throws Exception {
+    spark.conf().set("spark.sql.parquet.writeLegacyFormat", useLegacyMode);
+
+    String tableName = sourceName(String.format("threeLevelLists_%s", useLegacyMode));
+    File location = temp.newFolder();
+    sql("CREATE TABLE %s (col1 ARRAY<STRUCT<col2 INT>>, col3 ARRAY<STRUCT<col4 INT>>)" +
+        " STORED AS parquet" +
+        " LOCATION '%s'", tableName, location);
+
+    int testValue1 = 12345;
+    int testValue2 = 987654;
+    sql("INSERT INTO %s VALUES (ARRAY(STRUCT(%s)), ARRAY(STRUCT(%s)))",
+        tableName, testValue1, testValue2);
+    List<Object[]> expected = sql(String.format("SELECT * FROM %s", tableName));
+
+    // migrate table
+    SparkActions.get().migrateTable(tableName).execute();
+
+    // check migrated table is returning expected result
+    List<Object[]> results = sql("SELECT * FROM %s", tableName);
+    Assert.assertTrue(results.size() > 0);
+    assertEquals("Output must match", expected, results);
+  }
+
+  private void structOfThreeLevelLists(boolean useLegacyMode) throws Exception {
+    spark.conf().set("spark.sql.parquet.writeLegacyFormat", useLegacyMode);
+
+    String tableName = sourceName(String.format("structOfThreeLevelLists_%s", useLegacyMode));
+    File location = temp.newFolder();
+    sql("CREATE TABLE %s (col1 STRUCT<col2 ARRAY<STRUCT<col3 INT>>>)" +
+        " STORED AS parquet" +
+        " LOCATION '%s'", tableName, location);
+
+    int testValue1 = 12345;
+    sql("INSERT INTO %s VALUES (STRUCT(STRUCT(ARRAY(STRUCT(%s)))))",
+        tableName, testValue1);
+    List<Object[]> expected = sql(String.format("SELECT * FROM %s", tableName));
+
+    // migrate table
+    SparkActions.get().migrateTable(tableName).execute();
+
+    // check migrated table is returning expected result
+    List<Object[]> results = sql("SELECT * FROM %s", tableName);
+    Assert.assertTrue(results.size() > 0);
+    assertEquals("Output must match", expected, results);
+  }
+
   private SparkTable loadTable(String name) throws NoSuchTableException, ParseException {
     return (SparkTable) catalog.loadTable(Spark3Util.catalogAndIdentifier(spark, name).identifier());