Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ public void before() {

spark.conf().set("hive.exec.dynamic.partition", "true");
spark.conf().set("hive.exec.dynamic.partition.mode", "nonstrict");
spark.conf().set("spark.sql.parquet.writeLegacyFormat", false);
spark.sql(String.format("DROP TABLE IF EXISTS %s", baseTableName));

List<SimpleRecord> expected = Lists.newArrayList(
Expand Down Expand Up @@ -570,6 +571,137 @@ public void schemaEvolutionTestWithSparkSQL() throws Exception {
assertEquals("Output must match", expectedAfterAddColumn, afterMigarteAfterAddResults);
}

/**
 * Runs the single list-of-struct column migration check with
 * {@code spark.sql.parquet.writeLegacyFormat} set to {@code true}.
 */
@Test
public void testHiveStyleThreeLevelList() throws Exception {
  final boolean legacyFormat = true;
  threeLevelList(legacyFormat);
}

/**
 * Runs the single list-of-struct column migration check with
 * {@code spark.sql.parquet.writeLegacyFormat} set to {@code false}.
 */
@Test
public void testThreeLevelList() throws Exception {
  final boolean legacyFormat = false;
  threeLevelList(legacyFormat);
}

/**
 * Runs the list-of-struct-with-nested-struct migration check with
 * {@code spark.sql.parquet.writeLegacyFormat} set to {@code true}.
 */
@Test
public void testHiveStyleThreeLevelListWithNestedStruct() throws Exception {
  final boolean legacyFormat = true;
  threeLevelListWithNestedStruct(legacyFormat);
}

/**
 * Runs the list-of-struct-with-nested-struct migration check with
 * {@code spark.sql.parquet.writeLegacyFormat} set to {@code false}.
 */
@Test
public void testThreeLevelListWithNestedStruct() throws Exception {
  final boolean legacyFormat = false;
  threeLevelListWithNestedStruct(legacyFormat);
}

/**
 * Runs the two list-of-struct columns migration check with
 * {@code spark.sql.parquet.writeLegacyFormat} set to {@code true}.
 */
@Test
public void testHiveStyleThreeLevelLists() throws Exception {
  final boolean legacyFormat = true;
  threeLevelLists(legacyFormat);
}

/**
 * Runs the two list-of-struct columns migration check with
 * {@code spark.sql.parquet.writeLegacyFormat} set to {@code false}.
 */
@Test
public void testThreeLevelLists() throws Exception {
  final boolean legacyFormat = false;
  threeLevelLists(legacyFormat);
}

/**
 * Runs the struct-containing-a-list-of-struct migration check with
 * {@code spark.sql.parquet.writeLegacyFormat} set to {@code true}.
 */
@Test
public void testHiveStyleStructOfThreeLevelLists() throws Exception {
  final boolean legacyFormat = true;
  structOfThreeLevelLists(legacyFormat);
}

/**
 * Runs the struct-containing-a-list-of-struct migration check with
 * {@code spark.sql.parquet.writeLegacyFormat} set to {@code false}.
 */
@Test
public void testStructOfThreeLevelLists() throws Exception {
  final boolean legacyFormat = false;
  structOfThreeLevelLists(legacyFormat);
}

/**
 * Creates a parquet-backed table with one {@code ARRAY<STRUCT<col2: INT>>} column, inserts a single
 * row, migrates the table, and checks that a full scan returns the same rows as before the migration.
 *
 * @param useLegacyMode value assigned to {@code spark.sql.parquet.writeLegacyFormat} before any data
 *     is written; also used as a suffix of the table name
 * @throws Exception propagated from folder creation, SQL execution, or the migrate action
 */
public void threeLevelList(boolean useLegacyMode) throws Exception {
  spark.conf().set("spark.sql.parquet.writeLegacyFormat", useLegacyMode);

  String tableName = sourceName(String.format("threeLevelList_%s", useLegacyMode));
  File location = temp.newFolder();
  sql("CREATE TABLE %s (col1 ARRAY<STRUCT<col2: INT>>)" +
      " STORED AS parquet" +
      " LOCATION '%s'", tableName, location);

  int testValue = 12345;
  sql("INSERT INTO %s VALUES (ARRAY(STRUCT(%s)))", tableName, testValue);

  // capture the pre-migration rows; same query form as the post-migration read below,
  // so the statement text is formatted exactly once
  List<Object[]> expected = sql("SELECT * FROM %s", tableName);

  // migrate table
  SparkActions.get().migrateTable(tableName).execute();

  // check migrated table is returning expected result
  List<Object[]> results = sql("SELECT * FROM %s", tableName);
  Assert.assertFalse("Migrated table must return at least one row", results.isEmpty());
  assertEquals("Output must match", expected, results);
}

/**
 * Creates a parquet-backed table with one {@code ARRAY<STRUCT<col2: STRUCT<col3: INT>>>} column,
 * inserts a single row, migrates the table, and checks that a full scan returns the same rows as
 * before the migration.
 *
 * @param useLegacyMode value assigned to {@code spark.sql.parquet.writeLegacyFormat} before any data
 *     is written; also used as a suffix of the table name
 * @throws Exception propagated from folder creation, SQL execution, or the migrate action
 */
private void threeLevelListWithNestedStruct(boolean useLegacyMode) throws Exception {
  spark.conf().set("spark.sql.parquet.writeLegacyFormat", useLegacyMode);

  String tableName = sourceName(String.format("threeLevelListWithNestedStruct_%s", useLegacyMode));
  File location = temp.newFolder();
  sql("CREATE TABLE %s (col1 ARRAY<STRUCT<col2: STRUCT<col3: INT>>>)" +
      " STORED AS parquet" +
      " LOCATION '%s'", tableName, location);

  int testValue = 12345;
  sql("INSERT INTO %s VALUES (ARRAY(STRUCT(STRUCT(%s))))", tableName, testValue);

  // capture the pre-migration rows; same query form as the post-migration read below,
  // so the statement text is formatted exactly once
  List<Object[]> expected = sql("SELECT * FROM %s", tableName);

  // migrate table
  SparkActions.get().migrateTable(tableName).execute();

  // check migrated table is returning expected result
  List<Object[]> results = sql("SELECT * FROM %s", tableName);
  Assert.assertFalse("Migrated table must return at least one row", results.isEmpty());
  assertEquals("Output must match", expected, results);
}

/**
 * Creates a parquet-backed table with two {@code ARRAY<STRUCT<...: INT>>} columns, inserts a single
 * row, migrates the table, and checks that a full scan returns the same rows as before the migration.
 *
 * @param useLegacyMode value assigned to {@code spark.sql.parquet.writeLegacyFormat} before any data
 *     is written; also used as a suffix of the table name
 * @throws Exception propagated from folder creation, SQL execution, or the migrate action
 */
private void threeLevelLists(boolean useLegacyMode) throws Exception {
  spark.conf().set("spark.sql.parquet.writeLegacyFormat", useLegacyMode);

  String tableName = sourceName(String.format("threeLevelLists_%s", useLegacyMode));
  File location = temp.newFolder();
  sql("CREATE TABLE %s (col1 ARRAY<STRUCT<col2: INT>>, col3 ARRAY<STRUCT<col4: INT>>)" +
      " STORED AS parquet" +
      " LOCATION '%s'", tableName, location);

  int testValue1 = 12345;
  int testValue2 = 987654;
  sql("INSERT INTO %s VALUES (ARRAY(STRUCT(%s)), ARRAY(STRUCT(%s)))",
      tableName, testValue1, testValue2);

  // capture the pre-migration rows; same query form as the post-migration read below,
  // so the statement text is formatted exactly once
  List<Object[]> expected = sql("SELECT * FROM %s", tableName);

  // migrate table
  SparkActions.get().migrateTable(tableName).execute();

  // check migrated table is returning expected result
  List<Object[]> results = sql("SELECT * FROM %s", tableName);
  Assert.assertFalse("Migrated table must return at least one row", results.isEmpty());
  assertEquals("Output must match", expected, results);
}

/**
 * Creates a parquet-backed table with one {@code STRUCT<col2: ARRAY<STRUCT<col3: INT>>>} column,
 * inserts a single row, migrates the table, and checks that a full scan returns the same rows as
 * before the migration.
 *
 * @param useLegacyMode value assigned to {@code spark.sql.parquet.writeLegacyFormat} before any data
 *     is written; also used as a suffix of the table name
 * @throws Exception propagated from folder creation, SQL execution, or the migrate action
 */
private void structOfThreeLevelLists(boolean useLegacyMode) throws Exception {
  spark.conf().set("spark.sql.parquet.writeLegacyFormat", useLegacyMode);

  String tableName = sourceName(String.format("structOfThreeLevelLists_%s", useLegacyMode));
  File location = temp.newFolder();
  sql("CREATE TABLE %s (col1 STRUCT<col2: ARRAY<STRUCT<col3: INT>>>)" +
      " STORED AS parquet" +
      " LOCATION '%s'", tableName, location);

  int testValue1 = 12345;
  // NOTE(review): the literal below wraps the ARRAY in two STRUCT(...) calls, while the column type
  // declares a single STRUCT around the array - confirm this extra nesting is intended.
  sql("INSERT INTO %s VALUES (STRUCT(STRUCT(ARRAY(STRUCT(%s)))))",
      tableName, testValue1);

  // capture the pre-migration rows; same query form as the post-migration read below,
  // so the statement text is formatted exactly once
  List<Object[]> expected = sql("SELECT * FROM %s", tableName);

  // migrate table
  SparkActions.get().migrateTable(tableName).execute();

  // check migrated table is returning expected result
  List<Object[]> results = sql("SELECT * FROM %s", tableName);
  Assert.assertFalse("Migrated table must return at least one row", results.isEmpty());
  assertEquals("Output must match", expected, results);
}


private SparkTable loadTable(String name) throws NoSuchTableException, ParseException {
return (SparkTable) catalog.loadTable(Spark3Util.catalogAndIdentifier(spark, name).identifier());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ public void before() {

spark.conf().set("hive.exec.dynamic.partition", "true");
spark.conf().set("hive.exec.dynamic.partition.mode", "nonstrict");
spark.conf().set("spark.sql.parquet.writeLegacyFormat", false);
spark.sql(String.format("DROP TABLE IF EXISTS %s", baseTableName));

List<SimpleRecord> expected = Lists.newArrayList(
Expand Down Expand Up @@ -570,6 +571,137 @@ public void schemaEvolutionTestWithSparkSQL() throws Exception {
assertEquals("Output must match", expectedAfterAddColumn, afterMigarteAfterAddResults);
}

/**
 * Runs the single list-of-struct column migration check with
 * {@code spark.sql.parquet.writeLegacyFormat} set to {@code true}.
 */
@Test
public void testHiveStyleThreeLevelList() throws Exception {
  final boolean legacyFormat = true;
  threeLevelList(legacyFormat);
}

/**
 * Runs the single list-of-struct column migration check with
 * {@code spark.sql.parquet.writeLegacyFormat} set to {@code false}.
 */
@Test
public void testThreeLevelList() throws Exception {
  final boolean legacyFormat = false;
  threeLevelList(legacyFormat);
}

/**
 * Runs the list-of-struct-with-nested-struct migration check with
 * {@code spark.sql.parquet.writeLegacyFormat} set to {@code true}.
 */
@Test
public void testHiveStyleThreeLevelListWithNestedStruct() throws Exception {
  final boolean legacyFormat = true;
  threeLevelListWithNestedStruct(legacyFormat);
}

/**
 * Runs the list-of-struct-with-nested-struct migration check with
 * {@code spark.sql.parquet.writeLegacyFormat} set to {@code false}.
 */
@Test
public void testThreeLevelListWithNestedStruct() throws Exception {
  final boolean legacyFormat = false;
  threeLevelListWithNestedStruct(legacyFormat);
}

/**
 * Runs the two list-of-struct columns migration check with
 * {@code spark.sql.parquet.writeLegacyFormat} set to {@code true}.
 */
@Test
public void testHiveStyleThreeLevelLists() throws Exception {
  final boolean legacyFormat = true;
  threeLevelLists(legacyFormat);
}

/**
 * Runs the two list-of-struct columns migration check with
 * {@code spark.sql.parquet.writeLegacyFormat} set to {@code false}.
 */
@Test
public void testThreeLevelLists() throws Exception {
  final boolean legacyFormat = false;
  threeLevelLists(legacyFormat);
}

/**
 * Runs the struct-containing-a-list-of-struct migration check with
 * {@code spark.sql.parquet.writeLegacyFormat} set to {@code true}.
 */
@Test
public void testHiveStyleStructOfThreeLevelLists() throws Exception {
  final boolean legacyFormat = true;
  structOfThreeLevelLists(legacyFormat);
}

/**
 * Runs the struct-containing-a-list-of-struct migration check with
 * {@code spark.sql.parquet.writeLegacyFormat} set to {@code false}.
 */
@Test
public void testStructOfThreeLevelLists() throws Exception {
  final boolean legacyFormat = false;
  structOfThreeLevelLists(legacyFormat);
}

/**
 * Creates a parquet-backed table with one {@code ARRAY<STRUCT<col2: INT>>} column, inserts a single
 * row, migrates the table, and checks that a full scan returns the same rows as before the migration.
 *
 * @param useLegacyMode value assigned to {@code spark.sql.parquet.writeLegacyFormat} before any data
 *     is written; also used as a suffix of the table name
 * @throws Exception propagated from folder creation, SQL execution, or the migrate action
 */
public void threeLevelList(boolean useLegacyMode) throws Exception {
  spark.conf().set("spark.sql.parquet.writeLegacyFormat", useLegacyMode);

  String tableName = sourceName(String.format("threeLevelList_%s", useLegacyMode));
  File location = temp.newFolder();
  sql("CREATE TABLE %s (col1 ARRAY<STRUCT<col2: INT>>)" +
      " STORED AS parquet" +
      " LOCATION '%s'", tableName, location);

  int testValue = 12345;
  sql("INSERT INTO %s VALUES (ARRAY(STRUCT(%s)))", tableName, testValue);

  // capture the pre-migration rows; same query form as the post-migration read below,
  // so the statement text is formatted exactly once
  List<Object[]> expected = sql("SELECT * FROM %s", tableName);

  // migrate table
  SparkActions.get().migrateTable(tableName).execute();

  // check migrated table is returning expected result
  List<Object[]> results = sql("SELECT * FROM %s", tableName);
  Assert.assertFalse("Migrated table must return at least one row", results.isEmpty());
  assertEquals("Output must match", expected, results);
}

/**
 * Creates a parquet-backed table with one {@code ARRAY<STRUCT<col2: STRUCT<col3: INT>>>} column,
 * inserts a single row, migrates the table, and checks that a full scan returns the same rows as
 * before the migration.
 *
 * @param useLegacyMode value assigned to {@code spark.sql.parquet.writeLegacyFormat} before any data
 *     is written; also used as a suffix of the table name
 * @throws Exception propagated from folder creation, SQL execution, or the migrate action
 */
private void threeLevelListWithNestedStruct(boolean useLegacyMode) throws Exception {
  spark.conf().set("spark.sql.parquet.writeLegacyFormat", useLegacyMode);

  String tableName = sourceName(String.format("threeLevelListWithNestedStruct_%s", useLegacyMode));
  File location = temp.newFolder();
  sql("CREATE TABLE %s (col1 ARRAY<STRUCT<col2: STRUCT<col3: INT>>>)" +
      " STORED AS parquet" +
      " LOCATION '%s'", tableName, location);

  int testValue = 12345;
  sql("INSERT INTO %s VALUES (ARRAY(STRUCT(STRUCT(%s))))", tableName, testValue);

  // capture the pre-migration rows; same query form as the post-migration read below,
  // so the statement text is formatted exactly once
  List<Object[]> expected = sql("SELECT * FROM %s", tableName);

  // migrate table
  SparkActions.get().migrateTable(tableName).execute();

  // check migrated table is returning expected result
  List<Object[]> results = sql("SELECT * FROM %s", tableName);
  Assert.assertFalse("Migrated table must return at least one row", results.isEmpty());
  assertEquals("Output must match", expected, results);
}

/**
 * Creates a parquet-backed table with two {@code ARRAY<STRUCT<...: INT>>} columns, inserts a single
 * row, migrates the table, and checks that a full scan returns the same rows as before the migration.
 *
 * @param useLegacyMode value assigned to {@code spark.sql.parquet.writeLegacyFormat} before any data
 *     is written; also used as a suffix of the table name
 * @throws Exception propagated from folder creation, SQL execution, or the migrate action
 */
private void threeLevelLists(boolean useLegacyMode) throws Exception {
  spark.conf().set("spark.sql.parquet.writeLegacyFormat", useLegacyMode);

  String tableName = sourceName(String.format("threeLevelLists_%s", useLegacyMode));
  File location = temp.newFolder();
  sql("CREATE TABLE %s (col1 ARRAY<STRUCT<col2: INT>>, col3 ARRAY<STRUCT<col4: INT>>)" +
      " STORED AS parquet" +
      " LOCATION '%s'", tableName, location);

  int testValue1 = 12345;
  int testValue2 = 987654;
  sql("INSERT INTO %s VALUES (ARRAY(STRUCT(%s)), ARRAY(STRUCT(%s)))",
      tableName, testValue1, testValue2);

  // capture the pre-migration rows; same query form as the post-migration read below,
  // so the statement text is formatted exactly once
  List<Object[]> expected = sql("SELECT * FROM %s", tableName);

  // migrate table
  SparkActions.get().migrateTable(tableName).execute();

  // check migrated table is returning expected result
  List<Object[]> results = sql("SELECT * FROM %s", tableName);
  Assert.assertFalse("Migrated table must return at least one row", results.isEmpty());
  assertEquals("Output must match", expected, results);
}

/**
 * Creates a parquet-backed table with one {@code STRUCT<col2: ARRAY<STRUCT<col3: INT>>>} column,
 * inserts a single row, migrates the table, and checks that a full scan returns the same rows as
 * before the migration.
 *
 * @param useLegacyMode value assigned to {@code spark.sql.parquet.writeLegacyFormat} before any data
 *     is written; also used as a suffix of the table name
 * @throws Exception propagated from folder creation, SQL execution, or the migrate action
 */
private void structOfThreeLevelLists(boolean useLegacyMode) throws Exception {
  spark.conf().set("spark.sql.parquet.writeLegacyFormat", useLegacyMode);

  String tableName = sourceName(String.format("structOfThreeLevelLists_%s", useLegacyMode));
  File location = temp.newFolder();
  sql("CREATE TABLE %s (col1 STRUCT<col2: ARRAY<STRUCT<col3: INT>>>)" +
      " STORED AS parquet" +
      " LOCATION '%s'", tableName, location);

  int testValue1 = 12345;
  // NOTE(review): the literal below wraps the ARRAY in two STRUCT(...) calls, while the column type
  // declares a single STRUCT around the array - confirm this extra nesting is intended.
  sql("INSERT INTO %s VALUES (STRUCT(STRUCT(ARRAY(STRUCT(%s)))))",
      tableName, testValue1);

  // capture the pre-migration rows; same query form as the post-migration read below,
  // so the statement text is formatted exactly once
  List<Object[]> expected = sql("SELECT * FROM %s", tableName);

  // migrate table
  SparkActions.get().migrateTable(tableName).execute();

  // check migrated table is returning expected result
  List<Object[]> results = sql("SELECT * FROM %s", tableName);
  Assert.assertFalse("Migrated table must return at least one row", results.isEmpty());
  assertEquals("Output must match", expected, results);
}


private SparkTable loadTable(String name) throws NoSuchTableException, ParseException {
return (SparkTable) catalog.loadTable(Spark3Util.catalogAndIdentifier(spark, name).identifier());
Expand Down