Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion presto-native-execution/presto_cpp/main/common/Configs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <boost/lexical_cast.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <limits>
#if __has_include("filesystem")
#include <filesystem>
namespace fs = std::filesystem;
Expand Down Expand Up @@ -143,6 +144,7 @@ SystemConfig::SystemConfig() {
NUM_PROP(kMaxDriversPerTask, hardwareConcurrency()),
NONE_PROP(kTaskWriterCount),
NONE_PROP(kTaskPartitionedWriterCount),
NONE_PROP(kTaskMaxStorageBroadcastBytes),
NUM_PROP(kConcurrentLifespansPerTask, 1),
STR_PROP(kTaskMaxPartialAggregationMemory, "16MB"),
NUM_PROP(kDriverMaxSplitPreload, 2),
Expand All @@ -167,7 +169,9 @@ SystemConfig::SystemConfig() {
NUM_PROP(kDriverStuckOperatorThresholdMs, 30 * 60 * 1000),
NUM_PROP(
kDriverCancelTasksWithStuckOperatorsThresholdMs, 40 * 60 * 1000),
NUM_PROP(kDriverNumStuckOperatorsToDetachWorker, std::round(0.5 * hardwareConcurrency())),
NUM_PROP(
kDriverNumStuckOperatorsToDetachWorker,
std::round(0.5 * hardwareConcurrency())),
NUM_PROP(kSpillerNumCpuThreadsHwMultiplier, 1.0),
STR_PROP(kSpillerFileCreateConfig, ""),
STR_PROP(kSpillerDirectoryCreateConfig, ""),
Expand Down Expand Up @@ -427,6 +431,10 @@ folly::Optional<int32_t> SystemConfig::taskPartitionedWriterCount() const {
return optionalProperty<int32_t>(kTaskPartitionedWriterCount);
}

// Reads the kTaskMaxStorageBroadcastBytes property as an optional unsigned
// 64-bit value and hands back whatever optionalProperty() produced.
folly::Optional<uint64_t> SystemConfig::taskMaxStorageBroadcastBytes() const {
  folly::Optional<uint64_t> maxBroadcastBytes =
      optionalProperty<uint64_t>(kTaskMaxStorageBroadcastBytes);
  return maxBroadcastBytes;
}
Comment on lines +434 to +436
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): taskMaxStorageBroadcastBytes uses optionalProperty but lacks validation for negative values.

Casting negative values to uint64_t will result in large positive numbers due to wraparound. Please add a check to ensure only non-negative values are accepted.

Suggested change
folly::Optional<uint64_t> SystemConfig::taskMaxStorageBroadcastBytes() const {
return optionalProperty<uint64_t>(kTaskMaxStorageBroadcastBytes);
}
folly::Optional<uint64_t> SystemConfig::taskMaxStorageBroadcastBytes() const {
auto optValue = optionalProperty<int64_t>(kTaskMaxStorageBroadcastBytes);
if (optValue.hasValue()) {
if (optValue.value() < 0) {
return folly::none;
}
return static_cast<uint64_t>(optValue.value());
}
return folly::none;
}


// Returns the kConcurrentLifespansPerTask property as an int32_t.
// NOTE(review): value() is called unconditionally, so this presumably relies
// on the property always being populated (a default appears to be registered
// through NUM_PROP in the SystemConfig constructor) - confirm.
int32_t SystemConfig::concurrentLifespansPerTask() const {
  const auto lifespans =
      optionalProperty<int32_t>(kConcurrentLifespansPerTask);
  return lifespans.value();
}
Expand Down
10 changes: 10 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Configs.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,14 @@ class SystemConfig : public ConfigBase {
static constexpr std::string_view kTaskWriterCount{"task.writer-count"};
static constexpr std::string_view kTaskPartitionedWriterCount{
"task.partitioned-writer-count"};

/// Maximum number of bytes per task that can be broadcast to storage for
/// storage-based broadcast joins. This property is only applicable to
/// storage-based broadcast join operations, currently used in the Presto on
/// Spark native stack. When the broadcast data size exceeds this limit, the
/// query fails.
static constexpr std::string_view kTaskMaxStorageBroadcastBytes{
"task.max-storage-broadcast-bytes"};
static constexpr std::string_view kConcurrentLifespansPerTask{
"task.concurrent-lifespans-per-task"};
static constexpr std::string_view kTaskMaxPartialAggregationMemory{
Expand Down Expand Up @@ -843,6 +851,8 @@ class SystemConfig : public ConfigBase {

folly::Optional<int32_t> taskPartitionedWriterCount() const;

folly::Optional<uint64_t> taskMaxStorageBroadcastBytes() const;

int32_t concurrentLifespansPerTask() const;

double httpServerNumIoThreadsHwMultiplier() const;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,29 @@
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
#include "presto_cpp/external/json/nlohmann/json.hpp"
#include "presto_cpp/main/common/Exception.h"
#include "presto_cpp/main/thrift/ThriftIO.h"
#include "presto_cpp/main/thrift/gen-cpp2/presto_native_types.h"
#include "presto_cpp/presto_protocol/core/presto_protocol_core.h"
#include "velox/common/file/File.h"
#include "velox/vector/FlatVector.h"

using namespace facebook::velox::exec;
using namespace facebook::velox;
using namespace facebook::presto;

namespace facebook::presto::operators {

// Raises a non-retriable ::facebook::velox::VeloxRuntimeError through
// _VELOX_THROW, tagged with the runtime error source and the
// kExceededLocalBroadcastJoinMemoryLimit error code. 'errorMessage' is
// forwarded as the single "{}" format argument.
//
// The definition intentionally does NOT end with a semicolon: the call site
// provides it. A trailing semicolon inside the macro would leave a stray empty
// statement after every use and break uses placed in an unbraced if/else.
#define PRESTO_BROADCAST_LIMIT_EXCEEDED(errorMessage) \
  _VELOX_THROW( \
      ::facebook::velox::VeloxRuntimeError, \
      ::facebook::velox::error_source::kErrorSourceRuntime.c_str(), \
      ::facebook::presto::error_code::kExceededLocalBroadcastJoinMemoryLimit \
          .c_str(), \
      /* isRetriable */ false, \
      "{}", \
      errorMessage)

namespace {
std::string makeUuid() {
return boost::lexical_cast<std::string>(boost::uuids::random_generator()());
Expand All @@ -40,11 +53,13 @@ BroadcastFactory::BroadcastFactory(const std::string& basePath)

std::unique_ptr<BroadcastFileWriter> BroadcastFactory::createWriter(
uint64_t writeBufferSize,
uint64_t maxBroadcastBytes,
velox::memory::MemoryPool* pool,
std::unique_ptr<VectorSerde::Options> serdeOptions) {
fileSystem_->mkdir(basePath_);
return std::make_unique<BroadcastFileWriter>(
fmt::format("{}/file_broadcast_{}", basePath_, makeUuid()),
maxBroadcastBytes,
writeBufferSize,
std::move(serdeOptions),
pool);
Expand All @@ -69,6 +84,7 @@ std::unique_ptr<BroadcastFileInfo> BroadcastFileInfo::deserialize(

BroadcastFileWriter::BroadcastFileWriter(
const std::string& pathPrefix,
uint64_t maxBroadcastBytes,
uint64_t writeBufferSize,
std::unique_ptr<VectorSerde::Options> serdeOptions,
velox::memory::MemoryPool* pool)
Expand All @@ -79,7 +95,8 @@ BroadcastFileWriter::BroadcastFileWriter(
"",
std::move(serdeOptions),
getNamedVectorSerde(VectorSerde::Kind::kPresto),
pool) {}
pool),
maxBroadcastBytes_(maxBroadcastBytes) {}

void BroadcastFileWriter::write(const RowVectorPtr& rowVector) {
const auto numRows = rowVector->size();
Expand All @@ -89,6 +106,20 @@ void BroadcastFileWriter::write(const RowVectorPtr& rowVector) {
numRows_ += numRows;
}

// Adds 'writtenBytes' to the running writtenBytes_ total and raises
// PRESTO_BROADCAST_LIMIT_EXCEEDED as soon as that total is greater than
// maxBroadcastBytes_. The two timing arguments are not used here.
void BroadcastFileWriter::updateWriteStats(
    uint64_t writtenBytes,
    uint64_t /* flushTimeNs */,
    uint64_t /* fileWriteTimeNs */) {
  writtenBytes_ += writtenBytes;

  const bool limitExceeded = writtenBytes_ > maxBroadcastBytes_;
  if (FOLLY_UNLIKELY(limitExceeded)) {
    // Build the message first so the throw site stays a single short call.
    const auto message = fmt::format(
        "Storage broadcast join exceeded per task broadcast limit "
        "writtenBytes_ {} vs maxBroadcastBytes_ {}",
        succinctBytes(writtenBytes_),
        succinctBytes(maxBroadcastBytes_));
    PRESTO_BROADCAST_LIMIT_EXCEEDED(message);
  }
}

uint64_t BroadcastFileWriter::flush() {
const auto pageBytes = serializer::SerializedPageFileWriter::flush();
if (pageBytes != 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class BroadcastFileWriter : velox::serializer::SerializedPageFileWriter {
public:
BroadcastFileWriter(
const std::string& pathPrefix,
uint64_t maxBroadcastBytes,
uint64_t writeBufferSize,
std::unique_ptr<velox::VectorSerde::Options> serdeOptions,
velox::memory::MemoryPool* pool);
Expand All @@ -58,6 +59,11 @@ class BroadcastFileWriter : velox::serializer::SerializedPageFileWriter {
velox::RowVectorPtr fileStats();

private:
void updateWriteStats(
uint64_t writtenBytes,
uint64_t /* flushTimeNs */,
uint64_t /* fileWriteTimeNs */) override;

uint64_t flush() override;

void closeFile() override;
Expand All @@ -70,6 +76,9 @@ class BroadcastFileWriter : velox::serializer::SerializedPageFileWriter {
// [serialized-thrift-footer][footer_size(8)]
void writeFooter();

const uint64_t maxBroadcastBytes_;

uint64_t writtenBytes_{0};
int64_t numRows_{0};
std::vector<int64_t> pageSizes_;
velox::RowVectorPtr fileStats_{nullptr};
Expand Down Expand Up @@ -120,6 +129,7 @@ class BroadcastFactory {

std::unique_ptr<BroadcastFileWriter> createWriter(
uint64_t writeBufferSize,
uint64_t maxBroadcastBytes,
velox::memory::MemoryPool* pool,
std::unique_ptr<velox::VectorSerde::Options> serdeOptions);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
* limitations under the License.
*/
#include "presto_cpp/main/operators/BroadcastWrite.h"
#include "presto_cpp/main/common/Configs.h"
#include "presto_cpp/main/operators/BroadcastFactory.h"

using namespace facebook::velox::exec;
Expand Down Expand Up @@ -55,10 +56,12 @@ class BroadcastWriteOperator : public Operator {
serdeChannels_(calculateOutputChannels(
planNode->inputType(),
planNode->serdeRowType(),
planNode->serdeRowType())) {
planNode->serdeRowType())),
maxBroadcastBytes_(planNode->maxBroadcastBytes()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: maxBroadcastBytes_ is stored but not used in BroadcastWriteOperator.

If maxBroadcastBytes_ is only needed for BroadcastFileWriter construction, consider removing it from BroadcastWriteOperator's member variables to reduce unnecessary state.

Suggested implementation:

        serdeChannels_(calculateOutputChannels(
            planNode->inputType(),
            planNode->serdeRowType(),
            planNode->serdeRowType())) {
    auto fileBroadcast = BroadcastFactory(planNode->basePath());
    fileBroadcastWriter_ = fileBroadcast.createWriter(
        8 << 20,
        planNode->maxBroadcastBytes(),
        operatorCtx_->pool(),
        getVectorSerdeOptions(ctx->queryConfig(), VectorSerde::Kind::kPresto));
  }
  const std::vector<column_index_t> serdeChannels_;
  std::unique_ptr<BroadcastFileWriter> fileBroadcastWriter_;

auto fileBroadcast = BroadcastFactory(planNode->basePath());
fileBroadcastWriter_ = fileBroadcast.createWriter(
8 << 20,
planNode->maxBroadcastBytes(),
operatorCtx_->pool(),
getVectorSerdeOptions(ctx->queryConfig(), VectorSerde::Kind::kPresto));
}
Expand Down Expand Up @@ -120,6 +123,7 @@ class BroadcastWriteOperator : public Operator {
// Empty if column order in the serdeRowType_ is exactly the same as in input
// or serdeRowType_ has no columns.
const std::vector<column_index_t> serdeChannels_;
const uint64_t maxBroadcastBytes_;
std::unique_ptr<BroadcastFileWriter> fileBroadcastWriter_;
bool finished_{false};
};
Expand All @@ -129,6 +133,7 @@ folly::dynamic BroadcastWriteNode::serialize() const {
auto obj = PlanNode::serialize();
obj["broadcastWriteBasePath"] =
ISerializable::serialize<std::string>(basePath_);
obj["maxBroadcastBytes"] = maxBroadcastBytes_;
obj["rowType"] = serdeRowType_->serialize();
obj["sources"] = ISerializable::serialize(sources_);
return obj;
Expand All @@ -141,6 +146,7 @@ velox::core::PlanNodePtr BroadcastWriteNode::create(
deserializePlanNodeId(obj),
ISerializable::deserialize<std::string>(
obj["broadcastWriteBasePath"], context),
obj["maxBroadcastBytes"].asInt(),
ISerializable::deserialize<RowType>(obj["rowType"]),
ISerializable::deserialize<std::vector<velox::core::PlanNode>>(
obj["sources"], context)[0]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,68 +28,15 @@ class BroadcastWriteNode : public velox::core::PlanNode {
BroadcastWriteNode(
const velox::core::PlanNodeId& id,
const std::string& basePath,
uint64_t maxBroadcastBytes,
velox::RowTypePtr serdeRowType,
velox::core::PlanNodePtr source)
: velox::core::PlanNode(id),
basePath_{basePath},
serdeRowType_{serdeRowType},
maxBroadcastBytes_{maxBroadcastBytes},
serdeRowType_{std::move(serdeRowType)},
sources_{std::move(source)} {}

/// Fluent builder for BroadcastWriteNode. Each setter stores its argument in
/// an optional member and returns *this so calls can be chained; build()
/// verifies that every field was provided before constructing the node.
///
/// NOTE(review): build() forwards four arguments (id, basePath, serdeRowType,
/// source), while the BroadcastWriteNode constructor shown in this diff also
/// takes maxBroadcastBytes. This builder looks like the pre-change side of the
/// diff - confirm before relying on it.
class Builder {
public:
Builder() = default;

/// Seeds the builder from an existing node: copies its id, base path and
/// serde row type, and takes the first entry of other.sources() as the source.
explicit Builder(const BroadcastWriteNode& other) {
id_ = other.id();
basePath_ = other.basePath();
serdeRowType_ = other.serdeRowType();
source_ = other.sources()[0];
}

/// Sets the plan node id.
Builder& id(velox::core::PlanNodeId id) {
id_ = std::move(id);
return *this;
}

/// Sets the base path passed to the node.
Builder& basePath(std::string basePath) {
basePath_ = std::move(basePath);
return *this;
}

/// Sets the row type passed to the node as its serde row type.
Builder& serdeRowType(velox::RowTypePtr serdeRowType) {
serdeRowType_ = std::move(serdeRowType);
return *this;
}

/// Sets the single source plan node.
Builder& source(velox::core::PlanNodePtr source) {
source_ = std::move(source);
return *this;
}

/// Creates the node from the collected values. Every field is checked with
/// VELOX_USER_CHECK first, so a missing field is reported with a message
/// naming that field instead of reaching value() on an empty optional.
std::shared_ptr<BroadcastWriteNode> build() const {
VELOX_USER_CHECK(id_.has_value(), "BroadcastWriteNode id is not set");
VELOX_USER_CHECK(
basePath_.has_value(), "BroadcastWriteNode basePath is not set");
VELOX_USER_CHECK(
serdeRowType_.has_value(),
"BroadcastWriteNode serdeRowType is not set");
VELOX_USER_CHECK(
source_.has_value(), "BroadcastWriteNode source is not set");

return std::make_shared<BroadcastWriteNode>(
id_.value(),
basePath_.value(),
serdeRowType_.value(),
source_.value());
}

private:
// All fields start empty; build() requires each of them to be set.
std::optional<velox::core::PlanNodeId> id_;
std::optional<std::string> basePath_;
std::optional<velox::RowTypePtr> serdeRowType_;
std::optional<velox::core::PlanNodePtr> source_;
};

folly::dynamic serialize() const override;

static velox::core::PlanNodePtr create(
Expand All @@ -113,6 +60,10 @@ class BroadcastWriteNode : public velox::core::PlanNode {
return basePath_;
}

uint64_t maxBroadcastBytes() const {
return maxBroadcastBytes_;
}

/// The desired schema of the serialized data. May include a subset of input
/// columns, some columns may be duplicated, some columns may be missing,
/// columns may appear in different order.
Expand All @@ -128,6 +79,7 @@ class BroadcastWriteNode : public velox::core::PlanNode {
void addDetails(std::stringstream& stream) const override {}

const std::string basePath_;
const uint64_t maxBroadcastBytes_;
const velox::RowTypePtr serdeRowType_;
const std::vector<velox::core::PlanNodePtr> sources_;
};
Expand Down
Loading
Loading