Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions velox/connectors/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,9 @@ endif()
# Add the connector tests subdirectory only when VELOX_BUILD_TESTING is set.
if(${VELOX_BUILD_TESTING})
add_subdirectory(tests)
endif()

# Static library built from RegisterConnectorFactories.cpp, which defines
# registerConnectorFactories() / unregisterConnectorFactories().
add_library(velox_registered_connector_factories STATIC RegisterConnectorFactories.cpp)
# velox_dwio_parquet_reader is linked only when VELOX_ENABLE_PARQUET evaluates
# to true; the generator expression expands to nothing otherwise.
# NOTE(review): RegisterConnectorFactories.cpp calls into the hive, tpcds, tpch
# and fuzzer connectors behind VELOX_ENABLE_*_CONNECTOR guards, but no connector
# target is linked here and no compile definitions are set on this target.
# Confirm both are provided elsewhere in the build.
target_link_libraries(
velox_registered_connector_factories
PUBLIC $<$<BOOL:${VELOX_ENABLE_PARQUET}>:velox_dwio_parquet_reader> velox_dwio_common
)
6 changes: 4 additions & 2 deletions velox/connectors/Connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ bool hasConnectorFactory(const std::string& connectorName) {
}

/// Erases the factory stored under 'connectorName' from the container returned
/// by connectorFactories().
/// @param connectorName Key of the factory to remove.
/// @return true if exactly one entry was erased, false otherwise.
bool unregisterConnectorFactory(const std::string& connectorName) {
  // Only one erase/return pair is kept; the second pair that followed the
  // first return statement could never execute.
  const auto factoryCount = connectorFactories().erase(connectorName);
  return factoryCount == 1;
}

std::shared_ptr<ConnectorFactory> getConnectorFactory(
Expand Down Expand Up @@ -169,4 +169,6 @@ folly::dynamic ConnectorTableHandle::serialize() const {
return serializeBase("ConnectorTableHandle");
}

// Out-of-line definition of the virtual destructor declared in Connector.h.
ConnectorLocationHandle::~ConnectorLocationHandle() = default;

} // namespace facebook::velox::connector
83 changes: 83 additions & 0 deletions velox/connectors/Connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "velox/common/file/TokenProvider.h"
#include "velox/common/future/VeloxPromise.h"
#include "velox/core/ExpressionEvaluator.h"
#include "velox/core/PartitionFunction.h"
#include "velox/type/Filter.h"
#include "velox/vector/ComplexVector.h"

Expand Down Expand Up @@ -93,6 +94,8 @@ struct ConnectorSplit : public ISerializable {

class ColumnHandle : public ISerializable {
public:
enum class ColumnType { kPartitionKey, kRegular, kSynthesized };

virtual ~ColumnHandle() = default;

virtual const std::string& name() const = 0;
Expand Down Expand Up @@ -165,6 +168,33 @@ class ConnectorInsertTableHandle : public ISerializable {
}
};

/// Abstract, serializable handle that carries the id of the connector it was
/// created for together with a TableType. Subclasses must provide toString()
/// and serialize().
class ConnectorLocationHandle : public ISerializable {
 public:
  /// Kind of table the handle refers to: new, existing or temp.
  enum class TableType { kNew, kExisting, kTemp };

  /// Stores copies of both arguments; they cannot be changed afterwards.
  ConnectorLocationHandle(const std::string& connectorId, TableType tableType)
      : connectorId_(connectorId), tableType_(tableType) {}

  /// Declared here and defined out of line.
  virtual ~ConnectorLocationHandle();

  /// Id of the connector passed at construction.
  const std::string& connectorId() const {
    return connectorId_;
  }

  /// New vs existing vs temp.
  TableType tableType() const {
    return tableType_;
  }

  /// Textual description of the handle; supplied by subclasses.
  virtual std::string toString() const = 0;

  /// Serialized form of the handle; supplied by subclasses.
  virtual folly::dynamic serialize() const = 0;

 private:
  const std::string connectorId_;
  const TableType tableType_;
};

/// Shared pointer to a const ConnectorInsertTableHandle.
using ConnectorInsertTableHandlePtr =
std::shared_ptr<const ConnectorInsertTableHandle>;

Expand Down Expand Up @@ -688,6 +718,59 @@ class ConnectorFactory {
folly::Executor* ioExecutor = nullptr,
folly::Executor* cpuExecutor = nullptr) = 0;

virtual std::shared_ptr<ConnectorSplit> makeConnectorSplit(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yingsu00 Connector is a generic API, but these new APIs are specific to Hive-like connectors. For example, these do not make sense for MySQL connector.

const std::string& connectorId,
const std::string& filePath,
uint64_t start,
uint64_t length,
const folly::dynamic& options = {}) const {
VELOX_UNSUPPORTED("ConnectorSplit not supported by connector", connectorId);
}

/// Builds a connector-specific ColumnHandle for the column 'name' of the given
/// 'type', with connector-specific 'options'. This base implementation only
/// reports the operation as unsupported through VELOX_UNSUPPORTED; connectors
/// that can create column handles are expected to override it.
virtual std::shared_ptr<connector::ColumnHandle> makeColumnHandle(
    const std::string& connectorId,
    const std::string& name,
    const TypePtr& type,
    const folly::dynamic& options = {}) const {
  // The "{}" placeholder is what makes connectorId appear in the message.
  VELOX_UNSUPPORTED(
      "connector::ColumnHandle not supported by connector: {}", connectorId);
}

/// Builds a ConnectorTableHandle for 'tableName' from the supplied
/// 'columnHandles' and connector-specific 'options'. Note that 'options' has
/// no default value here. This base implementation only reports the operation
/// as unsupported through VELOX_UNSUPPORTED; connectors that can create table
/// handles are expected to override it.
virtual std::shared_ptr<ConnectorTableHandle> makeTableHandle(
    const std::string& connectorId,
    const std::string& tableName,
    std::vector<std::shared_ptr<const connector::ColumnHandle>> columnHandles,
    const folly::dynamic& options) const {
  // The "{}" placeholder is what makes connectorId appear in the message.
  VELOX_UNSUPPORTED(
      "ConnectorTableHandle not supported by connector: {}", connectorId);
}

/// Builds a ConnectorInsertTableHandle from the 'inputColumns', the target
/// 'locationHandle' and connector-specific 'options'. This base implementation
/// only reports the operation as unsupported through VELOX_UNSUPPORTED;
/// connectors that can create insert handles are expected to override it.
virtual std::shared_ptr<ConnectorInsertTableHandle> makeInsertTableHandle(
    const std::string& connectorId,
    std::vector<std::shared_ptr<const connector::ColumnHandle>> inputColumns,
    std::shared_ptr<const ConnectorLocationHandle> locationHandle,
    const folly::dynamic& options = {}) const {
  // The "{}" placeholder is what makes connectorId appear in the message.
  VELOX_UNSUPPORTED(
      "ConnectorInsertTableHandle not supported by connector: {}",
      connectorId);
}

/// Builds a ConnectorLocationHandle of the requested 'tableType' (kNew when
/// omitted) with connector-specific 'options'. This base implementation only
/// reports the operation as unsupported through VELOX_UNSUPPORTED; connectors
/// that can create location handles are expected to override it.
virtual std::shared_ptr<ConnectorLocationHandle> makeLocationHandle(
    const std::string& connectorId,
    ConnectorLocationHandle::TableType tableType =
        ConnectorLocationHandle::TableType::kNew,
    const folly::dynamic& options = {}) const {
  // The "{}" placeholder is what makes connectorId appear in the message.
  VELOX_UNSUPPORTED(
      "ConnectorLocationHandle not supported by connector: {}", connectorId);
}

/// Builds a core::PartitionFunctionSpec from connector-specific 'options'.
/// This base implementation only reports the operation as unsupported through
/// VELOX_UNSUPPORTED, naming 'connectorId' in the message; connectors that can
/// create partition function specs are expected to override it.
virtual std::shared_ptr<const core::PartitionFunctionSpec>
makePartitionFunctionSpec(
const std::string& connectorId,
const folly::dynamic& options = {}) const {
VELOX_UNSUPPORTED(
"PartitionFunctionSpec not supported by connector: {}", connectorId);
}

private:
const std::string name_;
};
Expand Down
29 changes: 29 additions & 0 deletions velox/connectors/ConnectorNames.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

namespace facebook::velox::connector {

// TODO: Add a demo connector

/// Connector name constants. They are declared 'inline' so that every
/// translation unit including this header refers to the same single variable
/// instead of getting its own internal-linkage copy.
inline constexpr const char* kFuzzerConnectorName = "fuzzer";
inline constexpr const char* kHiveConnectorName = "hive";
inline constexpr const char* kIcebergConnectorName = "iceberg";
inline constexpr const char* kTpchConnectorName = "tpch";
inline constexpr const char* kTpcdsConnectorName = "tpcds";

} // namespace facebook::velox::connector
64 changes: 64 additions & 0 deletions velox/connectors/RegisterConnectorFactories.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/connectors/RegisterConnectorFactories.h"

#include <memory>

#ifdef VELOX_ENABLE_HIVE_CONNECTOR
#include "velox/connectors/hive/HiveConnector.h"
#endif
#ifdef VELOX_ENABLE_TPCDS_CONNECTOR
#include "velox/connectors/tpcds/TpcdsConnector.h"
#endif
#ifdef VELOX_ENABLE_TPCH_CONNECTOR
#include "velox/connectors/tpch/TpchConnector.h"
#endif
#ifdef VELOX_ENABLE_FUZZER_CONNECTOR
#include "velox/connectors/fuzzer/FuzzerConnector.h"
#endif

namespace facebook::velox::connector {

/// Registers the factory of every connector that was enabled at compile time
/// through the corresponding VELOX_ENABLE_*_CONNECTOR macro. Connectors whose
/// macro is not defined are skipped.
void registerConnectorFactories() {
#ifdef VELOX_ENABLE_HIVE_CONNECTOR
  hive::registerHiveConnectorFactory();
#endif
#ifdef VELOX_ENABLE_TPCDS_CONNECTOR
  tpcds::registerTpcdsConnectorFactory();
#endif
#ifdef VELOX_ENABLE_TPCH_CONNECTOR
  tpch::registerTpchConnectorFactory();
#endif
#ifdef VELOX_ENABLE_FUZZER_CONNECTOR
  // registerFuzzerConnectorFactory() is declared with a mandatory
  // std::unique_ptr<FuzzerConnectorFactory> parameter, so a factory instance
  // has to be passed explicitly.
  // NOTE(review): assumes FuzzerConnectorFactory is default-constructible;
  // confirm against FuzzerConnector.h.
  fuzzer::registerFuzzerConnectorFactory(
      std::make_unique<fuzzer::FuzzerConnectorFactory>());
#endif
}

/// Unregisters the factory of every connector that was enabled at compile time
/// through the corresponding VELOX_ENABLE_*_CONNECTOR macro, undoing
/// registerConnectorFactories(). Connectors whose macro is not defined are
/// skipped.
void unregisterConnectorFactories() {
  // NOTE(review): the hive, tpcds and tpch unregister functions are assumed to
  // exist with the same naming as fuzzer::unregisterFuzzerConnectorFactory();
  // confirm against each connector's header.
#ifdef VELOX_ENABLE_HIVE_CONNECTOR
  hive::unregisterHiveConnectorFactory();
#endif
#ifdef VELOX_ENABLE_TPCDS_CONNECTOR
  tpcds::unregisterTpcdsConnectorFactory();
#endif
#ifdef VELOX_ENABLE_TPCH_CONNECTOR
  tpch::unregisterTpchConnectorFactory();
#endif
#ifdef VELOX_ENABLE_FUZZER_CONNECTOR
  fuzzer::unregisterFuzzerConnectorFactory();
#endif
}

} // namespace facebook::velox::connector
25 changes: 25 additions & 0 deletions velox/connectors/RegisterConnectorFactories.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

namespace facebook::velox::connector {

/// Registers the factories of the connectors selected at compile time via the
/// VELOX_ENABLE_*_CONNECTOR macros (see RegisterConnectorFactories.cpp).
void registerConnectorFactories();

/// Counterpart of registerConnectorFactories(), covering the same set of
/// compile-time selected connectors (see RegisterConnectorFactories.cpp).
void unregisterConnectorFactories();

} // namespace facebook::velox::connector
10 changes: 10 additions & 0 deletions velox/connectors/fuzzer/FuzzerConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

#include "velox/connectors/fuzzer/FuzzerConnector.h"
#include "velox/connectors/ConnectorNames.h"
#include "velox/vector/fuzzer/VectorFuzzer.h"

namespace facebook::velox::connector::fuzzer {
Expand Down Expand Up @@ -69,4 +70,13 @@ std::optional<RowVectorPtr> FuzzerDataSource::next(
return outputVector;
}

/// Hands 'factory' over to connector::registerConnectorFactory().
/// @param factory Factory instance; ownership is moved out of the argument.
/// @return The result reported by connector::registerConnectorFactory().
bool registerFuzzerConnectorFactory(
    std::unique_ptr<FuzzerConnectorFactory> factory) {
  // The result must be returned explicitly: flowing off the end of a function
  // declared to return bool is undefined behavior.
  return connector::registerConnectorFactory(std::move(factory));
}

/// Unregisters the factory stored under connector::kFuzzerConnectorName.
/// @return The result of connector::unregisterConnectorFactory(), which is
/// true when exactly one factory was removed.
bool unregisterFuzzerConnectorFactory() {
  // The result must be returned explicitly: flowing off the end of a function
  // declared to return bool is undefined behavior.
  return connector::unregisterConnectorFactory(
      connector::kFuzzerConnectorName);
}

} // namespace facebook::velox::connector::fuzzer
7 changes: 7 additions & 0 deletions velox/connectors/fuzzer/FuzzerConnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,13 @@ class FuzzerConnectorFactory : public ConnectorFactory {
folly::Executor* cpuExecutor = nullptr) override {
return std::make_shared<FuzzerConnector>(id, config, ioExecutor);
}

// TODO: Add object makers like makeTableHandle, makeColumnHandle, etc.
};

/// Passes 'factory' to connector::registerConnectorFactory(). The argument is
/// mandatory; no default factory is supplied by this declaration.
bool registerFuzzerConnectorFactory(
std::unique_ptr<FuzzerConnectorFactory> factory);

/// Calls connector::unregisterConnectorFactory() with
/// connector::kFuzzerConnectorName.
bool unregisterFuzzerConnectorFactory();

} // namespace facebook::velox::connector::fuzzer
Loading