Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@
import static io.trino.plugin.hive.util.HiveUtil.isDeltaLakeTable;
import static io.trino.plugin.hive.util.HiveUtil.isHiveSystemSchema;
import static io.trino.plugin.hive.util.HiveUtil.isIcebergTable;
import static io.trino.plugin.hive.util.HiveUtil.isSparkBucketedTable;
import static io.trino.plugin.hive.util.HiveUtil.toPartitionValues;
import static io.trino.plugin.hive.util.HiveUtil.verifyPartitionTypeSupported;
import static io.trino.plugin.hive.util.HiveWriteUtils.checkTableIsWritable;
Expand Down Expand Up @@ -1624,6 +1625,9 @@ public ConnectorTableHandle beginUpdate(ConnectorSession session, ConnectorTable
if (!autoCommit) {
throw new TrinoException(NOT_SUPPORTED, "Updating transactional tables is not supported in explicit transactions (use autocommit mode)");
}
if (isSparkBucketedTable(table)) {
throw new TrinoException(NOT_SUPPORTED, "Updating Spark bucketed tables is not supported");
}

// Verify that none of the updated columns are partition columns or bucket columns

Expand Down Expand Up @@ -1724,6 +1728,10 @@ public HiveInsertTableHandle beginInsert(ConnectorSession session, ConnectorTabl
if (isTransactional && !autoCommit) {
throw new TrinoException(NOT_SUPPORTED, "Inserting into Hive transactional tables is not supported in explicit transactions (use autocommit mode)");
}
if (isSparkBucketedTable(table)) {
throw new TrinoException(NOT_SUPPORTED, "Inserting into Spark bucketed tables is not supported");
}

List<HiveColumnHandle> handles = hiveColumnHandles(table, typeManager, getTimestampPrecision(session)).stream()
.filter(columnHandle -> !columnHandle.isHidden())
.collect(toImmutableList());
Expand Down Expand Up @@ -2429,6 +2437,9 @@ public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTable
if (!autoCommit) {
throw new TrinoException(NOT_SUPPORTED, "Deleting from Hive transactional tables is not supported in explicit transactions (use autocommit mode)");
}
if (isSparkBucketedTable(table)) {
throw new TrinoException(NOT_SUPPORTED, "Deleting from Spark bucketed tables is not supported");
}

LocationHandle locationHandle = locationService.forExistingTable(metastore, session, table);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ public final class HiveUtil
{
public static final String SPARK_TABLE_PROVIDER_KEY = "spark.sql.sources.provider";
public static final String DELTA_LAKE_PROVIDER = "delta";
public static final String SPARK_TABLE_BUCKET_NUMBER_KEY = "spark.sql.sources.schema.numBuckets";

public static final String ICEBERG_TABLE_TYPE_NAME = "table_type";
public static final String ICEBERG_TABLE_TYPE_VALUE = "iceberg";
Expand Down Expand Up @@ -1128,4 +1129,10 @@ public static boolean isIcebergTable(Table table)
{
return ICEBERG_TABLE_TYPE_VALUE.equalsIgnoreCase(table.getParameters().get(ICEBERG_TABLE_TYPE_NAME));
}

/**
 * Returns {@code true} when the given metastore table was written by Spark
 * with bucketing enabled, i.e. it carries both the Spark provider marker
 * ({@code spark.sql.sources.provider}) and the Spark bucket-count property
 * ({@code spark.sql.sources.schema.numBuckets}) in its table parameters.
 */
public static boolean isSparkBucketedTable(Table table)
{
    // Provider marker first: a table without it cannot be Spark-bucketed,
    // so we can return early without looking for the bucket-count key.
    if (!table.getParameters().containsKey(SPARK_TABLE_PROVIDER_KEY)) {
        return false;
    }
    return table.getParameters().containsKey(SPARK_TABLE_BUCKET_NUMBER_KEY);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,54 @@ public void testSparkParquetTimestampCompatibility(String sparkTimestampFormat,
onSpark().executeQuery("DROP TABLE " + sparkTableName);
}

@Test(groups = {HIVE_SPARK, PROFILE_SPECIFIC_TESTS})
public void testInsertFailsOnBucketedTableCreatedBySpark()
{
    // Create a Spark-bucketed Parquet table, then verify Trino refuses to INSERT into it.
    String tableName = "spark_insert_bucketed_table_" + randomTableSuffix();
    String qualifiedName = "default." + tableName;

    onSpark().executeQuery(
            "CREATE TABLE " + qualifiedName + "(a_key integer, a_value integer) " +
                    "USING PARQUET " +
                    "CLUSTERED BY (a_key) INTO 3 BUCKETS");

    assertQueryFailure(() -> onTrino().executeQuery("INSERT INTO " + qualifiedName + " VALUES (1, 100)"))
            .hasMessageContaining("Inserting into Spark bucketed tables is not supported");

    onSpark().executeQuery("DROP TABLE " + tableName);
}

@Test(groups = {HIVE_SPARK, PROFILE_SPECIFIC_TESTS})
public void testUpdateFailsOnBucketedTableCreatedBySpark()
{
    // Create a Spark-bucketed ORC table, then verify Trino refuses to UPDATE it.
    String tableName = "spark_update_bucketed_table_" + randomTableSuffix();
    String qualifiedName = "default." + tableName;

    onSpark().executeQuery(
            "CREATE TABLE " + qualifiedName + "(a_key integer, a_value integer) " +
                    "USING ORC " +
                    "CLUSTERED BY (a_key) INTO 3 BUCKETS");

    assertQueryFailure(() -> onTrino().executeQuery("UPDATE " + qualifiedName + " SET a_value = 100 WHERE a_key = 1"))
            .hasMessageContaining("Updating Spark bucketed tables is not supported");

    onSpark().executeQuery("DROP TABLE " + tableName);
}

@Test(groups = {HIVE_SPARK, PROFILE_SPECIFIC_TESTS})
public void testDeleteFailsOnBucketedTableCreatedBySpark()
{
    // Create a Spark-bucketed ORC table, then verify Trino refuses to DELETE from it.
    String tableName = "spark_delete_bucketed_table_" + randomTableSuffix();
    String qualifiedName = "default." + tableName;

    onSpark().executeQuery(
            "CREATE TABLE " + qualifiedName + "(a_key integer, a_value integer) " +
                    "USING ORC " +
                    "CLUSTERED BY (a_key) INTO 3 BUCKETS");

    assertQueryFailure(() -> onTrino().executeQuery("DELETE FROM " + qualifiedName + " WHERE a_key = 1"))
            .hasMessageContaining("Deleting from Spark bucketed tables is not supported");

    onSpark().executeQuery("DROP TABLE " + tableName);
}

private static final String[] HIVE_TIMESTAMP_PRECISIONS = new String[]{"MILLISECONDS", "MICROSECONDS", "NANOSECONDS"};

@DataProvider
Expand Down