diff --git a/presto-native-execution/presto_cpp/main/operators/BroadcastExchangeSource.cpp b/presto-native-execution/presto_cpp/main/operators/BroadcastExchangeSource.cpp index a50de8095df7f..b87c50e841404 100644 --- a/presto-native-execution/presto_cpp/main/operators/BroadcastExchangeSource.cpp +++ b/presto-native-execution/presto_cpp/main/operators/BroadcastExchangeSource.cpp @@ -14,7 +14,6 @@ #include #include -#include "presto_cpp/main/common/Configs.h" #include "presto_cpp/main/operators/BroadcastExchangeSource.h" using namespace facebook::velox; @@ -133,12 +132,14 @@ BroadcastExchangeSource::createExchangeSource( VELOX_USER_FAIL("BroadcastInfo deserialization failed: {}", e.what()); } - auto fileSystemBroadcast = BroadcastFactory(broadcastFileInfo->filePath_); + auto fileSystem = + velox::filesystems::getFileSystem(broadcastFileInfo->filePath_, nullptr); return std::make_shared( uri.host(), destination, queue, - fileSystemBroadcast.createReader(std::move(broadcastFileInfo), pool), + std::make_shared( + broadcastFileInfo, fileSystem, pool), pool); } } // namespace facebook::presto::operators diff --git a/presto-native-execution/presto_cpp/main/operators/BroadcastExchangeSource.h b/presto-native-execution/presto_cpp/main/operators/BroadcastExchangeSource.h index 75ae8e0cbbcf6..9b23c4cbe4713 100644 --- a/presto-native-execution/presto_cpp/main/operators/BroadcastExchangeSource.h +++ b/presto-native-execution/presto_cpp/main/operators/BroadcastExchangeSource.h @@ -13,10 +13,9 @@ */ #pragma once -#include "presto_cpp/main/operators/BroadcastFactory.h" -#include "velox/core/PlanNode.h" -#include "velox/exec/Exchange.h" -#include "velox/exec/Operator.h" +#include "presto_cpp/main/operators/BroadcastFile.h" +#include "velox/exec/ExchangeQueue.h" +#include "velox/exec/ExchangeSource.h" namespace facebook::presto::operators { diff --git a/presto-native-execution/presto_cpp/main/operators/BroadcastFactory.cpp b/presto-native-execution/presto_cpp/main/operators/BroadcastFile.cpp similarity index 87% rename from presto-native-execution/presto_cpp/main/operators/BroadcastFactory.cpp rename to presto-native-execution/presto_cpp/main/operators/BroadcastFile.cpp index 08da15cf55660..c8565712b7226 100644 --- a/presto-native-execution/presto_cpp/main/operators/BroadcastFactory.cpp +++ b/presto-native-execution/presto_cpp/main/operators/BroadcastFile.cpp @@ -11,10 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#include "presto_cpp/main/operators/BroadcastFactory.h" -#include -#include -#include +#include "presto_cpp/main/operators/BroadcastFile.h" #include "presto_cpp/external/json/nlohmann/json.hpp" #include "presto_cpp/main/common/Exception.h" #include "presto_cpp/main/thrift/ThriftIO.h" @@ -39,40 +36,6 @@ namespace facebook::presto::operators { "{}", \ errorMessage); -namespace { -std::string makeUuid() { - return boost::lexical_cast(boost::uuids::random_generator()()); -} -} // namespace - -BroadcastFactory::BroadcastFactory(const std::string& basePath) - : basePath_(basePath) { - VELOX_CHECK(!basePath.empty(), "Base path for broadcast files is empty!"); - fileSystem_ = velox::filesystems::getFileSystem(basePath, nullptr); -} - -std::unique_ptr BroadcastFactory::createWriter( - uint64_t writeBufferSize, - uint64_t maxBroadcastBytes, - velox::memory::MemoryPool* pool, - std::unique_ptr serdeOptions) { - fileSystem_->mkdir(basePath_); - return std::make_unique( - fmt::format("{}/file_broadcast_{}", basePath_, makeUuid()), - maxBroadcastBytes, - writeBufferSize, - std::move(serdeOptions), - pool); -} - -std::shared_ptr BroadcastFactory::createReader( - std::unique_ptr fileInfo, - velox::memory::MemoryPool* pool) { - auto broadcastFileReader = - std::make_shared(fileInfo, fileSystem_, pool); - return broadcastFileReader; -} - // static std::unique_ptr BroadcastFileInfo::deserialize( const std::string& info) { diff --git a/presto-native-execution/presto_cpp/main/operators/BroadcastFactory.h b/presto-native-execution/presto_cpp/main/operators/BroadcastFile.h similarity index 84% rename from presto-native-execution/presto_cpp/main/operators/BroadcastFactory.h rename to presto-native-execution/presto_cpp/main/operators/BroadcastFile.h index 8bc9e47420010..6aff0bc39626e 100644 --- a/presto-native-execution/presto_cpp/main/operators/BroadcastFactory.h +++ b/presto-native-execution/presto_cpp/main/operators/BroadcastFile.h @@ -118,27 +118,4 @@ class BroadcastFileReader { uint32_t numPagesRead_{0}; std::vector pageSizes_; }; - -/// Factory to create Writers & Reader for file based broadcast. -class BroadcastFactory { - public: - /// Create FileBroadcast to write files under specified basePath. - BroadcastFactory(const std::string& basePath); - - virtual ~BroadcastFactory() = default; - - std::unique_ptr createWriter( - uint64_t writeBufferSize, - uint64_t maxBroadcastBytes, - velox::memory::MemoryPool* pool, - std::unique_ptr serdeOptions); - - std::shared_ptr createReader( - const std::unique_ptr fileInfo, - velox::memory::MemoryPool* pool); - - private: - const std::string basePath_; - std::shared_ptr fileSystem_; -}; } // namespace facebook::presto::operators diff --git a/presto-native-execution/presto_cpp/main/operators/BroadcastWrite.cpp b/presto-native-execution/presto_cpp/main/operators/BroadcastWrite.cpp index 796a78e36f622..385a0761a781e 100644 --- a/presto-native-execution/presto_cpp/main/operators/BroadcastWrite.cpp +++ b/presto-native-execution/presto_cpp/main/operators/BroadcastWrite.cpp @@ -12,14 +12,21 @@ * limitations under the License. */ #include "presto_cpp/main/operators/BroadcastWrite.h" -#include "presto_cpp/main/common/Configs.h" -#include "presto_cpp/main/operators/BroadcastFactory.h" +#include +#include +#include +#include "presto_cpp/main/operators/BroadcastFile.h" +#include "velox/common/file/FileSystems.h" using namespace facebook::velox::exec; using namespace facebook::velox; namespace facebook::presto::operators { namespace { +std::string makeUuid() { + return boost::lexical_cast(boost::uuids::random_generator()()); +} + velox::core::PlanNodeId deserializePlanNodeId(const folly::dynamic& obj) { return obj["id"].asString(); } @@ -58,12 +65,16 @@ class BroadcastWriteOperator : public Operator { planNode->serdeRowType(), planNode->serdeRowType())), maxBroadcastBytes_(planNode->maxBroadcastBytes()) { - auto fileBroadcast = BroadcastFactory(planNode->basePath()); - fileBroadcastWriter_ = fileBroadcast.createWriter( - 8 << 20, + const auto& basePath = planNode->basePath(); + VELOX_CHECK(!basePath.empty(), "Base path for broadcast files is empty!"); + auto fileSystem = velox::filesystems::getFileSystem(basePath, nullptr); + fileSystem->mkdir(basePath); + fileBroadcastWriter_ = std::make_unique( + fmt::format("{}/file_broadcast_{}", basePath, makeUuid()), planNode->maxBroadcastBytes(), - operatorCtx_->pool(), - getVectorSerdeOptions(ctx->queryConfig(), VectorSerde::Kind::kPresto)); + 8 << 20, + getVectorSerdeOptions(ctx->queryConfig(), VectorSerde::Kind::kPresto), + operatorCtx_->pool()); } bool needsInput() const override { diff --git a/presto-native-execution/presto_cpp/main/operators/CMakeLists.txt b/presto-native-execution/presto_cpp/main/operators/CMakeLists.txt index b14e7ff0cd3e3..bec2e9dde9631 100644 --- a/presto-native-execution/presto_cpp/main/operators/CMakeLists.txt +++ b/presto-native-execution/presto_cpp/main/operators/CMakeLists.txt @@ -19,7 +19,7 @@ add_library( ShuffleWrite.cpp LocalShuffle.cpp BroadcastWrite.cpp - BroadcastFactory.cpp + BroadcastFile.cpp BroadcastExchangeSource.cpp BinarySortableSerializer.cpp ) diff --git a/presto-native-execution/presto_cpp/main/operators/tests/BroadcastTest.cpp b/presto-native-execution/presto_cpp/main/operators/tests/BroadcastTest.cpp index ee8426e406d20..7899de068ed21 100644 --- a/presto-native-execution/presto_cpp/main/operators/tests/BroadcastTest.cpp +++ b/presto-native-execution/presto_cpp/main/operators/tests/BroadcastTest.cpp @@ -15,6 +15,7 @@ #include #include "presto_cpp/main/common/Exception.h" #include "presto_cpp/main/operators/BroadcastExchangeSource.h" +#include "presto_cpp/main/operators/BroadcastFile.h" #include "presto_cpp/main/operators/BroadcastWrite.h" #include "presto_cpp/main/operators/tests/PlanBuilder.h" #include "velox/buffer/Buffer.h" @@ -22,6 +23,8 @@ #include "velox/common/compression/Compression.h" #include "velox/common/file/FileSystems.h" #include "velox/core/QueryConfig.h" +#include "velox/exec/Exchange.h" +#include "velox/exec/ExchangeSource.h" #include "velox/exec/tests/utils/OperatorTestBase.h" #include "velox/exec/tests/utils/PlanBuilder.h" #include "velox/exec/tests/utils/QueryAssertions.h"