Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -160,21 +160,21 @@ LocalPersistentShuffleReader::LocalPersistentShuffleReader(
fileSystem_ = velox::filesystems::getFileSystem(rootPath_, nullptr);
}

BufferPtr LocalPersistentShuffleReader::next() {
folly::SemiFuture<BufferPtr> LocalPersistentShuffleReader::next() {
if (readPartitionFiles_.empty()) {
readPartitionFiles_ = getReadPartitionFiles();
}

if (readPartitionFileIndex_ >= readPartitionFiles_.size()) {
return nullptr;
return folly::makeSemiFuture<BufferPtr>(BufferPtr{});
}

const auto filename = readPartitionFiles_[readPartitionFileIndex_];
auto file = fileSystem_->openFileForRead(filename);
auto buffer = AlignedBuffer::allocate<char>(file->size(), pool_, 0);
file->pread(0, file->size(), buffer->asMutable<void>());
++readPartitionFileIndex_;
return buffer;
return folly::makeSemiFuture<BufferPtr>(std::move(buffer));
}

void LocalPersistentShuffleReader::noMoreData(bool success) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class LocalPersistentShuffleReader : public ShuffleReader {
std::vector<std::string> partitionIds_,
velox::memory::MemoryPool* FOLLY_NONNULL pool);

velox::BufferPtr next() override;
folly::SemiFuture<velox::BufferPtr> next() override;

void noMoreData(bool success) override;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class ShuffleReader {

/// Reads the next block of data. The function returns null if it has read all
/// the data. The function throws if run into any error.
virtual velox::BufferPtr next() = 0;
virtual folly::SemiFuture<velox::BufferPtr> next() = 0;

/// Tell the shuffle system the reader is done. May be called with 'success'
/// true before reading all the data. This happens when a query has a LIMIT or
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,41 +32,45 @@ folly::SemiFuture<UnsafeRowExchangeSource::Response>
UnsafeRowExchangeSource::request(
uint32_t /*maxBytes*/,
uint32_t /*maxWaitSeconds*/) {
std::vector<velox::ContinuePromise> promises;
int64_t totalBytes = 0;
{
std::lock_guard<std::mutex> l(queue_->mutex());
if (atEnd_) {
Comment thread
patrickstuedi marked this conversation as resolved.
Outdated
return folly::makeFuture(Response{0, true});
}
auto nextBatch = [this]() {
return std::move(shuffle_->next())
.deferValue([this](velox::BufferPtr buffer) {
std::vector<velox::ContinuePromise> promises;
int64_t totalBytes = 0;

velox::BufferPtr buffer;
CALL_SHUFFLE(buffer = shuffle_->next(), "next");
if (buffer == nullptr) {
atEnd_ = true;
queue_->enqueueLocked(nullptr, promises);
} else {
totalBytes = buffer->size();

++numBatches_;

auto ioBuf = folly::IOBuf::wrapBuffer(buffer->as<char>(), buffer->size());
// NOTE: SerializedPage's onDestructionCb_ captures one reference on
// 'buffer' to keep its alive until SerializedPage destruction. Also note
// that 'buffer' should have been allocated from memory pool. Hence, we
// don't need to update the memory usage counting for the associated
// 'ioBuf' attached to SerializedPage on destruction.
queue_->enqueueLocked(
std::make_unique<velox::exec::SerializedPage>(
std::move(ioBuf), [buffer](auto& /*unused*/) {}),
promises);
}
}
for (auto& promise : promises) {
promise.setValue();
}
{
std::lock_guard<std::mutex> l(queue_->mutex());
if (buffer == nullptr) {
atEnd_ = true;
queue_->enqueueLocked(nullptr, promises);
} else {
totalBytes = buffer->size();

++numBatches_;

auto ioBuf =
folly::IOBuf::wrapBuffer(buffer->as<char>(), buffer->size());
queue_->enqueueLocked(
std::make_unique<velox::exec::SerializedPage>(
std::move(ioBuf), [buffer](auto& /*unused*/) {}),
promises);
}
}

for (auto& promise : promises) {
promise.setValue();
}

return folly::makeFuture(Response{totalBytes, atEnd_});
})
.deferError(
[](folly::exception_wrapper e) mutable
-> UnsafeRowExchangeSource::Response {
VELOX_FAIL("ShuffleReader::{} failed: {}", "next", e.what());
});
};

return folly::makeFuture(Response{totalBytes, atEnd_});
CALL_SHUFFLE(return nextBatch(), "next");
}

folly::F14FastMap<std::string, int64_t> UnsafeRowExchangeSource::stats() const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ class UnsafeRowExchangeSource : public velox::exec::ExchangeSource {
uint32_t maxBytes,
uint32_t maxWaitSeconds) override;

void close() override {}
void close() override {
shuffle_->noMoreData(true);
}

folly::F14FastMap<std::string, int64_t> stats() const override;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,16 +186,17 @@ class TestShuffleReader : public ShuffleReader {
readyPartitions)
: partition_(partition), readyPartitions_(readyPartitions) {}

BufferPtr next() override {
folly::SemiFuture<BufferPtr> next() override {
TestValue::adjust(
"facebook::presto::operators::test::TestShuffleReader::next", this);
if ((*readyPartitions_)[partition_].empty()) {
return nullptr;
BufferPtr buffer = nullptr;
return folly::makeSemiFuture<BufferPtr>(std::move(buffer));
}

auto buffer = (*readyPartitions_)[partition_].back();
(*readyPartitions_)[partition_].pop_back();
return buffer;
return folly::makeSemiFuture<BufferPtr>(std::move(buffer));
}

void noMoreData(bool success) override {
Expand Down