Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2068,9 +2068,7 @@ core::PlanFragment VeloxQueryPlanConverterBase::toVeloxQueryPlan(
false, // broadcast
partitioningScheme.replicateNullsAndAny,
std::make_shared<HashPartitionFunctionSpec>(
inputType,
keyChannels,
constValues),
inputType, keyChannels, constValues),
outputType,
sourceNode);
return planFragment;
Expand Down Expand Up @@ -2199,6 +2197,21 @@ velox::core::PlanFragment VeloxBatchQueryPlanConverter::toVeloxQueryPlan(
auto planFragment = VeloxQueryPlanConverterBase::toVeloxQueryPlan(
fragment, tableWriteInfo, taskId);

auto partitionedOutputNode =
std::dynamic_pointer_cast<const core::PartitionedOutputNode>(
planFragment.planNode);

VELOX_USER_CHECK_NOT_NULL(
partitionedOutputNode, "PartitionedOutputNode is required");

VELOX_USER_CHECK(
!partitionedOutputNode->isBroadcast(),
"Broadcast shuffle is not supported");

VELOX_USER_CHECK(
!partitionedOutputNode->isReplicateNullsAndAny(),
"Replicate-nulls-and-any shuffle mode is not supported.");

// If the serializedShuffleWriteInfo is not nullptr, it means this fragment
// ends with a shuffle stage. We convert the PartitionedOutputNode to a
// chain of following nodes:
Expand All @@ -2211,20 +2224,10 @@ velox::core::PlanFragment VeloxBatchQueryPlanConverter::toVeloxQueryPlan(
// TableWriteNode can also have PartitionedOutputNode to distribute the
// metadata to coordinator.
if (serializedShuffleWriteInfo_ == nullptr) {
VELOX_USER_CHECK_EQ(1, partitionedOutputNode->numPartitions());
return planFragment;
}

auto partitionedOutputNode =
std::dynamic_pointer_cast<const core::PartitionedOutputNode>(
planFragment.planNode);
VELOX_CHECK(
partitionedOutputNode != nullptr, "PartitionedOutputNode is required");
if (partitionedOutputNode->isBroadcast()) {
VELOX_UNSUPPORTED(
"Broadcast partitioned output node in batch is currently not "
"supported.");
}

auto source = addProjectIfNeeded(
partitionedOutputNode->sources()[0], partitionedOutputNode->outputType());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
*/
package com.facebook.presto.spark;

import com.facebook.presto.Session;
import com.facebook.presto.nativeworker.AbstractTestNativeJoinQueries;
import com.facebook.presto.testing.ExpectedQueryRunner;
import com.facebook.presto.testing.QueryRunner;
import org.testng.annotations.Ignore;
import org.testng.annotations.Test;

public class TestPrestoSparkNativeJoinQueries
extends AbstractTestNativeJoinQueries
Expand Down Expand Up @@ -47,7 +49,24 @@ public Object[][] joinTypeProviderImpl()
return new Object[][] {{partitionedJoin()}};
}

@Test
public void testBroadcastJoin()
{
assertQueryFails(broadcastJoin(), "SELECT * FROM orders o, lineitem l WHERE o.orderkey = l.orderkey",
".*Broadcast shuffle is not supported");
}

// TODO: Enable following Ignored tests after fixing (Tests can be enabled by removing the method)

// Semi and anti joins require support for replicate-nulls-and-any mode in shuffle.
@Override
@Ignore
public void testAntiJoin(Session joinTypeSession) {}

@Override
@Ignore
public void testSemiJoin(Session joinTypeSession) {}

@Override
@Ignore
public void testCrossJoin() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ public void testTpchQ11() {}
@Ignore
public void testTpchQ15() {}

// Requires support for replicate-nulls-and-any shuffle mode.
@Override
@Ignore
public void testTpchQ16() {}

@Override
@Ignore
public void testTpchQ18() {}
Expand Down