1111 * See the License for the specific language governing permissions and
1212 * limitations under the License.
1313 */
14- #include " presto_cpp/main/operators/LocalPersistentShuffle .h"
14+ #include " presto_cpp/main/operators/LocalShuffle .h"
1515#include " presto_cpp/external/json/nlohmann/json.hpp"
1616#include " presto_cpp/main/common/Configs.h"
1717
@@ -45,7 +45,7 @@ inline std::string createShuffleFileName(
4545const static std::string kReadyForReadFilename = " readyForRead" ;
4646} // namespace
4747
48- LocalPersistentShuffleWriter::LocalPersistentShuffleWriter (
48+ LocalShuffleWriter::LocalShuffleWriter (
4949 const std::string& rootPath,
5050 const std::string& queryId,
5151 uint32_t shuffleId,
@@ -67,13 +67,13 @@ LocalPersistentShuffleWriter::LocalPersistentShuffleWriter(
6767 fileSystem_ = velox::filesystems::getFileSystem (rootPath_, nullptr );
6868}
6969
70- std::unique_ptr<velox::WriteFile>
71- LocalPersistentShuffleWriter::getNextOutputFile ( int32_t partition) {
70+ std::unique_ptr<velox::WriteFile> LocalShuffleWriter::getNextOutputFile (
71+ int32_t partition) {
7272 auto filename = nextAvailablePartitionFileName (rootPath_, partition);
7373 return fileSystem_->openFileForWrite (filename);
7474}
7575
76- std::string LocalPersistentShuffleWriter ::nextAvailablePartitionFileName (
76+ std::string LocalShuffleWriter ::nextAvailablePartitionFileName (
7777 const std::string& root,
7878 int32_t partition) const {
7979 int fileCount = 0 ;
@@ -92,7 +92,7 @@ std::string LocalPersistentShuffleWriter::nextAvailablePartitionFileName(
9292 return filename;
9393}
9494
95- void LocalPersistentShuffleWriter ::storePartitionBlock (int32_t partition) {
95+ void LocalShuffleWriter ::storePartitionBlock (int32_t partition) {
9696 auto & buffer = inProgressPartitions_[partition];
9797 auto file = getNextOutputFile (partition);
9898 file->append (
@@ -102,7 +102,7 @@ void LocalPersistentShuffleWriter::storePartitionBlock(int32_t partition) {
102102 inProgressSizes_[partition] = 0 ;
103103}
104104
105- void LocalPersistentShuffleWriter ::collect (
105+ void LocalShuffleWriter ::collect (
106106 int32_t partition,
107107 std::string_view /* key */ ,
108108 std::string_view data) {
@@ -137,7 +137,7 @@ void LocalPersistentShuffleWriter::collect(
137137 inProgressSizes_[partition] += size;
138138}
139139
140- void LocalPersistentShuffleWriter ::noMoreData (bool success) {
140+ void LocalShuffleWriter ::noMoreData (bool success) {
141141 // Delete all shuffle files on failure.
142142 if (!success) {
143143 cleanup ();
@@ -149,7 +149,7 @@ void LocalPersistentShuffleWriter::noMoreData(bool success) {
149149 }
150150}
151151
152- LocalPersistentShuffleReader::LocalPersistentShuffleReader (
152+ LocalShuffleReader::LocalShuffleReader (
153153 const std::string& rootPath,
154154 const std::string& queryId,
155155 std::vector<std::string> partitionIds,
@@ -162,7 +162,7 @@ LocalPersistentShuffleReader::LocalPersistentShuffleReader(
162162}
163163
164164folly::SemiFuture<std::vector<std::unique_ptr<ReadBatch>>>
165- LocalPersistentShuffleReader ::next (size_t numBatches) {
165+ LocalShuffleReader ::next (size_t numBatches) {
166166 using TRowSize = uint32_t ;
167167
168168 if (readPartitionFiles_.empty ()) {
@@ -208,15 +208,14 @@ LocalPersistentShuffleReader::next(size_t numBatches) {
208208 return folly::makeSemiFuture (std::move (batches));
209209}
210210
211- void LocalPersistentShuffleReader ::noMoreData (bool success) {
211+ void LocalShuffleReader ::noMoreData (bool success) {
212212 // On failure, reset the index of the files to be read.
213213 if (!success) {
214214 readPartitionFileIndex_ = 0 ;
215215 }
216216}
217217
218- std::vector<std::string> LocalPersistentShuffleReader::getReadPartitionFiles ()
219- const {
218+ std::vector<std::string> LocalShuffleReader::getReadPartitionFiles () const {
220219 // Get rid of excess '/' characters in the path.
221220 auto trimmedRootPath = rootPath_;
222221 while (trimmedRootPath.length () > 0 &&
@@ -239,7 +238,7 @@ std::vector<std::string> LocalPersistentShuffleReader::getReadPartitionFiles()
239238 return partitionFiles;
240239}
241240
242- void LocalPersistentShuffleWriter ::cleanup () {
241+ void LocalShuffleWriter ::cleanup () {
243242 auto files = fileSystem_->list (rootPath_);
244243 for (auto & file : files) {
245244 fileSystem_->remove (file);
@@ -276,7 +275,7 @@ std::shared_ptr<ShuffleReader> LocalPersistentShuffleFactory::createReader(
276275 velox::memory::MemoryPool* pool) {
277276 const operators::LocalShuffleReadInfo readInfo =
278277 operators::LocalShuffleReadInfo::deserialize (serializedStr);
279- return std::make_shared<operators::LocalPersistentShuffleReader >(
278+ return std::make_shared<operators::LocalShuffleReader >(
280279 readInfo.rootPath , readInfo.queryId , readInfo.partitionIds , pool);
281280}
282281
@@ -287,7 +286,7 @@ std::shared_ptr<ShuffleWriter> LocalPersistentShuffleFactory::createWriter(
287286 SystemConfig::instance ()->localShuffleMaxPartitionBytes ();
288287 const operators::LocalShuffleWriteInfo writeInfo =
289288 operators::LocalShuffleWriteInfo::deserialize (serializedStr);
290- return std::make_shared<operators::LocalPersistentShuffleWriter >(
289+ return std::make_shared<operators::LocalShuffleWriter >(
291290 writeInfo.rootPath ,
292291 writeInfo.queryId ,
293292 writeInfo.shuffleId ,
0 commit comments