From 879761b04f003620711b77bae28387985fe27a5b Mon Sep 17 00:00:00 2001 From: Masha Basmanova Date: Tue, 16 May 2023 17:33:20 -0400 Subject: [PATCH 1/2] [native pos] Fail, not hang, broadcast queries Broadcast queries used to hang forever. Now these queries fail quickly with a clear error message: > Broadcast shuffle is not supported --- .../main/types/PrestoToVeloxQueryPlan.cpp | 25 +++++++++---------- .../TestPrestoSparkNativeJoinQueries.java | 9 +++++++ 2 files changed, 21 insertions(+), 13 deletions(-) diff --git a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp index ed23d2cd2d35e..8caa620ef8feb 100644 --- a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp +++ b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp @@ -2068,9 +2068,7 @@ core::PlanFragment VeloxQueryPlanConverterBase::toVeloxQueryPlan( false, // broadcast partitioningScheme.replicateNullsAndAny, std::make_shared( - inputType, - keyChannels, - constValues), + inputType, keyChannels, constValues), outputType, sourceNode); return planFragment; @@ -2199,6 +2197,15 @@ velox::core::PlanFragment VeloxBatchQueryPlanConverter::toVeloxQueryPlan( auto planFragment = VeloxQueryPlanConverterBase::toVeloxQueryPlan( fragment, tableWriteInfo, taskId); + auto partitionedOutputNode = + std::dynamic_pointer_cast( + planFragment.planNode); + if (partitionedOutputNode) { + VELOX_USER_CHECK( + !partitionedOutputNode->isBroadcast(), + "Broadcast shuffle is not supported"); + } + // If the serializedShuffleWriteInfo is not nullptr, it means this fragment // ends with a shuffle stage. We convert the PartitionedOutputNode to a // chain of following nodes: @@ -2214,16 +2221,8 @@ velox::core::PlanFragment VeloxBatchQueryPlanConverter::toVeloxQueryPlan( return planFragment; } - auto partitionedOutputNode = - std::dynamic_pointer_cast( - planFragment.planNode); - VELOX_CHECK( - partitionedOutputNode != nullptr, "PartitionedOutputNode is required"); - if (partitionedOutputNode->isBroadcast()) { - VELOX_UNSUPPORTED( - "Broadcast partitioned output node in batch is currently not " - "supported."); - } + VELOX_USER_CHECK_NOT_NULL( + partitionedOutputNode, "PartitionedOutputNode is required"); auto source = addProjectIfNeeded( partitionedOutputNode->sources()[0], partitionedOutputNode->outputType()); diff --git a/presto-native-execution/src/test/java/com/facebook/presto/spark/TestPrestoSparkNativeJoinQueries.java b/presto-native-execution/src/test/java/com/facebook/presto/spark/TestPrestoSparkNativeJoinQueries.java index d2b769bb16f33..47b9d850e47bf 100644 --- a/presto-native-execution/src/test/java/com/facebook/presto/spark/TestPrestoSparkNativeJoinQueries.java +++ b/presto-native-execution/src/test/java/com/facebook/presto/spark/TestPrestoSparkNativeJoinQueries.java @@ -13,10 +13,12 @@ */ package com.facebook.presto.spark; +import com.facebook.presto.Session; import com.facebook.presto.nativeworker.AbstractTestNativeJoinQueries; import com.facebook.presto.testing.ExpectedQueryRunner; import com.facebook.presto.testing.QueryRunner; import org.testng.annotations.Ignore; +import org.testng.annotations.Test; public class TestPrestoSparkNativeJoinQueries extends AbstractTestNativeJoinQueries @@ -47,6 +49,13 @@ public Object[][] joinTypeProviderImpl() return new Object[][] {{partitionedJoin()}}; } + @Test + public void testBroadcastJoin() + { + assertQueryFails(broadcastJoin(), "SELECT * FROM orders o, lineitem l WHERE o.orderkey = l.orderkey", + ".*Broadcast shuffle is not supported"); + } + // TODO: Enable following Ignored tests after fixing (Tests can be enabled by removing the method) @Override @Ignore From 079113c33b09637df4d698113e2ef2559f345d15 Mon Sep 17 00:00:00 2001 From: Masha Basmanova Date: Tue, 16 May 2023 22:29:57 -0400 Subject: [PATCH 2/2] [native pos] Fail queries that require replicate-nulls-and-any shuffle mode This mode is currently not supported. Ignoring this flag may cause hard-to-debug incorrect results. --- .../main/types/PrestoToVeloxQueryPlan.cpp | 20 +++++++++++-------- .../TestPrestoSparkNativeJoinQueries.java | 10 ++++++++++ .../TestPrestoSparkNativeTpchQueries.java | 5 +++++ 3 files changed, 27 insertions(+), 8 deletions(-) diff --git a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp index 8caa620ef8feb..acf17d94e1022 100644 --- a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp +++ b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp @@ -2200,11 +2200,17 @@ velox::core::PlanFragment VeloxBatchQueryPlanConverter::toVeloxQueryPlan( auto partitionedOutputNode = std::dynamic_pointer_cast( planFragment.planNode); - if (partitionedOutputNode) { - VELOX_USER_CHECK( - !partitionedOutputNode->isBroadcast(), - "Broadcast shuffle is not supported"); - } + + VELOX_USER_CHECK_NOT_NULL( + partitionedOutputNode, "PartitionedOutputNode is required"); + + VELOX_USER_CHECK( + !partitionedOutputNode->isBroadcast(), + "Broadcast shuffle is not supported"); + + VELOX_USER_CHECK( + !partitionedOutputNode->isReplicateNullsAndAny(), + "Replicate-nulls-and-any shuffle mode is not supported."); // If the serializedShuffleWriteInfo is not nullptr, it means this fragment // ends with a shuffle stage. We convert the PartitionedOutputNode to a @@ -2218,12 +2224,10 @@ velox::core::PlanFragment VeloxBatchQueryPlanConverter::toVeloxQueryPlan( // TableWriteNode can also have PartitionedOutputNode to distribute the // metadata to coordinator. if (serializedShuffleWriteInfo_ == nullptr) { + VELOX_USER_CHECK_EQ(1, partitionedOutputNode->numPartitions()); return planFragment; } - VELOX_USER_CHECK_NOT_NULL( - partitionedOutputNode, "PartitionedOutputNode is required"); - auto source = addProjectIfNeeded( partitionedOutputNode->sources()[0], partitionedOutputNode->outputType()); diff --git a/presto-native-execution/src/test/java/com/facebook/presto/spark/TestPrestoSparkNativeJoinQueries.java b/presto-native-execution/src/test/java/com/facebook/presto/spark/TestPrestoSparkNativeJoinQueries.java index 47b9d850e47bf..4b8f6c4fea6b6 100644 --- a/presto-native-execution/src/test/java/com/facebook/presto/spark/TestPrestoSparkNativeJoinQueries.java +++ b/presto-native-execution/src/test/java/com/facebook/presto/spark/TestPrestoSparkNativeJoinQueries.java @@ -57,6 +57,16 @@ public void testBroadcastJoin() } // TODO: Enable following Ignored tests after fixing (Tests can be enabled by removing the method) + + // Semi and anti joins require support for replicate-nulls-and-any mode in shuffle. + @Override + @Ignore + public void testAntiJoin(Session joinTypeSession) {} + + @Override + @Ignore + public void testSemiJoin(Session joinTypeSession) {} + @Override @Ignore public void testCrossJoin() {} diff --git a/presto-native-execution/src/test/java/com/facebook/presto/spark/TestPrestoSparkNativeTpchQueries.java b/presto-native-execution/src/test/java/com/facebook/presto/spark/TestPrestoSparkNativeTpchQueries.java index 3ea60ff7ea63a..c3b88c5634a2b 100644 --- a/presto-native-execution/src/test/java/com/facebook/presto/spark/TestPrestoSparkNativeTpchQueries.java +++ b/presto-native-execution/src/test/java/com/facebook/presto/spark/TestPrestoSparkNativeTpchQueries.java @@ -58,6 +58,11 @@ public void testTpchQ11() {} @Ignore public void testTpchQ15() {} + // Requires support for replicate-nulls-and-any shuffle mode. + @Override + @Ignore + public void testTpchQ16() {} + @Override @Ignore public void testTpchQ18() {}