Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -160,16 +160,16 @@ LocalPersistentShuffleReader::LocalPersistentShuffleReader(
fileSystem_ = velox::filesystems::getFileSystem(rootPath_, nullptr);
}

bool LocalPersistentShuffleReader::hasNext() {
BufferPtr LocalPersistentShuffleReader::next() {
if (readPartitionFiles_.empty()) {
readPartitionFiles_ = getReadPartitionFiles();
}

return readPartitionFileIndex_ < readPartitionFiles_.size();
}
if (readPartitionFileIndex_ >= readPartitionFiles_.size()) {
return nullptr;
}

BufferPtr LocalPersistentShuffleReader::next() {
auto filename = readPartitionFiles_[readPartitionFileIndex_];
const auto filename = readPartitionFiles_[readPartitionFileIndex_];
auto file = fileSystem_->openFileForRead(filename);
auto buffer = AlignedBuffer::allocate<char>(file->size(), pool_, 0);
file->pread(0, file->size(), buffer->asMutable<void>());
Expand Down Expand Up @@ -246,10 +246,7 @@ std::shared_ptr<ShuffleReader> LocalPersistentShuffleFactory::createReader(
const operators::LocalShuffleReadInfo readInfo =
operators::LocalShuffleReadInfo::deserialize(serializedStr);
return std::make_shared<operators::LocalPersistentShuffleReader>(
readInfo.rootPath,
readInfo.queryId,
readInfo.partitionIds,
pool);
readInfo.rootPath, readInfo.queryId, readInfo.partitionIds, pool);
}

std::shared_ptr<ShuffleWriter> LocalPersistentShuffleFactory::createWriter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,6 @@ class LocalPersistentShuffleReader : public ShuffleReader {
std::vector<std::string> partitionIds_,
velox::memory::MemoryPool* FOLLY_NONNULL pool);

bool hasNext() override;

velox::BufferPtr next() override;

void noMoreData(bool success) override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,13 @@ class ShuffleReader {
public:
virtual ~ShuffleReader() = default;

/// Check by the reader to see if more blocks are available
virtual bool hasNext() = 0;
/// Deprecate, do not use!
virtual bool hasNext() {
return true;
}

/// Read the next block of data.
/// Reads the next block of data. The function returns null if it has read all
/// the data. The function throws if run into any error.
virtual velox::BufferPtr next() = 0;

/// Tell the shuffle system the reader is done. May be called with 'success'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,12 @@ UnsafeRowExchangeSource::request(
return folly::makeFuture(Response{0, true});
}

bool hasNext;
CALL_SHUFFLE(hasNext = shuffle_->hasNext(), "hasNext");

if (!hasNext) {
velox::BufferPtr buffer;
CALL_SHUFFLE(buffer = shuffle_->next(), "next");
if (buffer == nullptr) {
atEnd_ = true;
queue_->enqueueLocked(nullptr, promises);
} else {
velox::BufferPtr buffer;
CALL_SHUFFLE(buffer = shuffle_->next(), "next");
totalBytes = buffer->size();

++numBatches_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,16 +186,12 @@ class TestShuffleReader : public ShuffleReader {
readyPartitions)
: partition_(partition), readyPartitions_(readyPartitions) {}

bool hasNext() override {
TestValue::adjust(
"facebook::presto::operators::test::TestShuffleReader::hasNext", this);
return !(*readyPartitions_)[partition_].empty();
}

BufferPtr next() override {
TestValue::adjust(
"facebook::presto::operators::test::TestShuffleReader::next", this);
VELOX_CHECK(!(*readyPartitions_)[partition_].empty());
if ((*readyPartitions_)[partition_].empty()) {
return nullptr;
}

auto buffer = (*readyPartitions_)[partition_].back();
(*readyPartitions_)[partition_].pop_back();
Expand Down Expand Up @@ -811,14 +807,6 @@ TEST_F(UnsafeRowShuffleTest, shuffleReaderExceptions) {
params.planNode = exec::test::PlanBuilder()
.addNode(addShuffleReadNode(asRowType(data->type())))
.planNode();
{
SCOPED_TESTVALUE_SET(
"facebook::presto::operators::test::TestShuffleReader::hasNext",
injectFailure);

VELOX_ASSERT_THROW(
runShuffleReadTask(params, info), "ShuffleReader::hasNext failed");
}

{
SCOPED_TESTVALUE_SET(
Expand Down