Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
#include "presto_cpp/main/http/HttpServer.h"
#include "presto_cpp/main/http/filters/AccessLogFilter.h"
#include "presto_cpp/main/http/filters/StatsFilter.h"
#include "presto_cpp/main/operators/BroadcastExchangeSource.h"
#include "presto_cpp/main/operators/BroadcastWrite.h"
#include "presto_cpp/main/operators/LocalPersistentShuffle.h"
#include "presto_cpp/main/operators/PartitionAndSerialize.h"
#include "presto_cpp/main/operators/ShuffleInterface.h"
Expand Down Expand Up @@ -290,6 +292,10 @@ void PrestoServer::run() {
facebook::velox::exec::ExchangeSource::registerFactory(
operators::UnsafeRowExchangeSource::createExchangeSource);

// Batch broadcast exchange source.
facebook::velox::exec::ExchangeSource::registerFactory(
operators::BroadcastExchangeSource::createExchangeSource);

pool_ = velox::memory::addDefaultLeafMemoryPool();
taskManager_ = std::make_unique<TaskManager>();

Expand Down Expand Up @@ -672,6 +678,12 @@ void PrestoServer::registerCustomOperators() {
std::make_unique<facebook::presto::operators::ShuffleWriteTranslator>());
facebook::velox::exec::Operator::registerOperator(
std::make_unique<operators::ShuffleReadTranslator>());

// Todo - Split Presto & Presto-on-Spark server into different classes
// which will allow server specific operator registration.
facebook::velox::exec::Operator::registerOperator(
std::make_unique<
facebook::presto::operators::BroadcastWriteTranslator>());
}

void PrestoServer::registerFunctions() {
Expand Down
2 changes: 2 additions & 0 deletions presto-native-execution/presto_cpp/main/TaskResource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ proxygen::RequestHandler* TaskResource::createOrUpdateBatchTask(
protocol::PlanFragment prestoPlan = json::parse(fragment);

auto serializedShuffleWriteInfo = batchUpdateRequest.shuffleWriteInfo;
auto broadcastBasePath = batchUpdateRequest.broadcastBasePath;
auto shuffleName = SystemConfig::instance()->shuffleName();
if (serializedShuffleWriteInfo) {
VELOX_USER_CHECK(
Expand All @@ -253,6 +254,7 @@ proxygen::RequestHandler* TaskResource::createOrUpdateBatchTask(
VeloxBatchQueryPlanConverter converter(
shuffleName,
std::move(serializedShuffleWriteInfo),
std::move(broadcastBasePath),
queryCtx.get(),
pool_);
auto planFragment = converter.toVeloxQueryPlan(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,21 +77,22 @@ BroadcastExchangeSource::createExchangeSource(
return nullptr;
}

std::unique_ptr<BroadcastInfo> broadcastInfo;
std::unique_ptr<BroadcastFileInfo> broadcastFileInfo;
try {
broadcastInfo = BroadcastInfo::deserialize(broadcastInfoJson.value());
broadcastFileInfo =
BroadcastFileInfo::deserialize(broadcastInfoJson.value());
} catch (const VeloxException& e) {
throw;
} catch (const std::exception& e) {
VELOX_USER_FAIL("BroadcastInfo deserialization failed: {}", e.what());
}

auto fileSystemBroadcast = BroadcastFactory(broadcastInfo->basePath_);
auto fileSystemBroadcast = BroadcastFactory(broadcastFileInfo->filePath_);
return std::make_unique<BroadcastExchangeSource>(
uri.host(),
destination,
std::move(queue),
fileSystemBroadcast.createReader(broadcastInfo->fileInfos_, pool),
fileSystemBroadcast.createReader(std::move(broadcastFileInfo), pool),
pool);
}
}; // namespace facebook::presto::operators
Original file line number Diff line number Diff line change
Expand Up @@ -49,32 +49,22 @@ std::unique_ptr<BroadcastFileWriter> BroadcastFactory::createWriter(
}

std::shared_ptr<BroadcastFileReader> BroadcastFactory::createReader(
const std::vector<BroadcastFileInfo> fileInfos,
std::unique_ptr<BroadcastFileInfo> fileInfo,
velox::memory::MemoryPool* pool) {
auto broadcastFileReader = std::make_shared<BroadcastFileReader>(
std::move(fileInfos), fileSystem_, pool);
auto broadcastFileReader =
std::make_shared<BroadcastFileReader>(fileInfo, fileSystem_, pool);
return broadcastFileReader;
}

// static
std::unique_ptr<BroadcastInfo> BroadcastInfo::deserialize(
std::unique_ptr<BroadcastFileInfo> BroadcastFileInfo::deserialize(
const std::string& info) {
const auto root = nlohmann::json::parse(info);
std::vector<BroadcastFileInfo> broadcastFileInfos;

for (auto& fileInfo : root["fileInfos"]) {
BroadcastFileInfo broadcastFileInfo;
fileInfo.at("filePath").get_to(broadcastFileInfo.filePath_);
broadcastFileInfos.emplace_back(broadcastFileInfo);
}
return std::make_unique<BroadcastInfo>(root["basePath"], broadcastFileInfos);
auto broadcastFileInfo = std::make_unique<BroadcastFileInfo>();
root.at("filePath").get_to(broadcastFileInfo->filePath_);
return broadcastFileInfo;
}

BroadcastInfo::BroadcastInfo(
std::string basePath,
std::vector<BroadcastFileInfo> fileInfos)
: basePath_(basePath), fileInfos_(fileInfos) {}

BroadcastFileWriter::BroadcastFileWriter(
std::string_view filename,
const RowTypePtr& inputType,
Expand Down Expand Up @@ -142,39 +132,34 @@ void BroadcastFileWriter::write(const RowVectorPtr& rowVector) {
}

BroadcastFileReader::BroadcastFileReader(
std::vector<BroadcastFileInfo> broadcastFileInfos,
std::unique_ptr<BroadcastFileInfo>& broadcastFileInfo,
std::shared_ptr<velox::filesystems::FileSystem> fileSystem,
velox::memory::MemoryPool* pool)
: broadcastFileInfos_(broadcastFileInfos),
: broadcastFileInfo_(std::move(broadcastFileInfo)),
fileSystem_(fileSystem),
readfileIndex_(0),
numFiles_(0),
hasData_(true),
numBytes_(0),
pool_(pool) {}

bool BroadcastFileReader::hasNext() {
return readfileIndex_ < broadcastFileInfos_.size();
return hasData_;
}

velox::BufferPtr BroadcastFileReader::next() {
if (!hasNext()) {
return nullptr;
}

auto readFile = fileSystem_->openFileForRead(
broadcastFileInfos_[readfileIndex_].filePath_);
auto readFile = fileSystem_->openFileForRead(broadcastFileInfo_->filePath_);
auto buffer = AlignedBuffer::allocate<char>(readFile->size(), pool_, 0);
readFile->pread(0, readFile->size(), buffer->asMutable<char>());
++readfileIndex_;
++numFiles_;
numBytes_ += readFile->size();
hasData_ = false;
return buffer;
}

folly::F14FastMap<std::string, int64_t> BroadcastFileReader::stats() {
return {
{"broadcastExchangeSource.numFiles", numFiles_},
{"broadcastExchangeSource.numBytes", numBytes_}};
return {{"broadcastExchangeSource.numBytes", numBytes_}};
}

} // namespace facebook::presto::operators
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,18 @@ using namespace facebook::velox;

namespace facebook::presto::operators {

/// Struct for single broadcast file info.
/// Struct for single broadcast file info.]
// This struct is a 1:1 strict API mapping to
// presto-spark-base/src/main/java/com/facebook/presto/spark/execution/BroadcastFileInfo.java
// Please refrain from making changes to this API class. If any changes have to
// be made to this struct, one should make sure to make corresponding changes in
// the above Java classes and its corresponding serde functionalities.
struct BroadcastFileInfo {
std::string filePath_;
// TODO: Add additional stats including checksum, num rows, size.
};

/// Struct for broadcastInfo in split location.
struct BroadcastInfo {
/// Deserializes JSON string representing BroadcastInfo.
static std::unique_ptr<BroadcastInfo> deserialize(const std::string& info);

BroadcastInfo(std::string basePath, std::vector<BroadcastFileInfo> fileInfos);

std::string basePath_;
std::vector<BroadcastFileInfo> fileInfos_;
static std::unique_ptr<BroadcastFileInfo> deserialize(
const std::string& info);
};

/// Writes broadcast data to a file.
Expand Down Expand Up @@ -81,7 +78,7 @@ class BroadcastFileWriter {
class BroadcastFileReader {
public:
BroadcastFileReader(
std::vector<BroadcastFileInfo> broadcastFileInfos,
std::unique_ptr<BroadcastFileInfo>& broadcastFileInfo,
std::shared_ptr<velox::filesystems::FileSystem> fileSystem,
velox::memory::MemoryPool* pool);

Expand All @@ -93,14 +90,13 @@ class BroadcastFileReader {
/// Read next block of data.
velox::BufferPtr next();

/// Reader stats - number of files, number of bytes.
/// Reader stats - number of bytes.
folly::F14FastMap<std::string, int64_t> stats();

private:
std::vector<BroadcastFileInfo> broadcastFileInfos_;
std::unique_ptr<BroadcastFileInfo> broadcastFileInfo_;
std::shared_ptr<velox::filesystems::FileSystem> fileSystem_;
int32_t readfileIndex_;
int64_t numFiles_;
bool hasData_;
int64_t numBytes_;
velox::memory::MemoryPool* pool_;
};
Expand All @@ -117,7 +113,7 @@ class BroadcastFactory {
const RowTypePtr& inputType);

std::shared_ptr<BroadcastFileReader> createReader(
const std::vector<BroadcastFileInfo> fileInfos,
const std::unique_ptr<BroadcastFileInfo> fileInfo,
velox::memory::MemoryPool* pool);

private:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,22 +100,22 @@ class BroadcastTest : public exec::test::OperatorTestBase {
fmt::format(kBroadcastFileInfoFormat, broadcastFilePath));
}

bool noMoreSplits = false;
uint8_t splitIndex = 0;
// Read back result using BroadcastExchangeSource.
return readCursor(broadcastReadParams, [&](auto* task) {
if (noMoreSplits) {
if (splitIndex >= broadcastFilePaths.size()) {
task->noMoreSplits("0");
return;
}

auto split = exec::Split(
std::make_shared<exec::RemoteConnectorSplit>(fmt::format(
"batch://task?broadcastInfo={{\"basePath\": \"{}\", \"fileInfos\":[{}]}}",
basePath,
boost::algorithm::join(fileInfos, ","))),
"batch://task?broadcastInfo={}",
fmt::format(
kBroadcastFileInfoFormat, broadcastFilePaths[splitIndex]))),
-1);
task->addSplit("0", std::move(split));
task->noMoreSplits("0");
noMoreSplits = true;
++splitIndex;
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2700,9 +2700,25 @@ velox::core::PlanFragment VeloxBatchQueryPlanConverter::toVeloxQueryPlan(
VELOX_USER_CHECK_NOT_NULL(
partitionedOutputNode, "PartitionedOutputNode is required");

VELOX_USER_CHECK(
!partitionedOutputNode->isBroadcast(),
"Broadcast shuffle is not supported");
if (partitionedOutputNode->isBroadcast()) {
VELOX_USER_CHECK_NOT_NULL(
broadcastBasePath_, "broadcastBasePath is required");
// TODO - Use original plan node with root node and aggregate operator
// stats for additional nodes.
auto broadcastWriteNode = std::make_shared<operators::BroadcastWriteNode>(
"broadcast-write",
*broadcastBasePath_,
core::LocalPartitionNode::gather(
"broadcast-write-gather",
std::vector<core::PlanNodePtr>{partitionedOutputNode->sources()}));

planFragment.planNode = core::PartitionedOutputNode::broadcast(
"partitioned-output",
1,
broadcastWriteNode->outputType(),
{broadcastWriteNode});
return planFragment;
}

// If the serializedShuffleWriteInfo is not nullptr, it means this fragment
// ends with a shuffle stage. We convert the PartitionedOutputNode to a
Expand Down Expand Up @@ -2746,6 +2762,11 @@ velox::core::PlanNodePtr VeloxBatchQueryPlanConverter::toVeloxQueryPlan(
const std::shared_ptr<protocol::TableWriteInfo>& /* tableWriteInfo */,
const protocol::TaskId& taskId) {
auto rowType = toRowType(node->outputVariables);
// Broadcast exchange source.
if (node->exchangeType == protocol::ExchangeNodeType::REPLICATE) {
return std::make_shared<core::ExchangeNode>(node->id, rowType);
}
// Partitioned shuffle exchange source.
return std::make_shared<operators::ShuffleReadNode>(node->id, rowType);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,11 +230,13 @@ class VeloxBatchQueryPlanConverter : public VeloxQueryPlanConverterBase {
VeloxBatchQueryPlanConverter(
const std::string& shuffleName,
std::shared_ptr<std::string>&& serializedShuffleWriteInfo,
std::shared_ptr<std::string>&& broadcastBasePath,
velox::core::QueryCtx* queryCtx,
velox::memory::MemoryPool* pool)
: VeloxQueryPlanConverterBase(queryCtx, pool),
shuffleName_(shuffleName),
serializedShuffleWriteInfo_(std::move(serializedShuffleWriteInfo)) {}
serializedShuffleWriteInfo_(std::move(serializedShuffleWriteInfo)),
broadcastBasePath_(std::move(broadcastBasePath)) {}

velox::core::PlanFragment toVeloxQueryPlan(
const protocol::PlanFragment& fragment,
Expand All @@ -252,6 +254,7 @@ class VeloxBatchQueryPlanConverter : public VeloxQueryPlanConverterBase {
private:
const std::string shuffleName_;
const std::shared_ptr<std::string> serializedShuffleWriteInfo_;
const std::shared_ptr<std::string> broadcastBasePath_;
};

void registerPrestoPlanNodeSerDe();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ std::shared_ptr<const core::PlanNode> assertToVeloxQueryPlan(
std::shared_ptr<const core::PlanNode> assertToBatchVeloxQueryPlan(
const std::string& fileName,
const std::string& shuffleName,
std::shared_ptr<std::string>&& serializedShuffleWriteInfo) {
std::shared_ptr<std::string>&& serializedShuffleWriteInfo,
std::shared_ptr<std::string>&& broadcastBasePath) {
const std::string fragment = slurp(getDataPath(fileName));

protocol::PlanFragment prestoPlan = json::parse(fragment);
Expand All @@ -84,6 +85,7 @@ std::shared_ptr<const core::PlanNode> assertToBatchVeloxQueryPlan(
VeloxBatchQueryPlanConverter converter(
shuffleName,
std::move(serializedShuffleWriteInfo),
std::move(broadcastBasePath),
queryCtx.get(),
pool.get());
return converter
Expand Down Expand Up @@ -165,7 +167,8 @@ TEST_F(PlanConverterTest, batchPlanConversion) {
" \"numPartitions\": {}\n"
"}}",
exec::test::TempDirectoryPath::create()->path,
10)));
10)),
std::make_shared<std::string>("/tmp"));

auto shuffleWrite =
std::dynamic_pointer_cast<const operators::ShuffleWriteNode>(root);
Expand All @@ -187,7 +190,8 @@ TEST_F(PlanConverterTest, batchPlanConversion) {
auto curNode = assertToBatchVeloxQueryPlan(
"FinalAgg.json",
std::string(operators::LocalPersistentShuffleFactory::kShuffleName),
nullptr);
nullptr,
std::make_shared<std::string>("/tmp"));

std::shared_ptr<const operators::ShuffleReadNode> shuffleReadNode;
while (!curNode->sources().empty()) {
Expand Down
Loading