diff --git a/presto-native-execution/CMakeLists.txt b/presto-native-execution/CMakeLists.txt index 1f4e8d69d2d19..69763d098e63d 100644 --- a/presto-native-execution/CMakeLists.txt +++ b/presto-native-execution/CMakeLists.txt @@ -165,6 +165,7 @@ include_directories(.) include_directories(velox) include_directories(velox/velox/external/xxhash) include_directories(${VELOX_ROOT}) +include_directories(${CMAKE_BINARY_DIR}) add_subdirectory(velox) diff --git a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp index a3d63fdb8ce6c..897ce6a087748 100644 --- a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp +++ b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp @@ -25,20 +25,8 @@ #include "presto_cpp/main/types/TypeSignatureTypeConverter.h" // clang-format on -#include -#include -#include - #include -#if __has_include("filesystem") -#include -namespace fs = std::filesystem; -#else -#include -namespace fs = std::experimental::filesystem; -#endif - using namespace facebook::velox; namespace facebook::presto { @@ -112,6 +100,46 @@ std::shared_ptr toColumnHandle( throw std::invalid_argument("Unknown column handle type: " + column->_type); } +connector::hive::LocationHandle::TableType toTableType( + protocol::TableType tableType) { + switch (tableType) { + case protocol::TableType::NEW: + return connector::hive::LocationHandle::TableType::kNew; + case protocol::TableType::EXISTING: + return connector::hive::LocationHandle::TableType::kExisting; + case protocol::TableType::TEMPORARY: + return connector::hive::LocationHandle::TableType::kTemporary; + default: + throw std::invalid_argument("Unknown table type"); + } +} + +connector::hive::LocationHandle::WriteMode toWriteMode( + protocol::WriteMode writeMode) { + switch (writeMode) { + case protocol::WriteMode::STAGE_AND_MOVE_TO_TARGET_DIRECTORY: + return connector::hive::LocationHandle::WriteMode:: + kStageAndMoveToTargetDirectory; + case protocol::WriteMode::DIRECT_TO_TARGET_NEW_DIRECTORY: + return connector::hive::LocationHandle::WriteMode:: + kDirectToTargetNewDirectory; + case protocol::WriteMode::DIRECT_TO_TARGET_EXISTING_DIRECTORY: + return connector::hive::LocationHandle::WriteMode:: + kDirectToTargetExistingDirectory; + default: + throw std::invalid_argument("Unknown write mode"); + } +} + +std::shared_ptr toLocationHandle( + const protocol::LocationHandle& locationHandle) { + return std::make_shared( + locationHandle.targetPath, + locationHandle.writePath, + toTableType(locationHandle.tableType), + toWriteMode(locationHandle.writeMode)); +} + int64_t toInt64( const std::shared_ptr& block, const VeloxExprConverter& exprConverter, @@ -1632,24 +1660,51 @@ VeloxQueryPlanConverter::toVeloxQueryPlan( const std::shared_ptr& node, const std::shared_ptr& tableWriteInfo, const protocol::TaskId& taskId) { - auto outputTableHandle = std::dynamic_pointer_cast( - tableWriteInfo->writerTarget) - ->handle; - - auto hiveOutputTableHandle = - std::dynamic_pointer_cast( - outputTableHandle.connectorHandle); + std::string connectorId; + std::vector> + inputColumns; + std::shared_ptr hiveTableHandle; + if (auto createHandle = std::dynamic_pointer_cast( + tableWriteInfo->writerTarget)) { + connectorId = createHandle->handle.connectorId; + + auto hiveOutputTableHandle = + std::dynamic_pointer_cast( + createHandle->handle.connectorHandle); + + for (const auto& columnHandle : hiveOutputTableHandle->inputColumns) { + inputColumns.emplace_back( + std::dynamic_pointer_cast( + toColumnHandle(&columnHandle))); + } - auto uuid = boost::uuids::random_generator()(); - auto fileName = boost::uuids::to_string(uuid); + hiveTableHandle = std::make_shared( + inputColumns, toLocationHandle(hiveOutputTableHandle->locationHandle)); + } else if ( + auto insertHandle = std::dynamic_pointer_cast( + tableWriteInfo->writerTarget)) { + connectorId = insertHandle->handle.connectorId; + + auto hiveInsertTableHandle = + std::dynamic_pointer_cast( + insertHandle->handle.connectorHandle); + + for (const auto& columnHandle : hiveInsertTableHandle->inputColumns) { + inputColumns.emplace_back( + std::dynamic_pointer_cast( + toColumnHandle(&columnHandle))); + } - auto filePath = - fs::path(hiveOutputTableHandle->locationHandle.writePath) / fileName; + hiveTableHandle = std::make_shared( + inputColumns, toLocationHandle(hiveInsertTableHandle->locationHandle)); + } else { + VELOX_UNSUPPORTED( + "Unsupported table writer handle: {}", + toJsonString(tableWriteInfo->writerTarget)); + } - auto hiveTableHandle = - std::make_shared(filePath); - auto insertTableHandle = std::make_shared( - outputTableHandle.connectorId, hiveTableHandle); + auto insertTableHandle = + std::make_shared(connectorId, hiveTableHandle); auto outputType = toRowType( {node->rowCountVariable, diff --git a/presto-native-execution/presto_cpp/presto_protocol/presto_protocol.cpp b/presto-native-execution/presto_cpp/presto_protocol/presto_protocol.cpp index 2bf4ebef08b48..b849ce257bcae 100644 --- a/presto-native-execution/presto_cpp/presto_protocol/presto_protocol.cpp +++ b/presto-native-execution/presto_cpp/presto_protocol/presto_protocol.cpp @@ -6412,7 +6412,7 @@ void to_json(json& j, const std::shared_ptr& p) { } String type = p->_type; - if (type == "hive") { + if (getConnectorKey(type) == "hive") { j = *std::static_pointer_cast(p); return; } @@ -6430,7 +6430,7 @@ void from_json(const json& j, std::shared_ptr& p) { " ConnectorInsertTableHandle ConnectorInsertTableHandle"); } - if (type == "hive") { + if (getConnectorKey(type) == "hive") { std::shared_ptr k = std::make_shared(); j.get_to(*k); diff --git a/presto-native-execution/presto_cpp/presto_protocol/special/ConnectorInsertTableHandle.cpp.inc b/presto-native-execution/presto_cpp/presto_protocol/special/ConnectorInsertTableHandle.cpp.inc new file mode 100644 index 0000000000000..90e89be0faa94 --- /dev/null +++ b/presto-native-execution/presto_cpp/presto_protocol/special/ConnectorInsertTableHandle.cpp.inc @@ -0,0 +1,49 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +namespace facebook::presto::protocol { +void to_json(json& j, const std::shared_ptr& p) { + if (p == nullptr) { + return; + } + String type = p->_type; + + if (getConnectorKey(type) == "hive") { + j = *std::static_pointer_cast(p); + return; + } + + throw TypeError(type + " no abstract type ConnectorInsertTableHandle "); +} + +void from_json(const json& j, std::shared_ptr& p) { + String type; + try { + type = p->getSubclassKey(j); + } catch (json::parse_error& e) { + throw ParseError( + std::string(e.what()) + + " ConnectorInsertTableHandle ConnectorInsertTableHandle"); + } + + if (getConnectorKey(type) == "hive") { + std::shared_ptr k = + std::make_shared(); + j.get_to(*k); + p = std::static_pointer_cast(k); + return; + } + + throw TypeError(type + " no abstract type ConnectorInsertTableHandle "); +} +} // namespace facebook::presto::protocol diff --git a/presto-native-execution/velox b/presto-native-execution/velox index f65557fc30f5d..22bed48d4d187 160000 --- a/presto-native-execution/velox +++ b/presto-native-execution/velox @@ -1 +1 @@ -Subproject commit f65557fc30f5dcd88e2094c99cfd8433552aa310 +Subproject commit 22bed48d4d1879241c9da074dc967b01424e4627