diff --git a/presto-native-execution/presto_cpp/main/PrestoServer.cpp b/presto-native-execution/presto_cpp/main/PrestoServer.cpp index 988895a824109..99c596e064df4 100644 --- a/presto-native-execution/presto_cpp/main/PrestoServer.cpp +++ b/presto-native-execution/presto_cpp/main/PrestoServer.cpp @@ -36,7 +36,7 @@ #include "presto_cpp/main/http/filters/StatsFilter.h" #include "presto_cpp/main/operators/BroadcastExchangeSource.h" #include "presto_cpp/main/operators/BroadcastWrite.h" -#include "presto_cpp/main/operators/LocalPersistentShuffle.h" +#include "presto_cpp/main/operators/LocalShuffle.h" #include "presto_cpp/main/operators/PartitionAndSerialize.h" #include "presto_cpp/main/operators/ShuffleExchangeSource.h" #include "presto_cpp/main/operators/ShuffleRead.h" diff --git a/presto-native-execution/presto_cpp/main/operators/CMakeLists.txt b/presto-native-execution/presto_cpp/main/operators/CMakeLists.txt index 104bf74db3825..6f1be6b01bca4 100644 --- a/presto-native-execution/presto_cpp/main/operators/CMakeLists.txt +++ b/presto-native-execution/presto_cpp/main/operators/CMakeLists.txt @@ -17,7 +17,7 @@ add_library( ShuffleExchangeSource.cpp ShuffleRead.cpp ShuffleWrite.cpp - LocalPersistentShuffle.cpp + LocalShuffle.cpp BroadcastWrite.cpp BroadcastFactory.cpp BroadcastExchangeSource.cpp diff --git a/presto-native-execution/presto_cpp/main/operators/LocalPersistentShuffle.cpp b/presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp similarity index 90% rename from presto-native-execution/presto_cpp/main/operators/LocalPersistentShuffle.cpp rename to presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp index 158583aa677a5..68f2fa1c838ff 100644 --- a/presto-native-execution/presto_cpp/main/operators/LocalPersistentShuffle.cpp +++ b/presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#include "presto_cpp/main/operators/LocalPersistentShuffle.h" +#include "presto_cpp/main/operators/LocalShuffle.h" #include "presto_cpp/external/json/nlohmann/json.hpp" #include "presto_cpp/main/common/Configs.h" @@ -45,7 +45,7 @@ inline std::string createShuffleFileName( const static std::string kReadyForReadFilename = "readyForRead"; } // namespace -LocalPersistentShuffleWriter::LocalPersistentShuffleWriter( +LocalShuffleWriter::LocalShuffleWriter( const std::string& rootPath, const std::string& queryId, uint32_t shuffleId, @@ -67,13 +67,13 @@ LocalPersistentShuffleWriter::LocalPersistentShuffleWriter( fileSystem_ = velox::filesystems::getFileSystem(rootPath_, nullptr); } -std::unique_ptr -LocalPersistentShuffleWriter::getNextOutputFile(int32_t partition) { +std::unique_ptr LocalShuffleWriter::getNextOutputFile( + int32_t partition) { auto filename = nextAvailablePartitionFileName(rootPath_, partition); return fileSystem_->openFileForWrite(filename); } -std::string LocalPersistentShuffleWriter::nextAvailablePartitionFileName( +std::string LocalShuffleWriter::nextAvailablePartitionFileName( const std::string& root, int32_t partition) const { int fileCount = 0; @@ -92,7 +92,7 @@ std::string LocalPersistentShuffleWriter::nextAvailablePartitionFileName( return filename; } -void LocalPersistentShuffleWriter::storePartitionBlock(int32_t partition) { +void LocalShuffleWriter::storePartitionBlock(int32_t partition) { auto& buffer = inProgressPartitions_[partition]; auto file = getNextOutputFile(partition); file->append( @@ -102,7 +102,7 @@ void LocalPersistentShuffleWriter::storePartitionBlock(int32_t partition) { inProgressSizes_[partition] = 0; } -void LocalPersistentShuffleWriter::collect( +void LocalShuffleWriter::collect( int32_t partition, std::string_view /* key */, std::string_view data) { @@ -137,7 +137,7 @@ void LocalPersistentShuffleWriter::collect( inProgressSizes_[partition] += size; } -void LocalPersistentShuffleWriter::noMoreData(bool success) { +void LocalShuffleWriter::noMoreData(bool success) { // Delete all shuffle files on failure. if (!success) { cleanup(); @@ -149,7 +149,7 @@ void LocalPersistentShuffleWriter::noMoreData(bool success) { } } -LocalPersistentShuffleReader::LocalPersistentShuffleReader( +LocalShuffleReader::LocalShuffleReader( const std::string& rootPath, const std::string& queryId, std::vector partitionIds, @@ -162,7 +162,7 @@ LocalPersistentShuffleReader::LocalPersistentShuffleReader( } folly::SemiFuture>> -LocalPersistentShuffleReader::next(size_t numBatches) { +LocalShuffleReader::next(size_t numBatches) { using TRowSize = uint32_t; if (readPartitionFiles_.empty()) { @@ -208,15 +208,14 @@ LocalPersistentShuffleReader::next(size_t numBatches) { return folly::makeSemiFuture(std::move(batches)); } -void LocalPersistentShuffleReader::noMoreData(bool success) { +void LocalShuffleReader::noMoreData(bool success) { // On failure, reset the index of the files to be read. if (!success) { readPartitionFileIndex_ = 0; } } -std::vector LocalPersistentShuffleReader::getReadPartitionFiles() - const { +std::vector LocalShuffleReader::getReadPartitionFiles() const { // Get rid of excess '/' characters in the path. auto trimmedRootPath = rootPath_; while (trimmedRootPath.length() > 0 && @@ -239,7 +238,7 @@ std::vector LocalPersistentShuffleReader::getReadPartitionFiles() return partitionFiles; } -void LocalPersistentShuffleWriter::cleanup() { +void LocalShuffleWriter::cleanup() { auto files = fileSystem_->list(rootPath_); for (auto& file : files) { fileSystem_->remove(file); @@ -276,7 +275,7 @@ std::shared_ptr LocalPersistentShuffleFactory::createReader( velox::memory::MemoryPool* pool) { const operators::LocalShuffleReadInfo readInfo = operators::LocalShuffleReadInfo::deserialize(serializedStr); - return std::make_shared( + return std::make_shared( readInfo.rootPath, readInfo.queryId, readInfo.partitionIds, pool); } @@ -287,7 +286,7 @@ std::shared_ptr LocalPersistentShuffleFactory::createWriter( SystemConfig::instance()->localShuffleMaxPartitionBytes(); const operators::LocalShuffleWriteInfo writeInfo = operators::LocalShuffleWriteInfo::deserialize(serializedStr); - return std::make_shared( + return std::make_shared( writeInfo.rootPath, writeInfo.queryId, writeInfo.shuffleId, diff --git a/presto-native-execution/presto_cpp/main/operators/LocalPersistentShuffle.h b/presto-native-execution/presto_cpp/main/operators/LocalShuffle.h similarity index 96% rename from presto-native-execution/presto_cpp/main/operators/LocalPersistentShuffle.h rename to presto-native-execution/presto_cpp/main/operators/LocalShuffle.h index a5182a5f5e975..130e1c0f4a02a 100644 --- a/presto-native-execution/presto_cpp/main/operators/LocalPersistentShuffle.h +++ b/presto-native-execution/presto_cpp/main/operators/LocalShuffle.h @@ -69,9 +69,9 @@ struct LocalShuffleReadInfo { /// multi-process use scenarios as long as each producer or consumer is assigned /// to a distinct group of partition IDs. Each of them can create an instance of /// this class (pointing to the same root path) to read and write shuffle data. -class LocalPersistentShuffleWriter : public ShuffleWriter { +class LocalShuffleWriter : public ShuffleWriter { public: - LocalPersistentShuffleWriter( + LocalShuffleWriter( const std::string& rootPath, const std::string& queryId, uint32_t shuffleId, @@ -123,15 +123,16 @@ class LocalPersistentShuffleWriter : public ShuffleWriter { std::shared_ptr fileSystem_; }; -class LocalPersistentShuffleReader : public ShuffleReader { +class LocalShuffleReader : public ShuffleReader { public: - LocalPersistentShuffleReader( + LocalShuffleReader( const std::string& rootPath, const std::string& queryId, std::vector partitionIds, velox::memory::MemoryPool* pool); - folly::SemiFuture>> next(size_t numBatches) override; + folly::SemiFuture>> next( + size_t numBatches) override; void noMoreData(bool success) override; diff --git a/presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp b/presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp index 2dd95b3c18016..21f5bc25bc727 100644 --- a/presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp +++ b/presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp @@ -14,7 +14,7 @@ #include #include "folly/init/Init.h" #include "presto_cpp/external/json/nlohmann/json.hpp" -#include "presto_cpp/main/operators/LocalPersistentShuffle.h" +#include "presto_cpp/main/operators/LocalShuffle.h" #include "presto_cpp/main/operators/PartitionAndSerialize.h" #include "presto_cpp/main/operators/ShuffleExchangeSource.h" #include "presto_cpp/main/operators/ShuffleRead.h" diff --git a/presto-native-execution/presto_cpp/main/types/tests/PlanConverterTest.cpp b/presto-native-execution/presto_cpp/main/types/tests/PlanConverterTest.cpp index b6d322b2a0b90..b4a5bc4590b1e 100644 --- a/presto-native-execution/presto_cpp/main/types/tests/PlanConverterTest.cpp +++ b/presto-native-execution/presto_cpp/main/types/tests/PlanConverterTest.cpp @@ -15,7 +15,7 @@ #include "presto_cpp/main/common/tests/test_json.h" #include "presto_cpp/main/connectors/PrestoToVeloxConnector.h" -#include "presto_cpp/main/operators/LocalPersistentShuffle.h" +#include "presto_cpp/main/operators/LocalShuffle.h" #include "presto_cpp/main/operators/PartitionAndSerialize.h" #include "presto_cpp/main/operators/ShuffleRead.h" #include "presto_cpp/main/operators/ShuffleWrite.h"