Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -161,43 +161,51 @@ LocalPersistentShuffleReader::LocalPersistentShuffleReader(
fileSystem_ = velox::filesystems::getFileSystem(rootPath_, nullptr);
}

// NOTE(review): this span appears to interleave two revisions of
// LocalPersistentShuffleReader::next taken from a rendered diff whose +/-
// markers were lost: the zero-argument signature is immediately followed by
// the next(size_t numBatches) signature, and the read-and-parse body occurs
// twice. Confirm against the repository which lines are current.
//
// Visible behavior: on first use the list of partition files is fetched with
// getReadPartitionFiles(). Each file is then read in full into one buffer,
// split into length-prefixed rows, and wrapped (rows + buffer) in a ReadBatch.
// The single-batch form returns a null ReadBatch once every file has been
// consumed; the batched form reads at most 'numBatches' files per call and
// returns a shorter (possibly empty) vector once the files run out.
folly::SemiFuture<std::unique_ptr<ReadBatch>> LocalPersistentShuffleReader::next() {
folly::SemiFuture<std::vector<std::unique_ptr<ReadBatch>>>
LocalPersistentShuffleReader::next(size_t numBatches) {
// Width of the length prefix that precedes every row in a partition file.
using TRowSize = uint32_t;

// The file list is fetched only while it is still empty, so an empty result
// from getReadPartitionFiles() is requested again on the next call.
if (readPartitionFiles_.empty()) {
readPartitionFiles_ = getReadPartitionFiles();
}

// Single-batch form: a null batch signals that all files have been read.
if (readPartitionFileIndex_ >= readPartitionFiles_.size()) {
return folly::makeSemiFuture(std::unique_ptr<ReadBatch>{});
}
std::vector<std::unique_ptr<ReadBatch>> batches;
batches.reserve(numBatches);

// Batched form: one file is consumed per iteration, stopping early when the
// file list is exhausted.
for (size_t i = 0; i < numBatches; ++i) {
if (readPartitionFileIndex_ >= readPartitionFiles_.size()) {
break;
}

// Read the whole file into a single buffer sized to the file.
const auto filename = readPartitionFiles_[readPartitionFileIndex_];
auto file = fileSystem_->openFileForRead(filename);
auto buffer = AlignedBuffer::allocate<char>(file->size(), pool_, 0);
file->pread(0, file->size(), buffer->asMutable<void>());
++readPartitionFileIndex_;

// Parse the buffer to extract individual rows.
// Each row is stored as: | row-size (4 bytes) | row-data (row-size bytes) |
std::vector<std::string_view> rows;
const char* data = buffer->as<char>();
size_t offset = 0;
const size_t totalSize = buffer->size();

// The loop stops as soon as fewer than sizeof(TRowSize) bytes remain, so any
// shorter trailing fragment is ignored without an error.
while (offset + sizeof(TRowSize) <= totalSize) {
// Read row size (stored in big endian).
// NOTE(review): the C-style cast drops const and dereferences at an offset
// that advances by arbitrary row sizes, so the address is not necessarily
// 4-byte aligned; a memcpy into a local TRowSize would avoid both.
const TRowSize rowSize = folly::Endian::big(*(TRowSize*)(data + offset));
offset += sizeof(TRowSize);

// Reject a row whose declared size runs past the end of the buffer.
VELOX_CHECK_LE(offset + rowSize, totalSize, "Invalid row data: row size");
// Create a Row with empty key and the row data as value.
// The view points into 'buffer'; no row bytes are copied.
rows.emplace_back(std::string_view{data + offset, rowSize});
offset += rowSize;
}

// NOTE(review): from here to the matching 'offset += rowSize;' the
// read-and-parse sequence above is repeated verbatim, presumably the
// pre-change copy of the body.
const auto filename = readPartitionFiles_[readPartitionFileIndex_];
auto file = fileSystem_->openFileForRead(filename);
auto buffer = AlignedBuffer::allocate<char>(file->size(), pool_, 0);
file->pread(0, file->size(), buffer->asMutable<void>());
++readPartitionFileIndex_;

// Parse the buffer to extract individual rows.
// Each row is stored as: | row-size (4 bytes) | row-data (row-size bytes) |
std::vector<std::string_view> rows;
const char* data = buffer->as<char>();
size_t offset = 0;
const size_t totalSize = buffer->size();

while (offset + sizeof(TRowSize) <= totalSize) {
// Read row size (stored in big endian).
const TRowSize rowSize = folly::Endian::big(*(TRowSize*)(data + offset));
offset += sizeof(TRowSize);

VELOX_CHECK_LE(offset + rowSize, totalSize, "Invalid row data: row size");
// Create a Row with empty key and the row data as value.
rows.emplace_back(std::string_view{data + offset, rowSize});
offset += rowSize;
// The buffer is moved into the ReadBatch together with the views that point
// into it, so the batch carries the storage its rows refer to.
batches.push_back(
std::make_unique<ReadBatch>(std::move(rows), std::move(buffer)));
}

// All file reads above run synchronously; the result is only wrapped in a
// SemiFuture at the end. The first return belongs to the single-batch form,
// the second to the batched form.
return folly::makeSemiFuture<std::unique_ptr<ReadBatch>>(
std::make_unique<ReadBatch>(std::move(rows), std::move(buffer)));
return folly::makeSemiFuture(std::move(batches));
}

void LocalPersistentShuffleReader::noMoreData(bool success) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ class LocalPersistentShuffleReader : public ShuffleReader {
std::vector<std::string> partitionIds,
velox::memory::MemoryPool* pool);

/// Single-batch form: yields one ReadBatch per call, or a null pointer once
/// every partition file has been read.
/// NOTE(review): this declaration and the one below look like the before and
/// after sides of a diff shown together; confirm which one is current.
folly::SemiFuture<std::unique_ptr<ReadBatch>> next() override;
/// Batched form: yields up to 'numBatches' ReadBatch objects per call. The
/// vector is shorter than requested, possibly empty, once the partition files
/// are exhausted.
folly::SemiFuture<std::vector<std::unique_ptr<ReadBatch>>> next(size_t numBatches) override;

void noMoreData(bool success) override;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,24 @@ ShuffleExchangeSource::request(
uint32_t /*maxBytes*/,
std::chrono::microseconds /*maxWait*/) {
auto nextBatch = [this]() {
return std::move(shuffleReader_->next())
.deferValue([this](std::unique_ptr<ReadBatch> batch) {
return std::move(shuffleReader_->next(1))
.deferValue([this](std::vector<std::unique_ptr<ReadBatch>>&& batches) {
std::vector<velox::ContinuePromise> promises;
int64_t totalBytes{0};
{
std::lock_guard<std::mutex> l(queue_->mutex());
if (batch == nullptr) {
if (batches.empty()) {
atEnd_ = true;
queue_->enqueueLocked(nullptr, promises);
} else {
totalBytes = batch->data->size();
for (auto& batch : batches) {
totalBytes = batch->data->size();
VELOX_CHECK_LE(totalBytes, std::numeric_limits<int32_t>::max());
++numBatches_;
queue_->enqueueLocked(
std::make_unique<ShuffleRowBatch>(std::move(batch)),
promises);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,8 @@ class ShuffleReader {
public:
virtual ~ShuffleReader() = default;

/// Reads the next block of data. The function returns null if it has read all
/// the data. The function throws if run into any error.
virtual folly::SemiFuture<std::unique_ptr<ReadBatch>> next() = 0;
/// Reads up to 'numBatches' blocks of data in one call. Implementations
/// visible in this change return fewer batches than requested when less data
/// is available, and the exchange source treats an empty vector as the end of
/// the data.
/// NOTE(review): this overload appears to replace the single-batch
/// declaration above; confirm that only one of the two is meant to remain.
virtual folly::SemiFuture<std::vector<std::unique_ptr<ReadBatch>>> next(
size_t numBatches) = 0;

/// Tell the shuffle system the reader is done. May be called with 'success'
/// true before reading all the data. This happens when a query has a LIMIT or
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,16 +234,19 @@ class TestShuffleReader : public ShuffleReader {
readyPartitions)
: partition_(partition), readyPartitions_(readyPartitions) {}

// NOTE(review): two revisions of this method appear interleaved here (a
// zero-argument form returning one batch and a batched form returning a
// vector); confirm which lines are current.
//
// Both forms hand out batches stored for this reader's partition in
// '*readyPartitions_', always taking from the back of that partition's list,
// so batches come out in reverse of their stored order.
folly::SemiFuture<std::unique_ptr<ReadBatch>> next() override {
folly::SemiFuture<std::vector<std::unique_ptr<ReadBatch>>> next(
size_t numBatches) override {
// Named hook keyed by this string; presumably lets a test intercept the
// call. TODO confirm against TestValue.
TestValue::adjust(
"facebook::presto::operators::test::TestShuffleReader::next", this);
// Single-batch form: a null batch means the partition has nothing left.
if ((*readyPartitions_)[partition_].empty()) {
return folly::makeSemiFuture(std::unique_ptr<ReadBatch>(nullptr));
std::vector<std::unique_ptr<ReadBatch>> result;
auto& partitionBatches = (*readyPartitions_)[partition_];

// Batched form: move out at most 'numBatches' batches, stopping early when
// the partition runs dry; 'result' is then shorter than requested or empty.
for (size_t i = 0; i < numBatches && !partitionBatches.empty(); ++i) {
result.push_back(std::move(partitionBatches.back()));
partitionBatches.pop_back();
}
// Single-batch form: take the last stored batch and remove its slot.
auto readBatch = std::move((*readyPartitions_)[partition_].back());
(*readyPartitions_)[partition_].pop_back();
return folly::makeSemiFuture<std::unique_ptr<ReadBatch>>(
std::move(readBatch));

return folly::makeSemiFuture(std::move(result));
}

void noMoreData(bool success) override {
Expand Down
Loading