Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 73 additions & 54 deletions presto-native-execution/presto_cpp/main/PrestoToVeloxQueryConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,64 +115,83 @@ void updateFromSystemConfigs(
};

static const std::vector<ConfigMapping> veloxToPrestoConfigMapping{
{std::string(SystemConfig::kQueryMaxMemoryPerNode),
velox::core::QueryConfig::kQueryMaxMemoryPerNode},
{.prestoSystemConfig = std::string(SystemConfig::kQueryMaxMemoryPerNode),
.veloxConfig = velox::core::QueryConfig::kQueryMaxMemoryPerNode},

{
std::string(SystemConfig::kSpillerFileCreateConfig),
velox::core::QueryConfig::kSpillFileCreateConfig,
.prestoSystemConfig =
std::string(SystemConfig::kSpillerFileCreateConfig),
.veloxConfig = velox::core::QueryConfig::kSpillFileCreateConfig,
},

{std::string(SystemConfig::kSpillEnabled),
velox::core::QueryConfig::kSpillEnabled},

{std::string(SystemConfig::kJoinSpillEnabled),
velox::core::QueryConfig::kJoinSpillEnabled},

{std::string(SystemConfig::kOrderBySpillEnabled),
velox::core::QueryConfig::kOrderBySpillEnabled},

{std::string(SystemConfig::kAggregationSpillEnabled),
velox::core::QueryConfig::kAggregationSpillEnabled},

{std::string(SystemConfig::kRequestDataSizesMaxWaitSec),
velox::core::QueryConfig::kRequestDataSizesMaxWaitSec},

{std::string(SystemConfig::kDriverMaxSplitPreload),
velox::core::QueryConfig::kMaxSplitPreloadPerDriver},

{std::string(SystemConfig::kMaxLocalExchangePartitionBufferSize),
velox::core::QueryConfig::kMaxLocalExchangePartitionBufferSize},

{std::string(SystemConfig::kUseLegacyArrayAgg),
velox::core::QueryConfig::kPrestoArrayAggIgnoreNulls},

{std::string{SystemConfig::kTaskWriterCount},
velox::core::QueryConfig::kTaskWriterCount},

{std::string{SystemConfig::kTaskPartitionedWriterCount},
velox::core::QueryConfig::kTaskPartitionedWriterCount},

{std::string(SystemConfig::kSinkMaxBufferSize),
velox::core::QueryConfig::kMaxOutputBufferSize,
[](const auto& value) {
return folly::to<std::string>(velox::config::toCapacity(
value, velox::config::CapacityUnit::BYTE));
}},

{std::string(SystemConfig::kDriverMaxPagePartitioningBufferSize),
velox::core::QueryConfig::kMaxPartitionedOutputBufferSize,
[](const auto& value) {
return folly::to<std::string>(velox::config::toCapacity(
value, velox::config::CapacityUnit::BYTE));
}},

{std::string(SystemConfig::kTaskMaxPartialAggregationMemory),
velox::core::QueryConfig::kMaxPartialAggregationMemory,
[](const auto& value) {
return folly::to<std::string>(velox::config::toCapacity(
value, velox::config::CapacityUnit::BYTE));
}},
{.prestoSystemConfig = std::string(SystemConfig::kSpillEnabled),
.veloxConfig = velox::core::QueryConfig::kSpillEnabled},

{.prestoSystemConfig = std::string(SystemConfig::kJoinSpillEnabled),
.veloxConfig = velox::core::QueryConfig::kJoinSpillEnabled},

{.prestoSystemConfig = std::string(SystemConfig::kOrderBySpillEnabled),
.veloxConfig = velox::core::QueryConfig::kOrderBySpillEnabled},

{.prestoSystemConfig =
std::string(SystemConfig::kAggregationSpillEnabled),
.veloxConfig = velox::core::QueryConfig::kAggregationSpillEnabled},

{.prestoSystemConfig =
std::string(SystemConfig::kRequestDataSizesMaxWaitSec),
.veloxConfig = velox::core::QueryConfig::kRequestDataSizesMaxWaitSec},

{.prestoSystemConfig = std::string(SystemConfig::kDriverMaxSplitPreload),
.veloxConfig = velox::core::QueryConfig::kMaxSplitPreloadPerDriver},

{.prestoSystemConfig =
std::string(SystemConfig::kMaxLocalExchangePartitionBufferSize),
.veloxConfig =
velox::core::QueryConfig::kMaxLocalExchangePartitionBufferSize},

{.prestoSystemConfig = std::string(SystemConfig::kUseLegacyArrayAgg),
.veloxConfig = velox::core::QueryConfig::kPrestoArrayAggIgnoreNulls},

{.prestoSystemConfig = std::string{SystemConfig::kTaskWriterCount},
.veloxConfig = velox::core::QueryConfig::kTaskWriterCount},

{.prestoSystemConfig =
std::string{SystemConfig::kTaskPartitionedWriterCount},
.veloxConfig = velox::core::QueryConfig::kTaskPartitionedWriterCount},

{.prestoSystemConfig = std::string{SystemConfig::kExchangeMaxBufferSize},
.veloxConfig = velox::core::QueryConfig::kMaxExchangeBufferSize,
.toVeloxPropertyValueConverter =
[](const auto& value) {
return folly::to<std::string>(velox::config::toCapacity(
value, velox::config::CapacityUnit::BYTE));
}},

{.prestoSystemConfig = std::string(SystemConfig::kSinkMaxBufferSize),
.veloxConfig = velox::core::QueryConfig::kMaxOutputBufferSize,
.toVeloxPropertyValueConverter =
[](const auto& value) {
return folly::to<std::string>(velox::config::toCapacity(
value, velox::config::CapacityUnit::BYTE));
}},

{.prestoSystemConfig =
std::string(SystemConfig::kDriverMaxPagePartitioningBufferSize),
.veloxConfig = velox::core::QueryConfig::kMaxPartitionedOutputBufferSize,
.toVeloxPropertyValueConverter =
[](const auto& value) {
return folly::to<std::string>(velox::config::toCapacity(
value, velox::config::CapacityUnit::BYTE));
}},

{.prestoSystemConfig =
std::string(SystemConfig::kTaskMaxPartialAggregationMemory),
.veloxConfig = velox::core::QueryConfig::kMaxPartialAggregationMemory,
.toVeloxPropertyValueConverter =
[](const auto& value) {
return folly::to<std::string>(velox::config::toCapacity(
value, velox::config::CapacityUnit::BYTE));
}},
};

for (const auto& configMapping : veloxToPrestoConfigMapping) {
Expand Down
7 changes: 7 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Configs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ SystemConfig::SystemConfig() {
BOOL_PROP(kExchangeEnableConnectionPool, true),
BOOL_PROP(kExchangeEnableBufferCopy, true),
BOOL_PROP(kExchangeImmediateBufferTransfer, true),
STR_PROP(kExchangeMaxBufferSize, "32MB"),
NUM_PROP(kTaskRunTimeSliceMicros, 50'000),
BOOL_PROP(kIncludeNodeInSpillPath, false),
NUM_PROP(kOldTaskCleanUpMs, 60'000),
Expand Down Expand Up @@ -953,6 +954,12 @@ bool SystemConfig::exchangeImmediateBufferTransfer() const {
return optionalProperty<bool>(kExchangeImmediateBufferTransfer).value();
}

uint64_t SystemConfig::exchangeMaxBufferSize() const {
return velox::config::toCapacity(
optionalProperty(kExchangeMaxBufferSize).value(),
velox::config::CapacityUnit::BYTE);
}

int32_t SystemConfig::taskRunTimeSliceMicros() const {
return optionalProperty<int32_t>(kTaskRunTimeSliceMicros).value();
}
Expand Down
7 changes: 7 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Configs.h
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,11 @@ class SystemConfig : public ConfigBase {
kExchangeHttpClientNumCpuThreadsHwMultiplier{
"exchange.http-client.num-cpu-threads-hw-multiplier"};

/// Maximum size in bytes to accumulate in ExchangeQueue. Enforced
/// approximately, not strictly.
static constexpr std::string_view kExchangeMaxBufferSize{
"exchange.max-buffer-size"};
Comment on lines +744 to +747
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Document units for kExchangeMaxBufferSize for clarity.

Consider updating the documentation or variable naming to indicate that human-readable units (e.g., '32MB') are accepted, as supported by toCapacity.

Suggested change
/// Maximum size in bytes to accumulate in ExchangeQueue. Enforced
/// approximately, not strictly.
static constexpr std::string_view kExchangeMaxBufferSize{
"exchange.max-buffer-size"};
/// Maximum size to accumulate in ExchangeQueue. Enforced
/// approximately, not strictly. Accepts human-readable units (e.g., '32MB', '1GB')
/// as supported by toCapacity. The value is interpreted as bytes if no unit is specified.
static constexpr std::string_view kExchangeMaxBufferSize{
"exchange.max-buffer-size"};


/// The maximum timeslice for a task on thread if there are threads queued.
static constexpr std::string_view kTaskRunTimeSliceMicros{
"task-run-timeslice-micros"};
Expand Down Expand Up @@ -1118,6 +1123,8 @@ class SystemConfig : public ConfigBase {

bool exchangeImmediateBufferTransfer() const;

uint64_t exchangeMaxBufferSize() const;

int32_t taskRunTimeSliceMicros() const;

bool includeNodeInSpillPath() const;
Expand Down
Loading
Loading