Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,45 +82,47 @@ public PartitionFunction getPartitionFunction(
PartitioningScheme partitioningScheme,
List<Type> partitionChannelTypes)
{
Optional<int[]> bucketToPartition = partitioningScheme.getBucketToPartition();
checkArgument(bucketToPartition.isPresent(), "Bucket to partition must be set before a partition function can be created");
int[] bucketToPartition = partitioningScheme.getBucketToPartition()
.orElseThrow(() -> new IllegalArgumentException("Bucket to partition must be set before a partition function can be created"));

PartitioningHandle partitioningHandle = partitioningScheme.getPartitioning().getHandle();
if (partitioningHandle.getConnectorHandle() instanceof SystemPartitioningHandle) {
checkArgument(partitioningScheme.getBucketToPartition().isPresent(), "Bucket to partition must be set before a partition function can be created");

return ((SystemPartitioningHandle) partitioningHandle.getConnectorHandle()).getPartitionFunction(
partitionChannelTypes,
partitioningScheme.getHashColumn().isPresent(),
partitioningScheme.getBucketToPartition().get(),
bucketToPartition,
blockTypeOperators);
}

BucketFunction bucketFunction = getBucketFunction(session, partitioningHandle, partitionChannelTypes, bucketToPartition.get().length);
return new BucketPartitionFunction(bucketFunction, partitioningScheme.getBucketToPartition().get());
BucketFunction bucketFunction = getBucketFunction(session, partitioningHandle, partitionChannelTypes, bucketToPartition.length);
return new BucketPartitionFunction(bucketFunction, bucketToPartition);
}

public BucketFunction getBucketFunction(Session session, PartitioningHandle partitioning, List<Type> partitionChannelTypes, int bucketCount)
public BucketFunction getBucketFunction(Session session, PartitioningHandle partitioningHandle, List<Type> partitionChannelTypes, int bucketCount)
{
ConnectorNodePartitioningProvider partitioningProvider = getPartitioningProvider(partitioning.getConnectorId().get());
CatalogName catalogName = partitioningHandle.getConnectorId()
.orElseThrow(() -> new IllegalArgumentException("No connector ID for partitioning handle: " + partitioningHandle));
ConnectorNodePartitioningProvider partitioningProvider = getPartitioningProvider(catalogName);
BucketFunction bucketFunction = partitioningProvider.getBucketFunction(
partitioning.getTransactionHandle().orElse(null),
partitioningHandle.getTransactionHandle().orElseThrow(() -> new IllegalArgumentException("No transactionHandle for partitioning handle: " + partitioningHandle)),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is transaction handle optional then? What is the case when it is not set? Worth a comment or more descriptive exception message?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the empty transaction handle is for SystemPartitioningHandle (internal use)

session.toConnectorSession(),
partitioning.getConnectorHandle(),
partitioningHandle.getConnectorHandle(),
partitionChannelTypes,
bucketCount);
checkArgument(bucketFunction != null, "No bucket function for partitioning: %s", partitioning);
checkArgument(bucketFunction != null, "No bucket function for partitioning: %s", partitioningHandle);
return bucketFunction;
}

public List<ConnectorPartitionHandle> listPartitionHandles(
Session session,
PartitioningHandle partitioningHandle)
{
ConnectorNodePartitioningProvider partitioningProvider = getPartitioningProvider(partitioningHandle.getConnectorId().get());
CatalogName catalogName = partitioningHandle.getConnectorId()
.orElseThrow(() -> new IllegalArgumentException("No connector ID for partitioning handle: " + partitioningHandle));
ConnectorNodePartitioningProvider partitioningProvider = getPartitioningProvider(catalogName);
return partitioningProvider.listPartitionHandles(
partitioningHandle.getTransactionHandle().orElse(null),
session.toConnectorSession(partitioningHandle.getConnectorId().get()),
partitioningHandle.getTransactionHandle().orElseThrow(() -> new IllegalArgumentException("No transactionHandle for partitioning handle: " + partitioningHandle)),
session.toConnectorSession(catalogName),
partitioningHandle.getConnectorHandle());
}

Expand Down Expand Up @@ -197,24 +199,28 @@ private static List<InternalNode> getFixedMapping(ConnectorBucketNodeMap connect
.collect(toImmutableList());
}

public ConnectorBucketNodeMap getConnectorBucketNodeMap(Session session, PartitioningHandle partitioning)
public ConnectorBucketNodeMap getConnectorBucketNodeMap(Session session, PartitioningHandle partitioningHandle)
{
ConnectorNodePartitioningProvider partitioningProvider = getPartitioningProvider(partitioning.getConnectorId().get());
CatalogName catalogName = partitioningHandle.getConnectorId()
.orElseThrow(() -> new IllegalArgumentException("No connector ID for partitioning handle: " + partitioningHandle));
ConnectorNodePartitioningProvider partitioningProvider = getPartitioningProvider(catalogName);
ConnectorBucketNodeMap connectorBucketNodeMap = partitioningProvider.getBucketNodeMap(
partitioning.getTransactionHandle().orElse(null),
session.toConnectorSession(partitioning.getConnectorId().get()),
partitioning.getConnectorHandle());
checkArgument(connectorBucketNodeMap != null, "No partition map %s", partitioning);
partitioningHandle.getTransactionHandle().orElseThrow(() -> new IllegalArgumentException("No transactionHandle for partitioning handle: " + partitioningHandle)),
session.toConnectorSession(catalogName),
partitioningHandle.getConnectorHandle());
checkArgument(connectorBucketNodeMap != null, "No partition map %s", partitioningHandle);
return connectorBucketNodeMap;
}

private ToIntFunction<Split> getSplitToBucket(Session session, PartitioningHandle partitioningHandle)
{
ConnectorNodePartitioningProvider partitioningProvider = getPartitioningProvider(partitioningHandle.getConnectorId().get());
CatalogName catalogName = partitioningHandle.getConnectorId()
.orElseThrow(() -> new IllegalArgumentException("No connector ID for partitioning handle: " + partitioningHandle));
ConnectorNodePartitioningProvider partitioningProvider = getPartitioningProvider(catalogName);

ToIntFunction<ConnectorSplit> splitBucketFunction = partitioningProvider.getSplitBucketFunction(
partitioningHandle.getTransactionHandle().orElse(null),
session.toConnectorSession(partitioningHandle.getConnectorId().get()),
partitioningHandle.getTransactionHandle().orElseThrow(() -> new IllegalArgumentException("No transactionHandle for partitioning handle: " + partitioningHandle)),
session.toConnectorSession(catalogName),
partitioningHandle.getConnectorHandle());
checkArgument(splitBucketFunction != null, "No partitioning %s", partitioningHandle);

Expand Down