Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -162,25 +162,29 @@ LocalShuffleReader::LocalShuffleReader(
}

folly::SemiFuture<std::vector<std::unique_ptr<ReadBatch>>>
LocalShuffleReader::next(size_t numBatches) {
LocalShuffleReader::next(uint64_t maxBytes) {
using TRowSize = uint32_t;

if (readPartitionFiles_.empty()) {
readPartitionFiles_ = getReadPartitionFiles();
}

std::vector<std::unique_ptr<ReadBatch>> batches;
batches.reserve(numBatches);
uint64_t totalBytes{0};
// Read files until we reach maxBytes limit or run out of files.
while (readPartitionFileIndex_ < readPartitionFiles_.size()) {
const auto filename = readPartitionFiles_[readPartitionFileIndex_];
auto file = fileSystem_->openFileForRead(filename);
const auto fileSize = file->size();

for (size_t i = 0; i < numBatches; ++i) {
if (readPartitionFileIndex_ >= readPartitionFiles_.size()) {
// Stop if adding this file would exceed maxBytes (unless we haven't read
// any files yet)
if (!batches.empty() && totalBytes + fileSize > maxBytes) {
break;
}

const auto filename = readPartitionFiles_[readPartitionFileIndex_];
auto file = fileSystem_->openFileForRead(filename);
auto buffer = AlignedBuffer::allocate<char>(file->size(), pool_, 0);
file->pread(0, file->size(), buffer->asMutable<void>());
auto buffer = AlignedBuffer::allocate<char>(fileSize, pool_, 0);
file->pread(0, fileSize, buffer->asMutable<void>());
++readPartitionFileIndex_;

// Parse the buffer to extract individual rows.
Expand All @@ -201,6 +205,7 @@ LocalShuffleReader::next(size_t numBatches) {
offset += rowSize;
}

totalBytes += fileSize;
batches.push_back(
std::make_unique<ReadBatch>(std::move(rows), std::move(buffer)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ class LocalShuffleReader : public ShuffleReader {
velox::memory::MemoryPool* pool);

folly::SemiFuture<std::vector<std::unique_ptr<ReadBatch>>> next(
size_t numBatches) override;
uint64_t maxBytes) override;

void noMoreData(bool success) override;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ namespace facebook::presto::operators {

folly::SemiFuture<ShuffleExchangeSource::Response>
ShuffleExchangeSource::request(
uint32_t /*maxBytes*/,
uint32_t maxBytes,
std::chrono::microseconds /*maxWait*/) {
auto nextBatch = [this]() {
return std::move(shuffleReader_->next(1))
auto nextBatch = [this, maxBytes]() {
return std::move(shuffleReader_->next(maxBytes))
.deferValue([this](std::vector<std::unique_ptr<ReadBatch>>&& batches) {
std::vector<velox::ContinuePromise> promises;
int64_t totalBytes{0};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,12 @@ class ShuffleReader {
public:
virtual ~ShuffleReader() = default;

/// Fetch the next batch of rows from the shuffle reader.
/// @param maxBytes Maximum number of bytes to read in this batch.
/// @return A semi-future resolving to a vector of ReadBatch pointers, where
/// each ReadBatch contains rows and associated data buffers.
virtual folly::SemiFuture<std::vector<std::unique_ptr<ReadBatch>>> next(
size_t numBatches) = 0;
uint64_t maxBytes) = 0;

/// Tell the shuffle system the reader is done. May be called with 'success'
/// true before reading all the data. This happens when a query has a LIMIT or
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,17 +235,25 @@ class TestShuffleReader : public ShuffleReader {
: partition_(partition), readyPartitions_(readyPartitions) {}

folly::SemiFuture<std::vector<std::unique_ptr<ReadBatch>>> next(
size_t numBatches) override {
uint64_t maxBytes) override {
VELOX_CHECK_GT(maxBytes, 0, "maxBytes must be greater than 0");
TestValue::adjust(
"facebook::presto::operators::test::TestShuffleReader::next", this);
std::vector<std::unique_ptr<ReadBatch>> result;
auto& partitionBatches = (*readyPartitions_)[partition_];

for (size_t i = 0; i < numBatches && !partitionBatches.empty(); ++i) {
if (!partitionBatches.empty()) {
result.push_back(std::move(partitionBatches.back()));
partitionBatches.pop_back();
}

for (size_t totalBytes = 0;
totalBytes < maxBytes && !partitionBatches.empty();) {
result.push_back(std::move(partitionBatches.back()));
totalBytes += result.back()->data->size();
partitionBatches.pop_back();
}

return folly::makeSemiFuture(std::move(result));
}

Expand Down Expand Up @@ -355,6 +363,7 @@ class ShuffleTest : public exec::test::OperatorTestBase {
protected:
void SetUp() override {
exec::test::OperatorTestBase::SetUp();
velox::filesystems::registerLocalFileSystem();
ShuffleInterfaceFactory::registerFactory(
std::string(TestShuffleFactory::kShuffleName),
std::make_unique<TestShuffleFactory>());
Expand Down Expand Up @@ -713,8 +722,6 @@ class ShuffleTest : public exec::test::OperatorTestBase {
{"c16", MAP(TINYINT(), REAL())},
});

// Create a local file system storage based shuffle.
velox::filesystems::registerLocalFileSystem();
auto rootDirectory = velox::exec::test::TempDirectoryPath::create();
auto rootPath = rootDirectory->getPath();
const std::string shuffleWriteInfo =
Expand Down Expand Up @@ -1176,8 +1183,6 @@ TEST_F(ShuffleTest, persistentShuffle) {
uint32_t numPartitions = 1;
uint32_t numMapDrivers = 1;

// Create a local file system storage based shuffle.
velox::filesystems::registerLocalFileSystem();
auto rootDirectory = velox::exec::test::TempDirectoryPath::create();
auto rootPath = rootDirectory->getPath();

Expand Down Expand Up @@ -1205,6 +1210,92 @@ TEST_F(ShuffleTest, persistentShuffle) {
cleanupDirectory(rootPath);
}

// Checks that LocalShuffleReader::next(maxBytes) limits how much data is
// returned per call. The same written data is read back with several maxBytes
// settings; for each one the test verifies that every row is returned and that
// the number of non-empty next() calls matches the expectation.
TEST_F(ShuffleTest, persistentShuffleBatch) {
  auto rootDirectory = velox::exec::test::TempDirectoryPath::create();
  auto rootPath = rootDirectory->getPath();

  const uint32_t numPartitions = 1;
  const uint32_t partition = 0;

  const size_t numRows{20};
  const size_t rowSize{64};
  // Use the (count) constructor explicitly: brace-init with a single integer
  // is easy to misread as a one-element list.
  std::vector<std::string> values(numRows);
  // 'views' point into 'values'; 'values' must not be resized afterwards.
  std::vector<std::string_view> views(numRows);
  for (size_t i = 0; i < numRows; ++i) {
    values[i] = std::string(rowSize, static_cast<char>('a' + i % 26));
    views[i] = values[i];
  }

  LocalShuffleWriteInfo writeInfo = LocalShuffleWriteInfo::deserialize(
      localShuffleWriteInfo(rootPath, numPartitions));

  // NOTE(review): maxBytesPerPartition of 1 presumably makes the writer flush
  // every row into its own file; the expected call counts below rely on the
  // data being spread over 'numRows' files -- confirm against
  // LocalShuffleWriter.
  auto writer = std::make_shared<LocalShuffleWriter>(
      writeInfo.rootPath,
      writeInfo.queryId,
      writeInfo.shuffleId,
      writeInfo.numPartitions,
      /*maxBytesPerPartition=*/1,
      pool());

  // Write all rows into the single partition, with empty keys.
  for (size_t i = 0; i < numRows; ++i) {
    writer->collect(partition, std::string_view{}, views[i]);
  }
  writer->noMoreData(true);

  LocalShuffleReadInfo readInfo = LocalShuffleReadInfo::deserialize(
      localShuffleReadInfo(rootPath, numPartitions, partition));

  // Each setting pairs a byte limit with the number of next() calls that are
  // expected to return data:
  //  - 1 and 100 are smaller than two 64-byte rows, so each call is expected
  //    to return a single file's worth of data;
  //  - 1 << 25 is large enough to return everything in one call.
  struct {
    uint64_t maxBytes;
    size_t expectedOutputCalls;

    std::string debugString() const {
      return fmt::format(
          "maxBytes: {}, expectedOutputCalls: {}",
          maxBytes,
          expectedOutputCalls);
    }
  } testSettings[] = {{1, numRows}, {100, numRows}, {1 << 25, 1}};

  for (const auto& testData : testSettings) {
    SCOPED_TRACE(testData.debugString());

    // A fresh reader per setting so that every run starts from the first file.
    auto reader = std::make_shared<LocalShuffleReader>(
        readInfo.rootPath, readInfo.queryId, readInfo.partitionIds, pool());

    size_t numOutputCalls{0};
    size_t totalRows{0};
    // Drain the reader; an empty result signals that no data is left.
    while (true) {
      auto batches = reader->next(testData.maxBytes)
                         .via(folly::getGlobalCPUExecutor())
                         .get();
      if (batches.empty()) {
        break;
      }

      ++numOutputCalls;
      for (const auto& batch : batches) {
        totalRows += batch->rows.size();
      }
    }

    reader->noMoreData(true);

    // Verify we read all rows.
    ASSERT_EQ(totalRows, numRows);
    // Verify the number of next() calls that produced output.
    ASSERT_EQ(numOutputCalls, testData.expectedOutputCalls);
  }

  cleanupDirectory(rootPath);
}

TEST_F(ShuffleTest, persistentShuffleFuzz) {
fuzzerTest(false, 1);
fuzzerTest(false, 3);
Expand Down
Loading