diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java index d315f4200521..31a1da4551ae 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java @@ -268,6 +268,7 @@ import static io.trino.plugin.hive.util.HiveUtil.isDeltaLakeTable; import static io.trino.plugin.hive.util.HiveUtil.isHiveSystemSchema; import static io.trino.plugin.hive.util.HiveUtil.isIcebergTable; +import static io.trino.plugin.hive.util.HiveUtil.isSparkBucketedTable; import static io.trino.plugin.hive.util.HiveUtil.toPartitionValues; import static io.trino.plugin.hive.util.HiveUtil.verifyPartitionTypeSupported; import static io.trino.plugin.hive.util.HiveWriteUtils.checkTableIsWritable; @@ -1624,6 +1625,9 @@ public ConnectorTableHandle beginUpdate(ConnectorSession session, ConnectorTable if (!autoCommit) { throw new TrinoException(NOT_SUPPORTED, "Updating transactional tables is not supported in explicit transactions (use autocommit mode)"); } + if (isSparkBucketedTable(table)) { + throw new TrinoException(NOT_SUPPORTED, "Updating Spark bucketed tables is not supported"); + } // Verify that none of the updated columns are partition columns or bucket columns @@ -1724,6 +1728,10 @@ public HiveInsertTableHandle beginInsert(ConnectorSession session, ConnectorTabl if (isTransactional && !autoCommit) { throw new TrinoException(NOT_SUPPORTED, "Inserting into Hive transactional tables is not supported in explicit transactions (use autocommit mode)"); } + if (isSparkBucketedTable(table)) { + throw new TrinoException(NOT_SUPPORTED, "Inserting into Spark bucketed tables is not supported"); + } + List handles = hiveColumnHandles(table, typeManager, getTimestampPrecision(session)).stream() .filter(columnHandle -> !columnHandle.isHidden()) .collect(toImmutableList()); @@ -2429,6 +2437,9 @@ public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTable if (!autoCommit) { throw new TrinoException(NOT_SUPPORTED, "Deleting from Hive transactional tables is not supported in explicit transactions (use autocommit mode)"); } + if (isSparkBucketedTable(table)) { + throw new TrinoException(NOT_SUPPORTED, "Deleting from Spark bucketed tables is not supported"); + } LocationHandle locationHandle = locationService.forExistingTable(metastore, session, table); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveUtil.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveUtil.java index 9486f51f3d8d..1276231d5763 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveUtil.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveUtil.java @@ -177,6 +177,7 @@ public final class HiveUtil { public static final String SPARK_TABLE_PROVIDER_KEY = "spark.sql.sources.provider"; public static final String DELTA_LAKE_PROVIDER = "delta"; + public static final String SPARK_TABLE_BUCKET_NUMBER_KEY = "spark.sql.sources.schema.numBuckets"; public static final String ICEBERG_TABLE_TYPE_NAME = "table_type"; public static final String ICEBERG_TABLE_TYPE_VALUE = "iceberg"; @@ -1128,4 +1129,10 @@ public static boolean isIcebergTable(Table table) { return ICEBERG_TABLE_TYPE_VALUE.equalsIgnoreCase(table.getParameters().get(ICEBERG_TABLE_TYPE_NAME)); } + + public static boolean isSparkBucketedTable(Table table) + { + return table.getParameters().containsKey(SPARK_TABLE_PROVIDER_KEY) + && table.getParameters().containsKey(SPARK_TABLE_BUCKET_NUMBER_KEY); + } } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveSparkCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveSparkCompatibility.java index 410e7405803b..6c32d34ab2cb 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveSparkCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveSparkCompatibility.java @@ -213,6 +213,54 @@ public void testSparkParquetTimestampCompatibility(String sparkTimestampFormat, onSpark().executeQuery("DROP TABLE " + sparkTableName); } + @Test(groups = {HIVE_SPARK, PROFILE_SPECIFIC_TESTS}) + public void testInsertFailsOnBucketedTableCreatedBySpark() + { + String hiveTableName = "spark_insert_bucketed_table_" + randomTableSuffix(); + + onSpark().executeQuery( + "CREATE TABLE default." + hiveTableName + "(a_key integer, a_value integer) " + + "USING PARQUET " + + "CLUSTERED BY (a_key) INTO 3 BUCKETS"); + + assertQueryFailure(() -> onTrino().executeQuery("INSERT INTO default." + hiveTableName + " VALUES (1, 100)")) + .hasMessageContaining("Inserting into Spark bucketed tables is not supported"); + + onSpark().executeQuery("DROP TABLE " + hiveTableName); + } + + @Test(groups = {HIVE_SPARK, PROFILE_SPECIFIC_TESTS}) + public void testUpdateFailsOnBucketedTableCreatedBySpark() + { + String hiveTableName = "spark_update_bucketed_table_" + randomTableSuffix(); + + onSpark().executeQuery( + "CREATE TABLE default." + hiveTableName + "(a_key integer, a_value integer) " + + "USING ORC " + + "CLUSTERED BY (a_key) INTO 3 BUCKETS"); + + assertQueryFailure(() -> onTrino().executeQuery("UPDATE default." + hiveTableName + " SET a_value = 100 WHERE a_key = 1")) + .hasMessageContaining("Updating Spark bucketed tables is not supported"); + + onSpark().executeQuery("DROP TABLE " + hiveTableName); + } + + @Test(groups = {HIVE_SPARK, PROFILE_SPECIFIC_TESTS}) + public void testDeleteFailsOnBucketedTableCreatedBySpark() + { + String hiveTableName = "spark_delete_bucketed_table_" + randomTableSuffix(); + + onSpark().executeQuery( + "CREATE TABLE default." + hiveTableName + "(a_key integer, a_value integer) " + + "USING ORC " + + "CLUSTERED BY (a_key) INTO 3 BUCKETS"); + + assertQueryFailure(() -> onTrino().executeQuery("DELETE FROM default." + hiveTableName + " WHERE a_key = 1")) + .hasMessageContaining("Deleting from Spark bucketed tables is not supported"); + + onSpark().executeQuery("DROP TABLE " + hiveTableName); + } + private static final String[] HIVE_TIMESTAMP_PRECISIONS = new String[]{"MILLISECONDS", "MICROSECONDS", "NANOSECONDS"}; @DataProvider