IcebergNodePartitioningProvider.java
@@ -13,7 +13,9 @@
 */
 package io.trino.plugin.iceberg;

+import io.trino.spi.NodeManager;
 import io.trino.spi.connector.BucketFunction;
+import io.trino.spi.connector.ConnectorBucketNodeMap;
 import io.trino.spi.connector.ConnectorNodePartitioningProvider;
 import io.trino.spi.connector.ConnectorPartitioningHandle;
 import io.trino.spi.connector.ConnectorSession;
@@ -26,22 +28,36 @@
 import javax.inject.Inject;

 import java.util.List;
+import java.util.Optional;

 import static io.trino.plugin.iceberg.IcebergUtil.schemaFromHandles;
 import static io.trino.plugin.iceberg.PartitionFields.parsePartitionFields;
+import static io.trino.spi.connector.ConnectorBucketNodeMap.createBucketNodeMap;
 import static java.util.Objects.requireNonNull;

 public class IcebergNodePartitioningProvider
         implements ConnectorNodePartitioningProvider
 {
+    private final NodeManager nodeManager;
     private final TypeOperators typeOperators;

     @Inject
-    public IcebergNodePartitioningProvider(TypeManager typeManager)
+    public IcebergNodePartitioningProvider(NodeManager nodeManager, TypeManager typeManager)
     {
+        this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
         this.typeOperators = requireNonNull(typeManager, "typeManager is null").getTypeOperators();
     }

+    @Override
+    public Optional<ConnectorBucketNodeMap> getBucketNodeMapping(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorPartitioningHandle partitioningHandle)
+    {
+        if (partitioningHandle instanceof IcebergUpdateHandle) {
+            return Optional.empty();
+        }
+
+        return Optional.of(createBucketNodeMap(nodeManager.getRequiredWorkerNodes().size()));
Member:
I think I don't understand the change.
Is ConnectorNodePartitioningProvider.getBucketNodeMapping mandatory to implement?
@electrum's 3207925 (part of #7933) suggests it should be optional to implement this method.

If it's optional, do we have a bug in the engine, which manifests only when this method is not implemented?
If so, shouldn't we have a fix in the engine?
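For context, a minimal sketch of why a default method makes an SPI hook optional to implement. This uses hypothetical, simplified types (the real ConnectorNodePartitioningProvider lives in trino-spi and returns Optional<ConnectorBucketNodeMap>, not Optional<Integer>):

```java
import java.util.Optional;

// Hypothetical, simplified stand-in for the SPI shape under discussion.
// If getBucketNodeMapping has a default body returning Optional.empty(),
// connectors may skip it entirely and let the engine choose the layout.
interface PartitioningProviderSketch
{
    default Optional<Integer> getBucketNodeMapping()
    {
        return Optional.empty(); // no preference; the engine decides
    }
}

public class SpiOptionalitySketch
{
    // Connector relying on the default: the method is optional to implement
    static final PartitioningProviderSketch DEFAULTING = new PartitioningProviderSketch() {};

    // Connector pinning the bucket count to the worker count, as this diff does
    static final PartitioningProviderSketch PINNED = new PartitioningProviderSketch()
    {
        @Override
        public Optional<Integer> getBucketNodeMapping()
        {
            return Optional.of(3); // stand-in for getRequiredWorkerNodes().size()
        }
    };

    public static void main(String[] args)
    {
        System.out.println(DEFAULTING.getBucketNodeMapping()); // Optional.empty
        System.out.println(PINNED.getBucketNodeMapping());     // Optional[3]
    }
}
```

The question in the thread is whether overriding the default (as the diff does) should ever be necessary just to make the engine pick a sane writer layout.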

Member:
This should be a bug in the engine. I believe this implementation will break MERGE.

Member (@electrum, Sep 8, 2022):
Implementing the method in this way causes MERGE to fail with

Insert and update layout have mismatched BucketNodeMap

Which is why we made it optional to implement this method. We need to track down why the task_writer_count causes the query to fail. None of the existing integration tests caught this case.
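A hypothetical illustration of the failure mode described above (not actual Trino engine code): MERGE plans both an insert layout and an update layout, and if the connector pins a bucket-node map for one but returns empty for the other, a consistency check of this shape trips.

```java
import java.util.Optional;

public class MergeLayoutMismatchSketch
{
    // Hypothetical validation step; the real check lives inside the engine
    static void checkLayouts(Optional<Integer> insertLayout, Optional<Integer> updateLayout)
    {
        if (!insertLayout.equals(updateLayout)) {
            throw new IllegalStateException("Insert and update layout have mismatched BucketNodeMap");
        }
    }

    public static void main(String[] args)
    {
        // The insert path pins buckets to the worker count, while the update
        // path (IcebergUpdateHandle) returns empty, mirroring the diff above
        try {
            checkLayouts(Optional.of(3), Optional.empty());
        }
        catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Insert and update layout have mismatched BucketNodeMap
        }
    }
}
```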

Member (author):
@electrum Can you take over the issue?

Member:
Sure, I can look at this. Thanks for writing the test, it's helpful.

+    }
+
     @Override
     public BucketFunction getBucketFunction(
             ConnectorTransactionHandle transactionHandle,
@@ -5314,6 +5314,24 @@ public void testInsertingIntoTablesWithColumnsWithQuotesInName()
         assertUpdate("DROP TABLE " + tableName);
     }

+    @Test
+    public void testInsertIntoBucketedColumnWhenTaskWriterCountIsGreaterThanOrEqualToNodeCount()
Member:
You're testing the greater than case (or equal case), not both

+    {
+        int taskWriterCount = 4;
+        assertThat(taskWriterCount).isGreaterThanOrEqualTo(getQueryRunner().getNodeCount());
Member (on lines +5320 to +5321):
Be explicit which situation you're testing (equal, or greater than)

+        Session session = Session.builder(getSession())
+                .setSystemProperty("task_writer_count", String.valueOf(taskWriterCount))
Member:
TASK_WRITER_COUNT is a public constant, you can use it here

+                .build();
+
+        String tableName = "test_inserting_into_bucketed_column_when_task_writer_count_is_greater_than_or_equal_to_node_count_" + randomTableSuffix();
Member:
make the name shorter

+        assertUpdate("CREATE TABLE " + tableName + " (bucketed_col INT) WITH (partitioning = ARRAY['bucket(bucketed_col, 10)'])");
+
+        assertUpdate(session, "INSERT INTO " + tableName + " VALUES (1)", 1);
Member:
Use an INSERT ... SELECT nationkey FROM tpch.tiny.nation instead; otherwise the planner could realize it's inserting exactly one row, and could limit the writer count to 1 without talking to the connector.

Member:
Also, worth adding cases with CTAS, UPDATE, DELETE and MERGE

+        assertQuery("SELECT * FROM " + tableName, "VALUES 1");
+
+        assertUpdate("DROP TABLE " + tableName);
+    }
Member:
BTW the problem doesn't look Iceberg-specific. Should this be part of BCT (BaseConnectorTest)?

Member:
Do we have a way of creating bucketed tables in BCT? I think only Hive and Iceberg would support this.

Member:
Is the problem about bucketed tables only? Are "plainly partitioned" tables not affected?

Anyway, I hear you on the test setup challenge, making it hard to test in BCT.


     @Test
     public void testReadFromVersionedTableWithSchemaEvolution()
     {