From 539fa190d729ce12eebb8f3a64f3f1649b9458f9 Mon Sep 17 00:00:00 2001 From: MJ Deng Date: Thu, 22 Dec 2022 13:42:21 -0800 Subject: [PATCH 1/2] Fix BatchTaskUpdate request URI matching order --- .../presto_cpp/main/TaskResource.cpp | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/presto-native-execution/presto_cpp/main/TaskResource.cpp b/presto-native-execution/presto_cpp/main/TaskResource.cpp index 7b75c8e6e48ae..691abbf0c0532 100644 --- a/presto-native-execution/presto_cpp/main/TaskResource.cpp +++ b/presto-native-execution/presto_cpp/main/TaskResource.cpp @@ -112,18 +112,20 @@ void TaskResource::registerUris(http::HttpServer& server) { return acknowledgeResults(message, pathMatch); }); + // task/(.+)/batch must come before the /v1/task/(.+) as it's more specific + // otherwise all requests will be matched with /v1/task/(.+) server.registerPost( - R"(/v1/task/(.+))", + R"(/v1/task/(.+)/batch)", [&](proxygen::HTTPMessage* message, const std::vector& pathMatch) { - return createOrUpdateTask(message, pathMatch); + return createOrUpdateBatchTask(message, pathMatch); }); server.registerPost( - R"(/v1/task/(.+)/batch)", + R"(/v1/task/(.+))", [&](proxygen::HTTPMessage* message, const std::vector& pathMatch) { - return createOrUpdateBatchTask(message, pathMatch); + return createOrUpdateTask(message, pathMatch); }); server.registerDelete( From 1cee0f10b534f107e49a37f2abf15992680551e0 Mon Sep 17 00:00:00 2001 From: MJ Deng Date: Thu, 22 Dec 2022 13:50:00 -0800 Subject: [PATCH 2/2] Fix batch query plan convertor for non shuffle plan --- .../main/types/PrestoToVeloxQueryPlan.cpp | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp index 8439da826b42c..279cb710a81a8 100644 --- a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp +++ b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp @@ -2151,23 +2151,25 @@ velox::core::PlanFragment VeloxQueryPlanConverter::toBatchVeloxQueryPlan( std::shared_ptr&& serializedShuffleWriteInfo) { auto planFragment = toVeloxQueryPlan(fragment, tableWriteInfo, taskId); - // If the last node is a PartitionedOutputNode, it means this fragment ends - // with a shuffle stage. We convert the PartitionedOutputNode to a chain of - // following nodes: + // If the serializedShuffleWriteInfo is not nullptr, it means this fragment + // ends with a shuffle stage. We convert the PartitionedOutputNode to a + // chain of following nodes: // (1) A PartitionAndSerializeNode. // (2) A "gather" LocalPartitionNode that gathers results from multiple // threads to one thread. // (3) A ShuffleWriteNode. + // To be noted, whether the last node of the plan is PartitionedOutputNode + // can't guarantee the query has shuffle stage, for example a plan with + // TableWriteNode can also have PartitionedOutputNode to distribute the + // metadata to coordinator. + if (serializedShuffleWriteInfo == nullptr) { + return planFragment; + } auto partitionedOutputNode = std::dynamic_pointer_cast( planFragment.planNode); - VELOX_CHECK_EQ( - partitionedOutputNode == nullptr, - serializedShuffleWriteInfo == nullptr, - "Writer shuffle info and PartitionedOutputNode should be set together"); - if (partitionedOutputNode == nullptr) { - return planFragment; - } + VELOX_CHECK( + partitionedOutputNode != nullptr, "PartitionedOutputNode is required"); if (partitionedOutputNode->isBroadcast()) { VELOX_UNSUPPORTED( "Broadcast partitioned output node in batch is currently not "