Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions presto-native-execution/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ option(PRESTO_ENABLE_TESTING "Enable tests" ON)

option(PRESTO_ENABLE_JWT "Enable JWT (JSON Web Token) authentication" OFF)

option(PRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR "Enable Arrow Flight connector" OFF)

# Set all Velox options below
add_compile_definitions(FOLLY_HAVE_INT128_T=1)

Expand Down
3 changes: 3 additions & 0 deletions presto-native-execution/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ endif
ifneq ($(PRESTO_MEMORY_CHECKER_TYPE),)
EXTRA_CMAKE_FLAGS += -DPRESTO_MEMORY_CHECKER_TYPE=$(PRESTO_MEMORY_CHECKER_TYPE)
endif
ifneq ($(PRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR),)
EXTRA_CMAKE_FLAGS += -DPRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR=$(PRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR)
endif

CMAKE_FLAGS := -DTREAT_WARNINGS_AS_ERRORS=${TREAT_WARNINGS_AS_ERRORS}
CMAKE_FLAGS += -DENABLE_ALL_WARNINGS=${ENABLE_WALL}
Expand Down
9 changes: 9 additions & 0 deletions presto-native-execution/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,15 @@ follow these steps:
* For development, use `make debug` to build a non-optimized debug version.
* Use `make unittest` to build and run tests.

#### Arrow Flight Connector
To enable Arrow Flight connector support, set the environment variable:
`PRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR = "ON"`.

The Arrow Flight connector requires the Arrow Flight library. You can install this dependency
by running the following script from the `presto/presto-native-execution` directory:

`./scripts/setup-adapters.sh arrow_flight`

### Makefile Targets
A reminder of the available Makefile targets can be obtained using `make help`
```
Expand Down
2 changes: 2 additions & 0 deletions presto-native-execution/presto_cpp/main/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ add_subdirectory(types)
add_subdirectory(http)
add_subdirectory(common)
add_subdirectory(thrift)
add_subdirectory(connectors)

add_library(
presto_server_lib
Expand Down Expand Up @@ -48,6 +49,7 @@ target_link_libraries(
presto_common
presto_exception
presto_function_metadata
presto_connector
presto_http
presto_operators
velox_aggregates
Expand Down
28 changes: 16 additions & 12 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "presto_cpp/main/common/ConfigReader.h"
#include "presto_cpp/main/common/Counters.h"
#include "presto_cpp/main/common/Utils.h"
#include "presto_cpp/main/connectors/ConnectorRegistration.h"
#include "presto_cpp/main/http/HttpConstants.h"
#include "presto_cpp/main/http/filters/AccessLogFilter.h"
#include "presto_cpp/main/http/filters/HttpEndpointLatencyFilter.h"
Expand Down Expand Up @@ -252,8 +253,9 @@ void PrestoServer::run() {

// Register Velox connector factory for iceberg.
// The iceberg catalog is handled by the hive connector factory.
connector::registerConnectorFactory(
std::make_shared<connector::hive::HiveConnectorFactory>("iceberg"));
velox::connector::registerConnectorFactory(
std::make_shared<velox::connector::hive::HiveConnectorFactory>(
"iceberg"));

registerPrestoToVeloxConnector(
std::make_unique<HivePrestoToVeloxConnector>("hive"));
Expand All @@ -275,6 +277,8 @@ void PrestoServer::run() {
registerPrestoToVeloxConnector(
std::make_unique<SystemPrestoToVeloxConnector>("$system@system"));

presto::connector::registerAllPrestoConnectors();

initializeVeloxMemory();
initializeThreadPools();

Expand Down Expand Up @@ -1108,18 +1112,18 @@ PrestoServer::getAdditionalHttpServerFilters() {
void PrestoServer::registerConnectorFactories() {
// These checks for connector factories can be removed after we remove the
// registrations from the Velox library.
if (!connector::hasConnectorFactory(
connector::hive::HiveConnectorFactory::kHiveConnectorName)) {
connector::registerConnectorFactory(
std::make_shared<connector::hive::HiveConnectorFactory>());
connector::registerConnectorFactory(
std::make_shared<connector::hive::HiveConnectorFactory>(
if (!velox::connector::hasConnectorFactory(
velox::connector::hive::HiveConnectorFactory::kHiveConnectorName)) {
velox::connector::registerConnectorFactory(
std::make_shared<velox::connector::hive::HiveConnectorFactory>());
velox::connector::registerConnectorFactory(
std::make_shared<velox::connector::hive::HiveConnectorFactory>(
kHiveHadoop2ConnectorName));
}
if (!connector::hasConnectorFactory(
connector::tpch::TpchConnectorFactory::kTpchConnectorName)) {
connector::registerConnectorFactory(
std::make_shared<connector::tpch::TpchConnectorFactory>());
if (!velox::connector::hasConnectorFactory(
velox::connector::tpch::TpchConnectorFactory::kTpchConnectorName)) {
velox::connector::registerConnectorFactory(
std::make_shared<velox::connector::tpch::TpchConnectorFactory>());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,8 @@ std::optional<RowVectorPtr> SystemDataSource::next(
std::unique_ptr<velox::connector::ConnectorSplit>
SystemPrestoToVeloxConnector::toVeloxSplit(
const protocol::ConnectorId& catalogId,
const protocol::ConnectorSplit* const connectorSplit) const {
const protocol::ConnectorSplit* const connectorSplit,
const std::map<std::string, std::string>& extraCredentials) const {
auto systemSplit = dynamic_cast<const protocol::SystemSplit*>(connectorSplit);
VELOX_CHECK_NOT_NULL(
systemSplit, "Unexpected split type {}", connectorSplit->_type);
Expand Down
4 changes: 3 additions & 1 deletion presto-native-execution/presto_cpp/main/SystemConnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,9 @@ class SystemPrestoToVeloxConnector final : public PrestoToVeloxConnector {

std::unique_ptr<velox::connector::ConnectorSplit> toVeloxSplit(
const protocol::ConnectorId& catalogId,
const protocol::ConnectorSplit* connectorSplit) const final;
const protocol::ConnectorSplit* connectorSplit,
const std::map<std::string, std::string>& extraCredentials = {})
const final;

std::unique_ptr<velox::connector::ColumnHandle> toVeloxColumnHandle(
const protocol::ColumnHandle* column,
Expand Down
5 changes: 4 additions & 1 deletion presto-native-execution/presto_cpp/main/TaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@ std::unique_ptr<protocol::TaskInfo> TaskManager::createOrUpdateTask(
planFragment,
updateRequest.sources,
updateRequest.outputIds,
updateRequest.extraCredentials,
std::move(queryCtx),
startProcessCpuTime);
}
Expand All @@ -456,6 +457,7 @@ std::unique_ptr<protocol::TaskInfo> TaskManager::createOrUpdateBatchTask(
planFragment,
updateRequest.sources,
updateRequest.outputIds,
updateRequest.extraCredentials,
std::move(queryCtx),
startProcessCpuTime);
}
Expand All @@ -465,6 +467,7 @@ std::unique_ptr<TaskInfo> TaskManager::createOrUpdateTaskImpl(
const velox::core::PlanFragment& planFragment,
const std::vector<protocol::TaskSource>& sources,
const protocol::OutputBuffers& outputBuffers,
const std::map<std::string, std::string>& extraCredentials,
std::shared_ptr<velox::core::QueryCtx> queryCtx,
long startProcessCpuTime) {
std::shared_ptr<exec::Task> execTask;
Expand Down Expand Up @@ -565,7 +568,7 @@ std::unique_ptr<TaskInfo> TaskManager::createOrUpdateTaskImpl(
// Keep track of the max sequence for this batch of splits.
long maxSplitSequenceId{-1};
for (const auto& protocolSplit : source.splits) {
auto split = toVeloxSplit(protocolSplit);
auto split = toVeloxSplit(protocolSplit, extraCredentials);
if (split.hasConnectorSplit()) {
maxSplitSequenceId =
std::max(maxSplitSequenceId, protocolSplit.sequenceId);
Expand Down
1 change: 1 addition & 0 deletions presto-native-execution/presto_cpp/main/TaskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ class TaskManager {
const velox::core::PlanFragment& planFragment,
const std::vector<protocol::TaskSource>& sources,
const protocol::OutputBuffers& outputBuffers,
const std::map<std::string, std::string>& extraCredentials,
std::shared_ptr<velox::core::QueryCtx> queryCtx,
long startProcessCpuTime);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
if(PRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR)
add_subdirectory(arrow_flight)
endif()

add_library(presto_connector ConnectorRegistration.cpp)

if(PRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR)
target_link_libraries(presto_connector presto_flight_connector)
endif()
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "presto_cpp/main/connectors/ConnectorRegistration.h"

#ifdef PRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR
#include "presto_cpp/main/connectors/arrow_flight/ArrowFlightConnector.h"
#include "presto_cpp/main/connectors/arrow_flight/ArrowPrestoToVeloxConnector.h"
#endif

namespace facebook::presto::connector {

void registerAllPrestoConnectors() {
#ifdef PRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR
registerPrestoToVeloxConnector(
std::make_unique<
presto::connector::arrow_flight::ArrowPrestoToVeloxConnector>(
"arrow-flight"));

if (!velox::connector::hasConnectorFactory(
presto::connector::arrow_flight::ArrowFlightConnectorFactory::
kArrowFlightConnectorName)) {
velox::connector::registerConnectorFactory(
std::make_shared<
presto::connector::arrow_flight::ArrowFlightConnectorFactory>());
}
#endif
}

} // namespace facebook::presto::connector
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

namespace facebook::presto::connector {

void registerAllPrestoConnectors();

} // namespace facebook::presto::connector
Loading
Loading