Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,9 @@ void updateFromSystemConfigs(
{std::string{SystemConfig::kTaskPartitionedWriterCount},
velox::core::QueryConfig::kTaskPartitionedWriterCount},

{std::string{SystemConfig::kExchangeMaxBufferSize},
velox::core::QueryConfig::kMaxExchangeBufferSize},

{std::string(SystemConfig::kSinkMaxBufferSize),
velox::core::QueryConfig::kMaxOutputBufferSize,
[](const auto& value) {
Expand Down
5 changes: 5 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Configs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ SystemConfig::SystemConfig() {
BOOL_PROP(kExchangeEnableConnectionPool, true),
BOOL_PROP(kExchangeEnableBufferCopy, true),
BOOL_PROP(kExchangeImmediateBufferTransfer, true),
NUM_PROP(kExchangeMaxBufferSize, 32UL << 20),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line requries MB in unit. Otherwise quereis fail with

presto:di> select * from tpch.sf1.orders limit 2
Query 20251101_231501_00108_hcf8c failed: Non-whitespace character found after end of conversion: "MB"

NUM_PROP(kTaskRunTimeSliceMicros, 50'000),
BOOL_PROP(kIncludeNodeInSpillPath, false),
NUM_PROP(kOldTaskCleanUpMs, 60'000),
Expand Down Expand Up @@ -887,6 +888,10 @@ bool SystemConfig::exchangeImmediateBufferTransfer() const {
return optionalProperty<bool>(kExchangeImmediateBufferTransfer).value();
}

uint64_t SystemConfig::exchangeMaxBufferSize() const {
return optionalProperty<uint64_t>(kExchangeMaxBufferSize).value();
}

int32_t SystemConfig::taskRunTimeSliceMicros() const {
return optionalProperty<int32_t>(kTaskRunTimeSliceMicros).value();
}
Expand Down
7 changes: 7 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Configs.h
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,11 @@ class SystemConfig : public ConfigBase {
kExchangeHttpClientNumCpuThreadsHwMultiplier{
"exchange.http-client.num-cpu-threads-hw-multiplier"};

/// Maximum size in bytes to accumulate in ExchangeQueue. Enforced
/// approximately, not strictly.
static constexpr std::string_view kExchangeMaxBufferSize{
"exchange.max-buffer-size"};

/// The maximum timeslice for a task on thread if there are threads queued.
static constexpr std::string_view kTaskRunTimeSliceMicros{
"task-run-timeslice-micros"};
Expand Down Expand Up @@ -1054,6 +1059,8 @@ class SystemConfig : public ConfigBase {

bool exchangeImmediateBufferTransfer() const;

uint64_t exchangeMaxBufferSize() const;

int32_t taskRunTimeSliceMicros() const;

bool includeNodeInSpillPath() const;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -571,3 +571,108 @@ TEST_F(PrestoToVeloxQueryConfigTest, sessionStartTimeConfiguration) {
EXPECT_EQ(
std::numeric_limits<int64_t>::max(), veloxConfig5.sessionStartTimeMs());
}

TEST_F(PrestoToVeloxQueryConfigTest, systemConfigsWithoutSessionOverride) {
// Verifies system configs are properly applied when no session properties
// override them. Uses exact count matching to catch any config additions or
// removals.

auto session = createBasicSession();
session.systemProperties.clear();
auto veloxConfigs = toVeloxConfigs(session);

struct SystemConfigMapping {
std::string veloxConfigKey;
std::string systemConfigKey;
};

// MUST match veloxToPrestoConfigMapping in PrestoToVeloxQueryConfig.cpp
std::vector<SystemConfigMapping> expectedMappings = {
{core::QueryConfig::kQueryMaxMemoryPerNode,
std::string(SystemConfig::kQueryMaxMemoryPerNode)},
{core::QueryConfig::kSpillFileCreateConfig,
std::string(SystemConfig::kSpillerFileCreateConfig)},
{core::QueryConfig::kSpillEnabled,
std::string(SystemConfig::kSpillEnabled)},
{core::QueryConfig::kJoinSpillEnabled,
std::string(SystemConfig::kJoinSpillEnabled)},
{core::QueryConfig::kOrderBySpillEnabled,
std::string(SystemConfig::kOrderBySpillEnabled)},
{core::QueryConfig::kAggregationSpillEnabled,
std::string(SystemConfig::kAggregationSpillEnabled)},
{core::QueryConfig::kRequestDataSizesMaxWaitSec,
std::string(SystemConfig::kRequestDataSizesMaxWaitSec)},
{core::QueryConfig::kMaxSplitPreloadPerDriver,
std::string(SystemConfig::kDriverMaxSplitPreload)},
{core::QueryConfig::kMaxLocalExchangePartitionBufferSize,
std::string(SystemConfig::kMaxLocalExchangePartitionBufferSize)},
{core::QueryConfig::kPrestoArrayAggIgnoreNulls,
std::string(SystemConfig::kUseLegacyArrayAgg)},
{core::QueryConfig::kTaskWriterCount,
std::string(SystemConfig::kTaskWriterCount)},
{core::QueryConfig::kTaskPartitionedWriterCount,
std::string(SystemConfig::kTaskPartitionedWriterCount)},
{core::QueryConfig::kMaxExchangeBufferSize,
std::string(SystemConfig::kExchangeMaxBufferSize)},
{core::QueryConfig::kMaxOutputBufferSize,
std::string(SystemConfig::kSinkMaxBufferSize)},
{core::QueryConfig::kMaxPartitionedOutputBufferSize,
std::string(SystemConfig::kDriverMaxPagePartitioningBufferSize)},
{core::QueryConfig::kMaxPartialAggregationMemory,
std::string(SystemConfig::kTaskMaxPartialAggregationMemory)},
};

const size_t kExpectedSystemConfigMappingCount = 16;
EXPECT_EQ(kExpectedSystemConfigMappingCount, expectedMappings.size())
<< "Update expectedMappings to match veloxToPrestoConfigMapping";

// Verify each system config mapping is present when it has a value
auto* systemConfig = SystemConfig::instance();
for (const auto& mapping : expectedMappings) {
auto systemValue = systemConfig->optionalProperty(mapping.systemConfigKey);
if (systemValue.hasValue()) {
EXPECT_TRUE(veloxConfigs.count(mapping.veloxConfigKey) > 0)
<< "Expected '" << mapping.veloxConfigKey << "' when system config '"
<< mapping.systemConfigKey << "' = " << systemValue.value();
}
}

// Verify special case configs (always added)
EXPECT_TRUE(
veloxConfigs.count(core::QueryConfig::kAdjustTimestampToTimezone) > 0);
EXPECT_EQ(
"true", veloxConfigs.at(core::QueryConfig::kAdjustTimestampToTimezone));

EXPECT_TRUE(
veloxConfigs.count(core::QueryConfig::kDriverCpuTimeSliceLimitMs) > 0);
EXPECT_EQ(
"1000", veloxConfigs.at(core::QueryConfig::kDriverCpuTimeSliceLimitMs));

// Verify session-specific configs
EXPECT_TRUE(veloxConfigs.count(core::QueryConfig::kSessionStartTime) > 0);
EXPECT_EQ(
"1234567890", veloxConfigs.at(core::QueryConfig::kSessionStartTime));

// Calculate expected exact count
size_t expectedExactConfigs = 0;
for (const auto& mapping : expectedMappings) {
if (systemConfig->optionalProperty(mapping.systemConfigKey).hasValue()) {
expectedExactConfigs++;
}
}
expectedExactConfigs += 2; // kAdjustTimestampToTimezone,
// kDriverCpuTimeSliceLimitMs
expectedExactConfigs += 1; // kSessionStartTime

// Use exact matching to catch any config additions/removals
EXPECT_EQ(veloxConfigs.size(), expectedExactConfigs)
<< "Config count mismatch indicates mapping change. Expected "
<< expectedExactConfigs << ", got " << veloxConfigs.size();

// Debug output
std::cout << "System configs (no session overrides):" << std::endl;
for (const auto& [key, value] : veloxConfigs) {
std::cout << " " << key << " = " << value << std::endl;
}
std::cout << "Total: " << veloxConfigs.size() << std::endl;
}
Loading