diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveStorageFormat.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveStorageFormat.java index 8d6b3d93e491..9be534ff19d8 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveStorageFormat.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveStorageFormat.java @@ -186,6 +186,7 @@ private record SerdeAndInputFormat(String serde, String inputFormat) {} .putAll(Arrays.stream(values()).collect( toMap(format -> new SerdeAndInputFormat(format.getSerde(), format.getInputFormat()), identity()))) .put(new SerdeAndInputFormat(PARQUET_HIVE_SERDE_CLASS, "parquet.hive.DeprecatedParquetInputFormat"), PARQUET) + .put(new SerdeAndInputFormat(PARQUET_HIVE_SERDE_CLASS, "org.apache.hadoop.mapred.TextInputFormat"), PARQUET) .put(new SerdeAndInputFormat(PARQUET_HIVE_SERDE_CLASS, "parquet.hive.MapredParquetInputFormat"), PARQUET) .buildOrThrow(); diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-hive/hive.properties b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-hive/hive.properties index 25bfc1eabf97..6ed7f0f96bea 100644 --- a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-hive/hive.properties +++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-hive/hive.properties @@ -2,6 +2,7 @@ connector.name=hive hive.metastore.uri=thrift://hadoop-master:9083 hive.config.resources=/docker/presto-product-tests/conf/presto/etc/hive-default-fs-site.xml hive.allow-drop-table=true +hive.non-managed-table-writes-enabled=true # Note: it's currently unclear why this one is needed, while also hive.orc.time-zone=UTC is not needed. 
hive.parquet.time-zone=UTC diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveSparkCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveSparkCompatibility.java index 3ad1eb66b8a8..4ef02f32f18b 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveSparkCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveSparkCompatibility.java @@ -14,15 +14,20 @@ package io.trino.tests.product.hive; import com.google.common.collect.ImmutableList; +import com.google.inject.Inject; import io.trino.tempto.ProductTest; +import io.trino.tempto.hadoop.hdfs.HdfsClient; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import java.io.IOException; import java.math.BigDecimal; import java.time.LocalDate; import java.time.LocalDateTime; +import java.util.ArrayList; import java.util.List; +import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.tempto.assertions.QueryAssert.Row; import static io.trino.tempto.assertions.QueryAssert.Row.row; import static io.trino.tempto.assertions.QueryAssert.assertQueryFailure; @@ -45,6 +50,9 @@ public class TestHiveSparkCompatibility // see spark-defaults.conf private static final String TRINO_CATALOG = "hive"; + @Inject + private HdfsClient hdfsClient; + @Test(groups = {HIVE_SPARK, PROFILE_SPECIFIC_TESTS}, dataProvider = "testReadSparkCreatedTableDataProvider") public void testReadSparkCreatedTable(String sparkTableFormat, String expectedTrinoTableFormat) { @@ -353,7 +361,7 @@ public void testDeleteFailsOnBucketedTableCreatedBySpark() onSpark().executeQuery("DROP TABLE " + hiveTableName); } - private static final String[] HIVE_TIMESTAMP_PRECISIONS = new String[]{"MILLISECONDS", "MICROSECONDS", "NANOSECONDS"}; + private static final String[] HIVE_TIMESTAMP_PRECISIONS = new String[] {"MILLISECONDS", "MICROSECONDS", 
"NANOSECONDS"}; @DataProvider public static Object[][] sparkParquetTimestampFormats() @@ -364,10 +372,10 @@ public static Object[][] sparkParquetTimestampFormats() // Ordering of expected values matches the ordering in HIVE_TIMESTAMP_PRECISIONS return new Object[][] { - {"TIMESTAMP_MILLIS", millisTimestamp, new String[]{millisTimestamp, millisTimestamp, millisTimestamp}}, - {"TIMESTAMP_MICROS", microsTimestamp, new String[]{millisTimestamp, microsTimestamp, microsTimestamp}}, + {"TIMESTAMP_MILLIS", millisTimestamp, new String[] {millisTimestamp, millisTimestamp, millisTimestamp}}, + {"TIMESTAMP_MICROS", microsTimestamp, new String[] {millisTimestamp, microsTimestamp, microsTimestamp}}, // note that Spark timestamp has microsecond precision - {"INT96", nanosTimestamp, new String[]{millisTimestamp, microsTimestamp, microsTimestamp}}, + {"INT96", nanosTimestamp, new String[] {millisTimestamp, microsTimestamp, microsTimestamp}}, }; } @@ -567,6 +575,32 @@ public void testReadSparkdDateAndTimePartitionName() onTrino().executeQuery("DROP TABLE " + trinoTableName); } + @Test(groups = {HIVE_SPARK, PROFILE_SPECIFIC_TESTS}) + public void testTextInputFormatWithParquetHiveSerDe() + throws IOException + { + String tableName = "test_text_input_format_with_parquet_hive_ser_de" + randomNameSuffix(); + onHive().executeQuery("" + + "CREATE EXTERNAL TABLE " + tableName + + "(col INT) " + + "ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' " + + "STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' " + + "OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' " + + "LOCATION '/tmp/" + tableName + "'"); + onSpark().executeQuery("INSERT INTO " + tableName + " VALUES(1)"); + assertThat(onSpark().executeQuery("SELECT * FROM " + tableName)).containsOnly(row(1)); + assertThat(onTrino().executeQuery("SELECT * FROM " + tableName)).containsOnly(row(1)); + List files = hdfsClient.listDirectory("/tmp/" + tableName + "/"); + 
assertThat(files).hasSize(2); + assertThat(files.stream().filter(name -> !name.contains("SUCCESS")).collect(toImmutableList()).get(0)).endsWith("parquet"); + List<Object> result = new ArrayList<>(); + result.add(null); + assertThat(onHive().executeQuery("SELECT * FROM " + tableName)).containsOnly(new Row(result), new Row(result)); + assertQueryFailure(() -> onTrino().executeQuery("INSERT INTO " + tableName + " VALUES(2)")).hasMessageFindingMatch("Output format org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat with SerDe org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe is not supported"); + assertQueryFailure(() -> onHive().executeQuery("INSERT INTO " + tableName + " VALUES(2)")).hasMessageFindingMatch(".*Error while processing statement.*"); + onHive().executeQuery("DROP TABLE " + tableName); + } + @Test(groups = {HIVE_SPARK, PROFILE_SPECIFIC_TESTS}, dataProvider = "unsupportedPartitionDates") public void testReadSparkInvalidDatePartitionName(String inputDate, java.sql.Date outputDate) {