diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergNodePartitioningProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergNodePartitioningProvider.java
index fbae3e9c2b9e..5cf409f13b59 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergNodePartitioningProvider.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergNodePartitioningProvider.java
@@ -13,7 +13,9 @@
  */
 package io.trino.plugin.iceberg;
 
+import io.trino.spi.NodeManager;
 import io.trino.spi.connector.BucketFunction;
+import io.trino.spi.connector.ConnectorBucketNodeMap;
 import io.trino.spi.connector.ConnectorNodePartitioningProvider;
 import io.trino.spi.connector.ConnectorPartitioningHandle;
 import io.trino.spi.connector.ConnectorSession;
@@ -26,22 +28,36 @@
 import javax.inject.Inject;
 
 import java.util.List;
+import java.util.Optional;
 
 import static io.trino.plugin.iceberg.IcebergUtil.schemaFromHandles;
 import static io.trino.plugin.iceberg.PartitionFields.parsePartitionFields;
+import static io.trino.spi.connector.ConnectorBucketNodeMap.createBucketNodeMap;
 import static java.util.Objects.requireNonNull;
 
 public class IcebergNodePartitioningProvider
         implements ConnectorNodePartitioningProvider
 {
+    private final NodeManager nodeManager;
     private final TypeOperators typeOperators;
 
     @Inject
-    public IcebergNodePartitioningProvider(TypeManager typeManager)
+    public IcebergNodePartitioningProvider(NodeManager nodeManager, TypeManager typeManager)
     {
+        this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
         this.typeOperators = requireNonNull(typeManager, "typeManager is null").getTypeOperators();
     }
 
+    @Override
+    public Optional<ConnectorBucketNodeMap> getBucketNodeMapping(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorPartitioningHandle partitioningHandle)
+    {
+        if (partitioningHandle instanceof IcebergUpdateHandle) {
+            return Optional.empty();
+        }
+
+        return Optional.of(createBucketNodeMap(nodeManager.getRequiredWorkerNodes().size()));
+    }
+
     @Override
     public BucketFunction getBucketFunction(
             ConnectorTransactionHandle transactionHandle,
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java
index 3d4e6b19479a..319062c0ba48 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java
@@ -5314,6 +5314,24 @@ public void testInsertingIntoTablesWithColumnsWithQuotesInName()
         assertUpdate("DROP TABLE " + tableName);
     }
 
+    @Test
+    public void testInsertIntoBucketedColumnWhenTaskWriterCountIsGreaterThanOrEqualToNodeCount()
+    {
+        int taskWriterCount = 4;
+        assertThat(taskWriterCount).isGreaterThanOrEqualTo(getQueryRunner().getNodeCount());
+        Session session = Session.builder(getSession())
+                .setSystemProperty("task_writer_count", String.valueOf(taskWriterCount))
+                .build();
+
+        String tableName = "test_inserting_into_bucketed_column_when_task_writer_count_is_greater_than_or_equal_to_node_count_" + randomTableSuffix();
+        assertUpdate("CREATE TABLE " + tableName + " (bucketed_col INT) WITH (partitioning = ARRAY['bucket(bucketed_col, 10)'])");
+
+        assertUpdate(session, "INSERT INTO " + tableName + " VALUES (1)", 1);
+        assertQuery("SELECT * FROM " + tableName, "VALUES 1");
+
+        assertUpdate("DROP TABLE " + tableName);
+    }
+
     @Test
     public void testReadFromVersionedTableWithSchemaEvolution()
     {