Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ private record SerdeAndInputFormat(String serde, String inputFormat) {}
.putAll(Arrays.stream(values()).collect(
toMap(format -> new SerdeAndInputFormat(format.getSerde(), format.getInputFormat()), identity())))
.put(new SerdeAndInputFormat(PARQUET_HIVE_SERDE_CLASS, "parquet.hive.DeprecatedParquetInputFormat"), PARQUET)
.put(new SerdeAndInputFormat(PARQUET_HIVE_SERDE_CLASS, "org.apache.hadoop.mapred.TextInputFormat"), PARQUET)
.put(new SerdeAndInputFormat(PARQUET_HIVE_SERDE_CLASS, "parquet.hive.MapredParquetInputFormat"), PARQUET)
.buildOrThrow();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ connector.name=hive
hive.metastore.uri=thrift://hadoop-master:9083
hive.config.resources=/docker/presto-product-tests/conf/presto/etc/hive-default-fs-site.xml
hive.allow-drop-table=true
hive.non-managed-table-writes-enabled=true

# Note: it is currently unclear why this setting is required while hive.orc.time-zone=UTC is not.
hive.parquet.time-zone=UTC
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,20 @@
package io.trino.tests.product.hive;

import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import io.trino.tempto.ProductTest;
import io.trino.tempto.hadoop.hdfs.HdfsClient;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.io.IOException;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.tempto.assertions.QueryAssert.Row;
import static io.trino.tempto.assertions.QueryAssert.Row.row;
import static io.trino.tempto.assertions.QueryAssert.assertQueryFailure;
Expand All @@ -45,6 +50,9 @@ public class TestHiveSparkCompatibility
// see spark-defaults.conf
private static final String TRINO_CATALOG = "hive";

@Inject
private HdfsClient hdfsClient;

@Test(groups = {HIVE_SPARK, PROFILE_SPECIFIC_TESTS}, dataProvider = "testReadSparkCreatedTableDataProvider")
public void testReadSparkCreatedTable(String sparkTableFormat, String expectedTrinoTableFormat)
{
Expand Down Expand Up @@ -353,7 +361,7 @@ public void testDeleteFailsOnBucketedTableCreatedBySpark()
onSpark().executeQuery("DROP TABLE " + hiveTableName);
}

private static final String[] HIVE_TIMESTAMP_PRECISIONS = new String[]{"MILLISECONDS", "MICROSECONDS", "NANOSECONDS"};
private static final String[] HIVE_TIMESTAMP_PRECISIONS = new String[] {"MILLISECONDS", "MICROSECONDS", "NANOSECONDS"};

@DataProvider
public static Object[][] sparkParquetTimestampFormats()
Expand All @@ -364,10 +372,10 @@ public static Object[][] sparkParquetTimestampFormats()

// Ordering of expected values matches the ordering in HIVE_TIMESTAMP_PRECISIONS
return new Object[][] {
{"TIMESTAMP_MILLIS", millisTimestamp, new String[]{millisTimestamp, millisTimestamp, millisTimestamp}},
{"TIMESTAMP_MICROS", microsTimestamp, new String[]{millisTimestamp, microsTimestamp, microsTimestamp}},
{"TIMESTAMP_MILLIS", millisTimestamp, new String[] {millisTimestamp, millisTimestamp, millisTimestamp}},
{"TIMESTAMP_MICROS", microsTimestamp, new String[] {millisTimestamp, microsTimestamp, microsTimestamp}},
// note that Spark timestamp has microsecond precision
{"INT96", nanosTimestamp, new String[]{millisTimestamp, microsTimestamp, microsTimestamp}},
{"INT96", nanosTimestamp, new String[] {millisTimestamp, microsTimestamp, microsTimestamp}},
};
}

Expand Down Expand Up @@ -567,6 +575,32 @@ public void testReadSparkdDateAndTimePartitionName()
onTrino().executeQuery("DROP TABLE " + trinoTableName);
}

@Test(groups = {HIVE_SPARK, PROFILE_SPECIFIC_TESTS})
public void testTextInputFormatWithParquetHiveSerDe()
        throws IOException
{
    // Table declared with the Parquet SerDe but a TextInputFormat; exercises the
    // serde/input-format combination that Spark produces for such tables.
    String tableName = "test_text_input_format_with_parquet_hive_ser_de" + randomNameSuffix();
    String tableLocation = "/tmp/" + tableName;
    onHive().executeQuery("" +
            "CREATE EXTERNAL TABLE " + tableName +
            "(col INT) " +
            "ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' " +
            "STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' " +
            "OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' " +
            "LOCATION '" + tableLocation + "'");
    onSpark().executeQuery("INSERT INTO " + tableName + " VALUES(1)");

    // Both Spark and Trino read the row back correctly.
    assertThat(onSpark().executeQuery("SELECT * FROM " + tableName)).containsOnly(row(1));
    assertThat(onTrino().executeQuery("SELECT * FROM " + tableName)).containsOnly(row(1));

    // Exactly two entries in the table directory: the data file plus the _SUCCESS
    // marker; the data file written by Spark is Parquet despite the TextInputFormat.
    List<String> files = hdfsClient.listDirectory(tableLocation + "/");
    assertThat(files).hasSize(2);
    String dataFile = files.stream()
            .filter(fileName -> !fileName.contains("SUCCESS"))
            .collect(toImmutableList())
            .get(0);
    assertThat(dataFile).endsWith("parquet");

    // Hive returns two all-NULL rows here — presumably it reads the Parquet bytes
    // through TextInputFormat and fails to parse them; verify against Hive behavior.
    List<Object> nullRow = new ArrayList<>();
    nullRow.add(null);
    assertThat(onHive().executeQuery("SELECT * FROM " + tableName)).containsOnly(new Row(nullRow), new Row(nullRow));

    // Writes are rejected by both engines for this format combination.
    assertQueryFailure(() -> onTrino().executeQuery("INSERT INTO " + tableName + " VALUES(2)")).hasMessageFindingMatch("Output format org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat with SerDe org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe is not supported");
    assertQueryFailure(() -> onHive().executeQuery("INSERT INTO " + tableName + " VALUES(2)")).hasMessageFindingMatch(".*Error while processing statement.*");

    onHive().executeQuery("DROP TABLE " + tableName);
}

@Test(groups = {HIVE_SPARK, PROFILE_SPECIFIC_TESTS}, dataProvider = "unsupportedPartitionDates")
public void testReadSparkInvalidDatePartitionName(String inputDate, java.sql.Date outputDate)
{
Expand Down