Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions presto-docs/src/main/sphinx/presto_cpp/sidecar.rst
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ The following HTTP endpoints are implemented by the Presto C++ sidecar.
validates the Velox plan. Returns any errors encountered during plan
conversion.

.. function:: POST /v1/expressions

Optimizes the list of ``RowExpression``\s in the HTTP request body using
a combination of constant folding and logical rewrites, leveraging
the ``ExprOptimizer`` from Velox. Returns a list of ``RowExpressionOptimizationResult``\s,
each of which contains either the optimized ``RowExpression`` or, if optimizing
that expression failed, a ``NativeSidecarFailureInfo`` describing the failure.

Configuration Properties
------------------------

Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions presto-native-execution/presto_cpp/main/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ target_link_libraries(
$<TARGET_OBJECTS:presto_protocol>
presto_common
presto_exception
presto_expression_optimizer
presto_function_metadata
presto_connectors
presto_http
Expand Down
60 changes: 60 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <boost/asio/ip/host_name.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <glog/logging.h>
#include <proxygen/lib/http/HTTPHeaders.h>
#include "presto_cpp/main/Announcer.h"
#include "presto_cpp/main/CoordinatorDiscoverer.h"
#include "presto_cpp/main/PeriodicMemoryChecker.h"
Expand All @@ -42,6 +43,7 @@
#include "presto_cpp/main/operators/ShuffleExchangeSource.h"
#include "presto_cpp/main/operators/ShuffleRead.h"
#include "presto_cpp/main/operators/ShuffleWrite.h"
#include "presto_cpp/main/types/ExpressionOptimizer.h"
#include "presto_cpp/main/types/PrestoToVeloxQueryPlan.h"
#include "presto_cpp/main/types/VeloxPlanConversion.h"
#include "velox/common/base/Counters.h"
Expand Down Expand Up @@ -100,6 +102,8 @@ constexpr char const* kTaskUriFormat =
constexpr char const* kConnectorName = "connector.name";
constexpr char const* kLinuxSharedLibExt = ".so";
constexpr char const* kMacOSSharedLibExt = ".dylib";
// Values accepted in the "X-Presto-Expression-Optimizer-Level" request header;
// getOptimizedExpressions compares the header against these two strings.
constexpr char const* kOptimized = "OPTIMIZED";
constexpr char const* kEvaluated = "EVALUATED";

protocol::NodeState convertNodeState(presto::NodeState nodeState) {
switch (nodeState) {
Expand Down Expand Up @@ -192,6 +196,50 @@ void unregisterVeloxCudf() {
#endif
}

/// Optimizes the RowExpressions carried in the body of an HTTP request.
///
/// The optimizer level is read from the "X-Presto-Expression-Optimizer-Level"
/// header and the session timezone from the "X-Presto-Time-Zone" header. The
/// body is parsed as a JSON array of RowExpressions, which is then handed to
/// expression::optimizeExpressions.
///
/// @param httpHeaders Headers of the incoming request.
/// @param body Chunks of the request body; together they must form a JSON
/// array.
/// @param executor Executor passed to the QueryCtx created for this call.
/// @param pool Memory pool forwarded to expression::optimizeExpressions.
/// @return JSON array holding the serialized result for every entry returned
/// by expression::optimizeExpressions.
/// Fails a VELOX_USER_CHECK when the optimizer level header is neither
/// "OPTIMIZED" nor "EVALUATED", or when the parsed body is not a JSON array.
json::array_t getOptimizedExpressions(
    const proxygen::HTTPHeaders& httpHeaders,
    const std::vector<std::unique_ptr<folly::IOBuf>>& body,
    folly::Executor* executor,
    velox::memory::MemoryPool* pool) {
  static constexpr char const* kOptimizerLevelHeader =
      "X-Presto-Expression-Optimizer-Level";
  const auto& optimizerLevelString =
      httpHeaders.getSingleOrEmpty(kOptimizerLevelHeader);
  VELOX_USER_CHECK(
      (optimizerLevelString == kOptimized) ||
          (optimizerLevelString == kEvaluated),
      "Optimizer level should be OPTIMIZED or EVALUATED, received {}.",
      optimizerLevelString);
  // Deliberately not const: expression::optimizeExpressions takes the level by
  // non-const reference.
  auto optimizerLevel = (optimizerLevelString == kOptimized)
      ? expression::OptimizerLevel::kOptimized
      : expression::OptimizerLevel::kEvaluated;

  static constexpr char const* kTimezoneHeader = "X-Presto-Time-Zone";
  const auto& timezone = httpHeaders.getSingleOrEmpty(kTimezoneHeader);
  std::unordered_map<std::string, std::string> config(
      {{velox::core::QueryConfig::kSessionTimezone, timezone},
       {velox::core::QueryConfig::kAdjustTimestampToTimezone, "true"}});
  auto queryConfig = velox::core::QueryConfig{std::move(config)};
  auto queryCtx =
      velox::core::QueryCtx::create(executor, std::move(queryConfig));

  const json input = json::parse(util::extractMessageBody(body));
  VELOX_USER_CHECK(input.is_array(), "Body of request should be a JSON array.");

  // Walk the parsed array in place; converting it to json::array_t first would
  // deep-copy every element only to iterate over the copy.
  std::vector<RowExpressionPtr> expressions;
  expressions.reserve(input.size());
  for (const auto& expressionJson : input) {
    expressions.push_back(expressionJson);
  }
  const auto optimizedList = expression::optimizeExpressions(
      expressions, timezone, optimizerLevel, queryCtx.get(), pool);

  json::array_t result;
  result.reserve(optimizedList.size());
  for (const auto& optimized : optimizedList) {
    result.push_back(optimized);
  }
  return result;
}

} // namespace

std::string nodeState2String(NodeState nodeState) {
Expand Down Expand Up @@ -1727,6 +1775,18 @@ void PrestoServer::registerSidecarEndpoints() {
http::sendOkResponse(downstream, getFunctionsMetadata(catalog));
});
});
httpServer_->registerPost(
"/v1/expressions",
[this](
proxygen::HTTPMessage* message,
const std::vector<std::unique_ptr<folly::IOBuf>>& body,
proxygen::ResponseHandler* downstream) {
const auto& httpHeaders = message->getHeaders();
const auto result = getOptimizedExpressions(
httpHeaders, body, driverExecutor_.get(), nativeWorkerPool_.get());
http::sendOkResponse(downstream, result);
});

httpServer_->registerPost(
"/v1/velox/plan",
[server = this](
Expand Down
14 changes: 14 additions & 0 deletions presto-native-execution/presto_cpp/main/types/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,20 @@ set_property(TARGET presto_types PROPERTY JOB_POOL_LINK presto_link_job_pool)
add_library(presto_velox_plan_conversion OBJECT VeloxPlanConversion.cpp)
target_link_libraries(presto_velox_plan_conversion velox_type)

# Library built from VeloxToPrestoExpr.cpp.
add_library(presto_velox_to_presto_expr VeloxToPrestoExpr.cpp)

target_link_libraries(
presto_velox_to_presto_expr
presto_exception
presto_type_converter
presto_types
presto_protocol
)

# Library built from ExpressionOptimizer.cpp; it links against presto_types and
# the presto_velox_to_presto_expr library declared above.
add_library(presto_expression_optimizer ExpressionOptimizer.cpp)

target_link_libraries(presto_expression_optimizer presto_types presto_velox_to_presto_expr)

if(PRESTO_ENABLE_TESTING)
add_subdirectory(tests)
endif()
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "presto_cpp/main/types/ExpressionOptimizer.h"
#include "presto_cpp/main/common/Configs.h"
#include "presto_cpp/main/common/Exception.h"
#include "presto_cpp/main/http/HttpServer.h"
#include "presto_cpp/main/types/PrestoToVeloxExpr.h"
#include "presto_cpp/main/types/TypeParser.h"
#include "presto_cpp/main/types/VeloxToPrestoExpr.h"
#include "presto_cpp/presto_protocol/core/presto_protocol_core.h"
#include "velox/core/Expressions.h"
#include "velox/expression/Expr.h"
#include "velox/expression/ExprOptimizer.h"

namespace facebook::presto::expression {

namespace {

// Callback passed to velox::expression::optimize (see optimizeExpression).
// Given an error message and a target type, it builds the expression
// CAST(<prefix>fail(error) AS type), where <prefix> is the configured Presto
// default namespace prefix.
// NOTE(review): presumably Velox invokes this when optimizing a sub-expression
// fails, so the failure is kept in the tree instead of thrown — confirm against
// the MakeFailExpr documentation.
static const velox::expression::MakeFailExpr kMakeFailExpr =
    [](const std::string& error,
       const velox::TypePtr& type) -> velox::core::TypedExprPtr {
  using velox::core::TypedExprPtr;

  // Fully qualified name of the `fail` function.
  auto failFunctionName = fmt::format(
      "{}fail", SystemConfig::instance()->prestoDefaultNamespacePrefix());

  // The error text becomes the single VARCHAR argument of fail().
  const TypedExprPtr message =
      std::make_shared<velox::core::ConstantTypedExpr>(velox::VARCHAR(), error);

  const TypedExprPtr failCall = std::make_shared<velox::core::CallTypedExpr>(
      velox::UNKNOWN(),
      std::vector<TypedExprPtr>{message},
      std::move(failFunctionName));

  // The call is typed UNKNOWN; the cast gives the result the requested type.
  // The trailing `false` is assumed to be the try-cast flag — TODO confirm.
  return std::make_shared<velox::core::CastTypedExpr>(
      type, std::vector<TypedExprPtr>{failCall}, false);
};

// Evaluates `expr` once against a one-row input that has no columns and
// returns the vector produced for it. No determinism check is made here, so
// non-deterministic expressions are evaluated as well. Exceptions raised
// during evaluation are not caught in this function.
velox::VectorPtr tryEvaluateToConstant(
    const velox::core::TypedExprPtr& expr,
    velox::core::QueryCtx* queryCtx,
    velox::memory::MemoryPool* pool) {
  // Input batch: a single row of an empty ROW type.
  auto inputRow =
      velox::BaseVector::create<velox::RowVector>(velox::ROW({}), 1, pool);

  velox::core::ExecCtx execCtx{pool, queryCtx};
  velox::exec::ExprSet compiled({expr}, &execCtx);
  velox::exec::EvalCtx evalCtx(&execCtx, &compiled, inputRow.get());

  // One output slot for the single compiled expression.
  std::vector<velox::VectorPtr> outputs(1);
  const velox::SelectivityVector onlyRow(1);
  compiled.eval(onlyRow, evalCtx, outputs);
  return outputs.front();
}

// Optimizes a single Presto RowExpression.
//
// The input is converted to a Velox expression and run through
// velox::expression::optimize with kMakeFailExpr. When `optimizerLevel` is
// kEvaluated, the optimized expression is also evaluated and replaced by a
// constant built from the result. The final Velox expression is converted
// back to a RowExpression.
//
// Only failures of the evaluation step are captured: they are reported through
// `expressionFailureInfo` with `optimizedExpression` set to null. Exceptions
// thrown by the Presto-to-Velox conversion, by velox::expression::optimize or
// by the conversion back to a RowExpression propagate to the caller.
//
// @param input RowExpression to optimize.
// @param optimizerLevel Requested optimization level (only read).
// @param prestoToVeloxConverter Converts `input` to a Velox expression.
// @param veloxToPrestoConverter Converts the optimized Velox expression back.
// @param queryCtx Query context used for optimization and evaluation.
// @param pool Memory pool used for optimization and evaluation.
// @return Result holding either the optimized expression or failure info.
protocol::RowExpressionOptimizationResult optimizeExpression(
    const RowExpressionPtr& input,
    OptimizerLevel optimizerLevel,
    const VeloxExprConverter& prestoToVeloxConverter,
    const expression::VeloxToPrestoExprConverter& veloxToPrestoConverter,
    velox::core::QueryCtx* queryCtx,
    velox::memory::MemoryPool* pool) {
  protocol::RowExpressionOptimizationResult result;
  const auto expr = prestoToVeloxConverter.toVeloxExpr(input);
  auto optimized =
      velox::expression::optimize(expr, queryCtx, pool, kMakeFailExpr);

  if (optimizerLevel == OptimizerLevel::kEvaluated) {
    // Shared by both handlers below. It is generic so the exception reaches
    // translateToPrestoException with the same static type it would have in
    // two hand-written handlers.
    const auto recordFailure = [&result](const auto& e) {
      result.expressionFailureInfo =
          toNativeSidecarFailureInfo(translateToPrestoException(e));
      result.optimizedExpression = nullptr;
    };

    try {
      const auto evalResult = tryEvaluateToConstant(optimized, queryCtx, pool);
      optimized = std::make_shared<velox::core::ConstantTypedExpr>(evalResult);
    } catch (const velox::VeloxException& e) {
      recordFailure(e);
      return result;
    } catch (const std::exception& e) {
      recordFailure(e);
      return result;
    }
  }

  result.optimizedExpression =
      veloxToPrestoConverter.getRowExpression(optimized, input);
  return result;
}

} // namespace

std::vector<protocol::RowExpressionOptimizationResult> optimizeExpressions(
const std::vector<RowExpressionPtr>& input,
const std::string& timezone,
OptimizerLevel& optimizerLevel,
velox::core::QueryCtx* queryCtx,
velox::memory::MemoryPool* pool) {
TypeParser typeParser;
const VeloxExprConverter prestoToVeloxConverter(pool, &typeParser);
const expression::VeloxToPrestoExprConverter veloxToPrestoConverter(pool);
std::vector<protocol::RowExpressionOptimizationResult> result;
for (const auto& expression : input) {
result.push_back(optimizeExpression(
expression,
optimizerLevel,
prestoToVeloxConverter,
veloxToPrestoConverter,
queryCtx,
pool));
}
return result;
}

} // namespace facebook::presto::expression
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <memory>
#include <string>
#include <vector>

#include "presto_cpp/external/json/nlohmann/json.hpp"
#include "presto_cpp/presto_protocol/presto_protocol.h"
#include "velox/common/memory/MemoryPool.h"
#include "velox/core/QueryCtx.h"

using RowExpressionPtr =
std::shared_ptr<facebook::presto::protocol::RowExpression>;

namespace facebook::presto::expression {

/// Selects how far the optimizer should go when processing a RowExpression.
/// TODO: This should be obtained from Presto protocol after refactoring the
/// enum from ExpressionOptimizer in Presto SPI.
enum class OptimizerLevel {
  /// Remove all redundancy from the RowExpression using the
  /// ExpressionOptimizer in Velox.
  kOptimized = 0,
  /// Additionally attempt to evaluate the RowExpression into a constant, even
  /// when the expression is non-deterministic.
  kEvaluated = 1,
};

/// Optimizes the input list of RowExpressions. For each input RowExpression,
/// the result is an optimized expression on success or failure info.
/// @param input List of RowExpressions to be optimized.
/// @param timezone Session timezone, received from Presto coordinator.
/// NOTE(review): the definition does not appear to read this argument; the
/// timezone seems to reach evaluation through `queryCtx` instead — confirm.
/// @param optimizerLevel Optimizer level, received from Presto coordinator.
/// The optimizerLevel can either be OPTIMIZED or EVALUATED. OPTIMIZED removes
/// all redundancy in a RowExpression by leveraging the ExpressionOptimizer in
/// Velox, and EVALUATED attempts to evaluate the RowExpression into a constant
/// even when the expression is non-deterministic. Taken by non-const
/// reference, so callers must pass a mutable lvalue; the definition only reads
/// it.
/// @param queryCtx Query context to be used during optimization.
/// @param pool Memory pool, required for expression evaluation.
/// @return One RowExpressionOptimizationResult per input expression, in input
/// order.
/// NOTE(review): failure info is only filled in for errors raised while
/// evaluating an expression to a constant (EVALUATED level); errors raised
/// while converting or optimizing an expression appear to propagate as
/// exceptions — confirm this is intended.
std::vector<protocol::RowExpressionOptimizationResult> optimizeExpressions(
const std::vector<RowExpressionPtr>& input,
const std::string& timezone,
OptimizerLevel& optimizerLevel,
velox::core::QueryCtx* queryCtx,
velox::memory::MemoryPool* pool);
} // namespace facebook::presto::expression
Loading
Loading