diff --git a/velox/connectors/CMakeLists.txt b/velox/connectors/CMakeLists.txt index 3fd17dde2d2..492acb57959 100644 --- a/velox/connectors/CMakeLists.txt +++ b/velox/connectors/CMakeLists.txt @@ -32,3 +32,9 @@ endif() if(${VELOX_BUILD_TESTING}) add_subdirectory(tests) endif() + +add_library(velox_registered_connector_factories STATIC RegisterConnectorFactories.cpp) +target_link_libraries( + velox_registered_connector_factories + PUBLIC $<$:velox_dwio_parquet_reader> velox_dwio_common +) diff --git a/velox/connectors/Connector.cpp b/velox/connectors/Connector.cpp index e7c72af478b..f345de03cb7 100644 --- a/velox/connectors/Connector.cpp +++ b/velox/connectors/Connector.cpp @@ -58,8 +58,8 @@ bool hasConnectorFactory(const std::string& connectorName) { } bool unregisterConnectorFactory(const std::string& connectorName) { - auto count = connectorFactories().erase(connectorName); - return count == 1; + auto factoryCount = connectorFactories().erase(connectorName); + return factoryCount == 1; } std::shared_ptr getConnectorFactory( @@ -169,4 +169,6 @@ folly::dynamic ConnectorTableHandle::serialize() const { return serializeBase("ConnectorTableHandle"); } +ConnectorLocationHandle::~ConnectorLocationHandle() = default; + } // namespace facebook::velox::connector diff --git a/velox/connectors/Connector.h b/velox/connectors/Connector.h index 10cf82b342c..0a6d091b769 100644 --- a/velox/connectors/Connector.h +++ b/velox/connectors/Connector.h @@ -27,6 +27,7 @@ #include "velox/common/file/TokenProvider.h" #include "velox/common/future/VeloxPromise.h" #include "velox/core/ExpressionEvaluator.h" +#include "velox/core/PartitionFunction.h" #include "velox/type/Filter.h" #include "velox/vector/ComplexVector.h" @@ -93,6 +94,8 @@ struct ConnectorSplit : public ISerializable { class ColumnHandle : public ISerializable { public: + enum class ColumnType { kPartitionKey, kRegular, kSynthesized }; + virtual ~ColumnHandle() = default; virtual const std::string& name() const = 0; @@ -165,6 +168,33 @@ class ConnectorInsertTableHandle : public ISerializable { } }; +class ConnectorLocationHandle : public ISerializable { + public: + enum class TableType { kNew, kExisting, kTemp }; + + ConnectorLocationHandle(const std::string& connectorId, TableType tableType) + : connectorId_{connectorId}, tableType_{tableType} {} + + virtual ~ConnectorLocationHandle(); + + const std::string& connectorId() const { + return connectorId_; + } + + /// New vs existing vs temp. + TableType tableType() const { + return tableType_; + } + + virtual std::string toString() const = 0; + + virtual folly::dynamic serialize() const = 0; + + private: + const std::string connectorId_; + const TableType tableType_; +}; + using ConnectorInsertTableHandlePtr = std::shared_ptr; @@ -688,6 +718,59 @@ class ConnectorFactory { folly::Executor* ioExecutor = nullptr, folly::Executor* cpuExecutor = nullptr) = 0; + virtual std::shared_ptr makeConnectorSplit( + const std::string& connectorId, + const std::string& filePath, + uint64_t start, + uint64_t length, + const folly::dynamic& options = {}) const { + VELOX_UNSUPPORTED("ConnectorSplit not supported by connector", connectorId); + } + + virtual std::shared_ptr makeColumnHandle( + const std::string& connectorId, + const std::string& name, + const TypePtr& type, + const folly::dynamic& options = {}) const { + VELOX_UNSUPPORTED( + "connector::ColumnHandle not supported by connector", connectorId); + } + + virtual std::shared_ptr makeTableHandle( + const std::string& connectorId, + const std::string& tableName, + std::vector> columnHandles, + const folly::dynamic& options) const { + VELOX_UNSUPPORTED( + "ConnectorTableHandle not supported by connector", connectorId); + } + + virtual std::shared_ptr makeInsertTableHandle( + const std::string& connectorId, + std::vector> inputColumns, + std::shared_ptr locationHandle, + const folly::dynamic& options = {}) const { + VELOX_UNSUPPORTED( + "ConnectorInsertTableHandle not supported by connector", connectorId); + } + + virtual std::shared_ptr makeLocationHandle( + const std::string& connectorId, + ConnectorLocationHandle::TableType tableType = + ConnectorLocationHandle::TableType::kNew, + const folly::dynamic& options = {}) const { + VELOX_UNSUPPORTED( + "ConnectorLocationHandle not supported by connector", connectorId); + } + + virtual std::shared_ptr + makePartitionFunctionSpec( + const std::string& connectorId, + const folly::dynamic& options = {}) const { + VELOX_UNSUPPORTED( + "PartitionFunctionSpec not supported by connector: {}", connectorId); + } + private: const std::string name_; }; diff --git a/velox/connectors/ConnectorNames.h b/velox/connectors/ConnectorNames.h new file mode 100644 index 00000000000..05f1dd7f79c --- /dev/null +++ b/velox/connectors/ConnectorNames.h @@ -0,0 +1,29 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +namespace facebook::velox::connector { + +// TODO: Add a demo connector + +constexpr const char* kFuzzerConnectorName = "fuzzer"; +constexpr const char* kHiveConnectorName = "hive"; +constexpr const char* kIcebergConnectorName = "iceberg"; +constexpr const char* kTpchConnectorName = "tpch"; +constexpr const char* kTpcdsConnectorName = "tpcds"; + +} // namespace facebook::velox::connector diff --git a/velox/connectors/RegisterConnectorFactories.cpp b/velox/connectors/RegisterConnectorFactories.cpp new file mode 100644 index 00000000000..cbb645253ce --- /dev/null +++ b/velox/connectors/RegisterConnectorFactories.cpp @@ -0,0 +1,64 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/connectors/RegisterConnectorFactories.h" + +#ifdef VELOX_ENABLE_HIVE_CONNECTOR +#include "velox/connectors/hive/HiveConnector.h" +#endif +#ifdef VELOX_ENABLE_TPCDS_CONNECTOR +#include "velox/connectors/tpcds/TpcdsConnector.h" +#endif +#ifdef VELOX_ENABLE_TPCH_CONNECTOR +#include "velox/connectors/tpch/TpchConnector.h" +#endif +#ifdef VELOX_ENABLE_FUZZER_CONNECTOR +#include "velox/connectors/fuzzer/FuzzerConnector.h" +#endif + +namespace facebook::velox::connector { + +void registerConnectorFactories() { +#ifdef VELOX_ENABLE_HIVE_CONNECTOR + hive::registerHiveConnectorFactory(); +#endif +#ifdef VELOX_ENABLE_TPCDS_CONNECTOR + tpcds::registerTpcdsConnectorFactory(); +#endif +#ifdef VELOX_ENABLE_TPCH_CONNECTOR + tpch::registerTpchConnectorFactory(); +#endif +#ifdef VELOX_ENABLE_FUZZER_CONNECTOR + fuzzer::registerFuzzerConnectorFactory(); +#endif +} + +void unregisterConnectorFactories() { +#ifdef VELOX_ENABLE_HIVE_CONNECTOR + hive::registerHiveConnectorFactory(); +#endif +#ifdef VELOX_ENABLE_TPCDS_CONNECTOR + tpcds::registerTpcdsConnectorFactory(); +#endif +#ifdef VELOX_ENABLE_TPCH_CONNECTOR + tpch::registerTpchConnectorFactory(); +#endif +#ifdef VELOX_ENABLE_FUZZER_CONNECTOR + fuzzer::registerFuzzerConnectorFactory(); +#endif +} + +} // namespace facebook::velox::connector diff --git a/velox/connectors/RegisterConnectorFactories.h b/velox/connectors/RegisterConnectorFactories.h new file mode 100644 index 00000000000..f7bc3d7fa8f --- /dev/null +++ b/velox/connectors/RegisterConnectorFactories.h @@ -0,0 +1,25 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +namespace facebook::velox::connector { + +void registerConnectorFactories(); + +void unregisterConnectorFactories(); + +} // namespace facebook::velox::connector diff --git a/velox/connectors/fuzzer/FuzzerConnector.cpp b/velox/connectors/fuzzer/FuzzerConnector.cpp index f17b8faefd9..a91ea2314b7 100644 --- a/velox/connectors/fuzzer/FuzzerConnector.cpp +++ b/velox/connectors/fuzzer/FuzzerConnector.cpp @@ -15,6 +15,7 @@ */ #include "velox/connectors/fuzzer/FuzzerConnector.h" +#include "velox/connectors/ConnectorNames.h" #include "velox/vector/fuzzer/VectorFuzzer.h" namespace facebook::velox::connector::fuzzer { @@ -69,4 +70,13 @@ std::optional FuzzerDataSource::next( return outputVector; } +bool registerFuzzerConnectorFactory( + std::unique_ptr factory) { + connector::registerConnectorFactory(std::move(factory)); +} + +bool unregisterFuzzerConnectorFactory() { + connector::unregisterConnectorFactory(connector::kFuzzerConnectorName); +} + } // namespace facebook::velox::connector::fuzzer diff --git a/velox/connectors/fuzzer/FuzzerConnector.h b/velox/connectors/fuzzer/FuzzerConnector.h index 53e94b5f638..dbf117d07d7 100644 --- a/velox/connectors/fuzzer/FuzzerConnector.h +++ b/velox/connectors/fuzzer/FuzzerConnector.h @@ -141,6 +141,13 @@ class FuzzerConnectorFactory : public ConnectorFactory { folly::Executor* cpuExecutor = nullptr) override { return std::make_shared(id, config, ioExecutor); } + + // TODO: Add object makers like makeTableHandle, makeColumnHandle, etc. }; +bool registerFuzzerConnectorFactory( + std::unique_ptr factory); + +bool unregisterFuzzerConnectorFactory(); + } // namespace facebook::velox::connector::fuzzer diff --git a/velox/connectors/hive/HiveConnector.cpp b/velox/connectors/hive/HiveConnector.cpp index e04828e83aa..266bc191149 100644 --- a/velox/connectors/hive/HiveConnector.cpp +++ b/velox/connectors/hive/HiveConnector.cpp @@ -16,6 +16,7 @@ #include "velox/connectors/hive/HiveConnector.h" +#include "velox/connectors/ConnectorNames.h" #include "velox/connectors/hive/HiveConfig.h" #include "velox/connectors/hive/HiveDataSink.h" #include "velox/connectors/hive/HiveDataSource.h" @@ -25,6 +26,7 @@ #include using namespace facebook::velox::exec; +using namespace facebook::velox::common; namespace facebook::velox::connector::hive { @@ -99,6 +101,401 @@ void HiveConnector::registerSerDe() { HivePartitionFunctionSpec::registerSerDe(); } +std::shared_ptr HiveConnectorFactory::makeConnectorSplit( + const std::string& connectorId, + const std::string& filePath, + uint64_t start, + uint64_t length, + const folly::dynamic& options) const { + auto builder = HiveConnectorSplitBuilder(filePath) + .start(start) + .length(length) + .connectorId(connectorId); + + dwio::common::FileFormat fileFormat = + (!options.isNull() && options.count("fileFormat")) + ? static_cast(options["fileFormat"].asInt()) + : defaultFileFormat_; + builder.fileFormat(fileFormat); + + int64_t splitWeight = (!options.isNull() && options.count("splitWeight")) + ? options["splitWeight"].asInt() + : 0; + builder.splitWeight(splitWeight); + + bool cacheable = (!options.isNull() && options.count("cacheable")) + ? options["cacheable"].asBool() + : true; + builder.cacheable(cacheable); + + if (!options.isNull() && options.count("infoColumns")) { + for (auto& kv : options["infoColumns"].items()) { + builder.infoColumn(kv.first.asString(), kv.second.asString()); + } + } + + if (!options.isNull() && options.count("partitionKeys")) { + for (auto& kv : options["partitionKeys"].items()) { + builder.partitionKey( + kv.first.asString(), + kv.second.isNull() + ? std::nullopt + : std::optional(kv.second.asString())); + } + } + + if (!options.isNull() && options.count("tableBucketNumber")) { + builder.tableBucketNumber(options["tableBucketNumber"].asInt()); + } + + if (!options.isNull() && options.count("bucketConversion")) { + HiveBucketConversion bucketConversion; + const auto& bucketConversionOption = options["bucketConversion"]; + bucketConversion.tableBucketCount = + bucketConversionOption["tableBucketCount"].asInt(); + bucketConversion.partitionBucketCount = + bucketConversionOption["partitionBucketCount"].asInt(); + for (auto& bucketColumnHandlesOption : + bucketConversionOption["bucketColumnHandles"]) { + bucketConversion.bucketColumnHandles.push_back( + std::const_pointer_cast( + ISerializable::deserialize( + bucketColumnHandlesOption))); + } + builder.bucketConversion(bucketConversion); + } + + if (!options.isNull() && options.count("customSplitInfo")) { + std::unordered_map info; + for (auto& kv : options["customSplitInfo"].items()) { + info[kv.first.asString()] = kv.second.asString(); + } + builder.customSplitInfo(info); + } + + if (!options.isNull() && options.count("extraFileInfo")) { + auto extra = options["extraFileInfo"].isNull() + ? std::shared_ptr() + : std::make_shared(options["extraFileInfo"].asString()); + builder.extraFileInfo(extra); + } + + if (!options.isNull() && options.count("serdeParameters")) { + std::unordered_map serde; + for (auto& kv : options["serdeParameters"].items()) { + serde[kv.first.asString()] = kv.second.asString(); + } + builder.serdeParameters(serde); + } + + if (!options.isNull() && options.count("fileProperties")) { + FileProperties props; + const auto& propertiesOption = options["fileProperties"]; + if (propertiesOption.count("fileSize") && + !propertiesOption["fileSize"].isNull()) { + props.fileSize = propertiesOption["fileSize"].asInt(); + } + if (propertiesOption.count("modificationTime") && + !propertiesOption["modificationTime"].isNull()) { + props.modificationTime = propertiesOption["modificationTime"].asInt(); + } + builder.fileProperties(props); + } + + if (!options.isNull() && options.count("rowIdProperties")) { + RowIdProperties rowIdProperties; + const auto& rowIdPropertiesOption = options["rowIdProperties"]; + rowIdProperties.metadataVersion = + rowIdPropertiesOption["metadataVersion"].asInt(); + rowIdProperties.partitionId = rowIdPropertiesOption["partitionId"].asInt(); + rowIdProperties.tableGuid = rowIdPropertiesOption["tableGuid"].asString(); + builder.rowIdProperties(rowIdProperties); + } + + return builder.build(); +} + +std::shared_ptr HiveConnectorFactory::makeColumnHandle( + const std::string& connectorId, + const std::string& name, + const TypePtr& dataType, + const folly::dynamic& options) const { + using HiveColumnType = hive::HiveColumnHandle::ColumnType; + HiveColumnType hiveColumnType = HiveColumnType::kRegular; + + if (options.isNull()) { + return std::make_shared( + name, hiveColumnType, dataType, dataType); + } + + if (options.count("columnType") && options["columnType"].isString()) { + std::string columnType = options["columnType"].asString(); + // Accept a few friendly spellings, case-insensitive. + folly::toLowerAscii(columnType); + + if (columnType == "kpartitionkey" || columnType == "partition_key" || + columnType == "partitionkey") { + hiveColumnType = HiveColumnType::kPartitionKey; + } else if (columnType == "kregular" || columnType == "regular") { + hiveColumnType = HiveColumnType::kRegular; + } else if ( + columnType == "ksynthesized" || columnType == "synthesized" || + columnType == "synthetic") { + hiveColumnType = HiveColumnType::kSynthesized; + } else if ( + columnType == "krowindex" || columnType == "row_index" || + columnType == "rowindex") { + hiveColumnType = HiveColumnType::kRowIndex; + } else if ( + columnType == "krowid" || columnType == "row_id" || + columnType == "rowid") { + hiveColumnType = HiveColumnType::kRowId; + } + } + + TypePtr hiveType = + options.count("hiveType") ? Type::create(options["hiveType"]) : dataType; + + // subfields would be serialized as a vector of strings; + std::vector subfields; + if (auto rs = options.get_ptr("requiredSubfields")) { + subfields.reserve(rs->size()); + for (auto& v : *rs) { + subfields.emplace_back(v.asString()); + } + } + + HiveColumnHandle::ColumnParseParameters parseParams{ + HiveColumnHandle::ColumnParseParameters::kISO8601}; + if (auto cp = options.get_ptr("columnParseParameters")) { + auto formatName = + cp->getDefault("partitionDateValueFormat", "ISO8601").asString(); + if (formatName == "DaysSinceEpoch" || formatName == "kDaysSinceEpoch") { + parseParams.partitionDateValueFormat = + HiveColumnHandle::ColumnParseParameters::kDaysSinceEpoch; + } else { + parseParams.partitionDateValueFormat = + HiveColumnHandle::ColumnParseParameters::kISO8601; + } + } + + return std::make_shared( + name, + hiveColumnType, + dataType, + hiveType, + std::move(subfields), + parseParams); +} + +std::shared_ptr HiveConnectorFactory::makeTableHandle( + const std::string& connectorId, + const std::string& tableName, + std::vector> columnHandles, + const folly::dynamic& options) const { + bool filterPushdownEnabled = + options.getDefault("filterPushdownEnabled", true).asBool(); + + common::SubfieldFilters subfieldFilters; + if (auto sf = options.get_ptr("subfieldFilters")) { + subfieldFilters.reserve(sf->size()); + + for (auto& kv : sf->items()) { + // 1) Parse the key string into a Subfield + // (uses Subfield(const std::string&) and default separators) + Subfield subfield(kv.first.asString()); + + // 2) Deserialize the Filter from its dynamic form. + // Assumes every Filter subclass registered a SerDe entry in + // Filter::registerSerDe(). + auto filter = ISerializable::deserialize( + kv.second, /* context = */ nullptr); + + subfieldFilters.emplace( + std::move(subfield), std::const_pointer_cast(filter)); + } + } + + core::TypedExprPtr remainingFilter = nullptr; + if (auto rf = options.get_ptr("remainingFilter")) { + // assuming rf["expr"] holds the serialized expression + remainingFilter = ISerializable::deserialize(*rf); + } + + std::unordered_map tableParameters; + if (auto tp = options.get_ptr("tableParameters")) { + for (auto& kv : tp->items()) { + tableParameters.emplace(kv.first.asString(), kv.second.asString()); + } + } + + // build RowTypePtr from columnHandles + std::vector names; + std::vector types; + names.reserve(columnHandles.size()); + types.reserve(columnHandles.size()); + for (auto& col : columnHandles) { + auto hiveCol = std::static_pointer_cast(col); + names.push_back(hiveCol->name()); + types.push_back(hiveCol->dataType()); + } + auto dataColumns = ROW(std::move(names), std::move(types)); + + return std::make_shared( + connectorId, + tableName, + filterPushdownEnabled, + std::move(subfieldFilters), + std::move(remainingFilter), + dataColumns, + std::move(tableParameters)); +} + +std::shared_ptr +HiveConnectorFactory::makeInsertTableHandle( + const std::string& connectorId, + std::vector> inputColumns, + std::shared_ptr locationHandle, + const folly::dynamic& options) const { + // Convert inputColumns + std::vector> inputHiveColumns; + inputHiveColumns.reserve(inputColumns.size()); + for (const auto& handle : inputColumns) { + inputHiveColumns.push_back( + std::static_pointer_cast(handle)); + } + + auto hiveLoc = + std::dynamic_pointer_cast(locationHandle); + VELOX_CHECK( + hiveLoc, + "HiveConnectorFactory::makeInsertTableHandle: " + "expected HiveLocationHandle"); + + auto fmt = + options + .getDefault( + "storageFormat", static_cast(dwio::common::FileFormat::DWRF)) + .asInt(); + auto storageFormat = static_cast(fmt); + + std::shared_ptr bucketProperty = nullptr; + if (auto bp = options.get_ptr("bucketProperty")) { + bucketProperty = HiveBucketProperty::deserialize(*bp, nullptr); + } + + std::optional compressionKind; + if (auto ck = options.get_ptr("compressionKind")) { + compressionKind = static_cast(ck->asInt()); + } + + std::unordered_map serdeParameters; + if (auto sp = options.get_ptr("serdeParameters")) { + for (auto& kv : sp->items()) { + serdeParameters.emplace(kv.first.asString(), kv.second.asString()); + } + } + + std::shared_ptr writerOptions = nullptr; + if (auto wo = options.get_ptr("writerOptions")) { + writerOptions = dwio::common::WriterOptions::deserialize(*wo); + } + + bool ensureFiles = options.getDefault("ensureFiles", false).asBool(); + + auto fileNameGen = HiveInsertFileNameGenerator::deserialize( + *options.get_ptr("fileNameGenerator"), nullptr); + + return std::make_shared( + std::move(inputHiveColumns), + hiveLoc, + storageFormat, + std::move(bucketProperty), + compressionKind, + std::move(serdeParameters), + std::move(writerOptions), + ensureFiles, + std::move(fileNameGen)); +} + +std::shared_ptr +HiveConnectorFactory::makeLocationHandle( + const std::string& connectorId, + LocationHandle::TableType tableType, + const folly::dynamic& options) const { + VELOX_CHECK(options.isObject(), "Expected options to be a dynamic object"); + + // Required fields + auto targetPath = options.at("targetPath").asString(); + auto writePath = options.at("writePath").asString(); + + // Optional field: targetFileName + std::string targetFileName; + if (options.count("targetFileName") && + !options.at("targetFileName").isNull()) { + targetFileName = options.at("targetFileName").asString(); + } + + return std::make_shared( + std::move(targetPath), + std::move(writePath), + tableType, + std::move(targetFileName), + connectorId); +} + +core::PartitionFunctionSpecPtr HiveConnectorFactory::makePartitionFunctionSpec( + const std::string& /*connectorId*/, + const folly::dynamic& options) const { + VELOX_CHECK(options.isObject(), "Expected options to be a dynamic object"); + + // Required: number of buckets + const int numBuckets = options.at("numBuckets").asInt(); + + // Optional: explicit bucket-to-partition mapping + std::vector bucketToPartition; + if (options.count("bucketToPartition") && + !options["bucketToPartition"].isNull()) { + bucketToPartition = ISerializable::deserialize>( + options["bucketToPartition"]); + } + + // Required: key channels (by column index) for the bucket function + // NOTE: Keep the key name consistent with HashPartitionFunctionSpec + // ("keyChannels") + std::vector channels = + ISerializable::deserialize>( + options.at("keyChannels")); + + // Optional: constants used as key(s) (serialized ConstantTypedExpr[]) + std::vector constValues; + if (options.count("constants") && !options["constants"].isNull()) { + const auto typedConsts = + ISerializable::deserialize>( + options["constants"]); + constValues.reserve(typedConsts.size()); + + for (const auto& c : typedConsts) { + VELOX_CHECK_NOT_NULL(c); + constValues.emplace_back(c->toConstantVector(pool_.get())); + } + } + + if (bucketToPartition.empty()) { + // Spec form where mapping is derived at create() time (round-robin and + // optional local shuffle). + return std::make_shared( + numBuckets, std::move(channels), std::move(constValues)); + } + + // Spec with explicit mapping. + return std::make_shared( + numBuckets, + std::move(bucketToPartition), + std::move(channels), + std::move(constValues)); +} + std::unique_ptr HivePartitionFunctionSpec::create( int numPartitions, bool localExchange) const { @@ -192,4 +589,10 @@ void HivePartitionFunctionSpec::registerSerDe() { "HivePartitionFunctionSpec", HivePartitionFunctionSpec::deserialize); } +bool registerHiveConnectorMetadataFactory( + std::unique_ptr factory) { + hiveConnectorMetadataFactories().push_back(std::move(factory)); + return true; +} + } // namespace facebook::velox::connector::hive diff --git a/velox/connectors/hive/HiveConnector.h b/velox/connectors/hive/HiveConnector.h index c6b91392976..ca6c7ffba23 100644 --- a/velox/connectors/hive/HiveConnector.h +++ b/velox/connectors/hive/HiveConnector.h @@ -19,6 +19,7 @@ #include "velox/connectors/hive/FileHandle.h" #include "velox/connectors/hive/HiveConfig.h" #include "velox/core/PlanNode.h" +#include "velox/dwio/common/Options.h" namespace facebook::velox::dwio::common { class DataSink; @@ -98,6 +99,47 @@ class HiveConnectorFactory : public ConnectorFactory { folly::Executor* cpuExecutor = nullptr) override { return std::make_shared(id, config, ioExecutor); } + + std::shared_ptr makeConnectorSplit( + const std::string& connectorId, + const std::string& filePath, + uint64_t start, + uint64_t length, + const folly::dynamic& options = {}) const override; + + std::shared_ptr makeColumnHandle( + const std::string& connectorId, + const std::string& name, + const TypePtr& type, + const folly::dynamic& options = {}) const override; + + std::shared_ptr makeTableHandle( + const std::string& connectorId, + const std::string& tableName, + std::vector> columnHandles, + const folly::dynamic& options) const override; + + std::shared_ptr makeInsertTableHandle( + const std::string& connectorId, + std::vector> inputColumns, + std::shared_ptr locationHandle, + const folly::dynamic& options = {}) const override; + + std::shared_ptr makeLocationHandle( + const std::string& connectorId, + connector::ConnectorLocationHandle::TableType tableType = + connector::ConnectorLocationHandle::TableType::kNew, + const folly::dynamic& options = {}) const override; + + core::PartitionFunctionSpecPtr makePartitionFunctionSpec( + const std::string& connectorId, + const folly::dynamic& options = {}) const override; + + private: + std::shared_ptr pool_{ + memory::memoryManager()->addLeafPool()}; + dwio::common::FileFormat defaultFileFormat_{ + dwio::common::FileFormat::PARQUET}; }; class HivePartitionFunctionSpec : public core::PartitionFunctionSpec { @@ -151,4 +193,8 @@ class HivePartitionFunctionSpec : public core::PartitionFunctionSpec { const std::vector constValues_; }; +bool registerHiveConnectorFactory(std::unique_ptr); + +bool unregisterHiveConnectorFactory(); + } // namespace facebook::velox::connector::hive diff --git a/velox/connectors/hive/HiveConnectorSplit.h b/velox/connectors/hive/HiveConnectorSplit.h index 3485c2330fa..bd68b431b91 100644 --- a/velox/connectors/hive/HiveConnectorSplit.h +++ b/velox/connectors/hive/HiveConnectorSplit.h @@ -17,6 +17,7 @@ #include #include + #include "velox/connectors/Connector.h" #include "velox/connectors/hive/FileProperties.h" #include "velox/connectors/hive/TableHandle.h" diff --git a/velox/connectors/hive/HiveDataSink.cpp b/velox/connectors/hive/HiveDataSink.cpp index 36c797e4eaf..9abc1e4f72b 100644 --- a/velox/connectors/hive/HiveDataSink.cpp +++ b/velox/connectors/hive/HiveDataSink.cpp @@ -291,6 +291,25 @@ HiveBucketProperty::HiveBucketProperty( validate(); } +HiveBucketProperty::HiveBucketProperty( + Kind kind, + int32_t bucketCount, + const std::vector& bucketedBy, + const std::vector& bucketTypes, + const std::vector& sortColumns, + const std::vector& sortOrders) + : kind_(kind), + bucketCount_(bucketCount), + bucketedBy_(bucketedBy), + bucketTypes_(bucketTypes) { + VELOX_CHECK_EQ(sortColumns.size(), sortOrders.size()); + for (size_t i = 0; i < sortColumns.size(); ++i) { + sortedBy_.push_back( + std::make_shared(sortColumns[i], sortOrders[i])); + } + validate(); +} + void HiveBucketProperty::validate() const { VELOX_USER_CHECK_GT(bucketCount_, 0, "Hive bucket count can't be zero"); VELOX_USER_CHECK(!bucketedBy_.empty(), "Hive bucket columns must be set"); @@ -332,9 +351,22 @@ std::shared_ptr HiveBucketProperty::deserialize( ISerializable::deserialize>(obj["bucketedBy"]); const auto bucketedTypes = ISerializable::deserialize>( obj["bucketedTypes"], context); - const auto sortedBy = - ISerializable::deserialize>( - obj["sortedBy"], context); + + std::vector> sortedBy; + if (auto sb = obj.get_ptr("sortedBy")) { + sortedBy = ISerializable::deserialize>(*sb); + } else if (auto sc = obj.get_ptr("sortColumns")) { + auto sortColumns = + ISerializable::deserialize>(*sc); + if (auto so = obj.get_ptr("sortOrders")) { + VELOX_CHECK_EQ(sortColumns.size(), so->size()); + for (int i = 0; i < so->size(); i++) { + auto sortOrder = core::SortOrder::deserialize((*so)[i]); + sortedBy.emplace_back(std::make_shared( + sortColumns[i], sortOrder)); + } + } + } return std::make_shared( kind, bucketCount, buckectedBy, bucketedTypes, sortedBy); } @@ -1201,7 +1233,7 @@ std::string LocationHandle::toString() const { "LocationHandle [targetPath: {}, writePath: {}, tableType: {}, tableFileName: {}]", targetPath_, writePath_, - tableTypeName(tableType_), + tableTypeName(tableType()), targetFileName_); } @@ -1215,7 +1247,7 @@ folly::dynamic LocationHandle::serialize() const { obj["name"] = "LocationHandle"; obj["targetPath"] = targetPath_; obj["writePath"] = writePath_; - obj["tableType"] = tableTypeName(tableType_); + obj["tableType"] = tableTypeName(tableType()); obj["targetFileName"] = targetFileName_; return obj; } diff --git a/velox/connectors/hive/HiveDataSink.h b/velox/connectors/hive/HiveDataSink.h index 8c305b7595d..d955ee5ff62 100644 --- a/velox/connectors/hive/HiveDataSink.h +++ b/velox/connectors/hive/HiveDataSink.h @@ -17,6 +17,7 @@ #include "velox/common/compression/Compression.h" #include "velox/connectors/Connector.h" +#include "velox/connectors/ConnectorNames.h" #include "velox/connectors/hive/HiveConfig.h" #include "velox/connectors/hive/PartitionIdGenerator.h" #include "velox/connectors/hive/TableHandle.h" @@ -31,24 +32,18 @@ class LocationHandle; using LocationHandlePtr = std::shared_ptr; /// Location related properties of the Hive table to be written. -class LocationHandle : public ISerializable { +class LocationHandle : public ConnectorLocationHandle { public: - enum class TableType { - /// Write to a new table to be created. - kNew, - /// Write to an existing table. - kExisting, - }; - LocationHandle( std::string targetPath, std::string writePath, TableType tableType, - std::string targetFileName = "") - : targetPath_(std::move(targetPath)), + std::string targetFileName = "", + const std::string& connectorId = kHiveConnectorName) + : ConnectorLocationHandle(connectorId, tableType), + targetPath_(std::move(targetPath)), targetFileName_(std::move(targetFileName)), - writePath_(std::move(writePath)), - tableType_(tableType) {} + writePath_(std::move(writePath)) {} const std::string& targetPath() const { return targetPath_; @@ -62,16 +57,12 @@ class LocationHandle : public ISerializable { return writePath_; } - TableType tableType() const { - return tableType_; - } + std::string toString() const override; - std::string toString() const; + folly::dynamic serialize() const override; static void registerSerDe(); - folly::dynamic serialize() const override; - static LocationHandlePtr create(const folly::dynamic& obj); static const std::string tableTypeName(LocationHandle::TableType type); @@ -85,8 +76,6 @@ class LocationHandle : public ISerializable { const std::string targetFileName_; // Staging directory path. const std::string writePath_; - // Whether the table to be written is new, already existing or temporary. - const TableType tableType_; }; class HiveSortingColumn : public ISerializable { @@ -129,6 +118,14 @@ class HiveBucketProperty : public ISerializable { const std::vector& bucketedTypes, const std::vector>& sortedBy); + HiveBucketProperty( + Kind kind, + int32_t bucketCount, + const std::vector& bucketedBy, + const std::vector& bucketedTypes, + const std::vector& sortColumns, + const std::vector& sortOrders); + Kind kind() const { return kind_; } @@ -177,7 +174,7 @@ class HiveBucketProperty : public ISerializable { const int32_t bucketCount_; const std::vector bucketedBy_; const std::vector bucketTypes_; - const std::vector> sortedBy_; + std::vector> sortedBy_; }; FOLLY_ALWAYS_INLINE std::ostream& operator<<( diff --git a/velox/connectors/hive/storage_adapters/gcs/tests/GcsMultipleEndpointsTest.cpp b/velox/connectors/hive/storage_adapters/gcs/tests/GcsMultipleEndpointsTest.cpp index 00bb3310a06..0889ce7a92b 100644 --- a/velox/connectors/hive/storage_adapters/gcs/tests/GcsMultipleEndpointsTest.cpp +++ b/velox/connectors/hive/storage_adapters/gcs/tests/GcsMultipleEndpointsTest.cpp @@ -91,6 +91,7 @@ class GcsMultipleEndpointsTest : public testing::Test, 0, {}, {}, + {}, dwio::common::FileFormat::PARQUET, {}, connectorId) diff --git a/velox/connectors/hive/storage_adapters/s3fs/tests/S3MultipleEndpointsTest.cpp b/velox/connectors/hive/storage_adapters/s3fs/tests/S3MultipleEndpointsTest.cpp index cf446b12d2e..2967b1a52ac 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/tests/S3MultipleEndpointsTest.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/tests/S3MultipleEndpointsTest.cpp @@ -93,6 +93,7 @@ class S3MultipleEndpoints : public S3Test, public ::test::VectorTestBase { 0, {}, {}, + {}, dwio::common::FileFormat::PARQUET, {}, connectorId) diff --git a/velox/connectors/hive/tests/CMakeLists.txt b/velox/connectors/hive/tests/CMakeLists.txt index 0a24b6b10c4..a6fbb5e2266 100644 --- a/velox/connectors/hive/tests/CMakeLists.txt +++ b/velox/connectors/hive/tests/CMakeLists.txt @@ -19,6 +19,7 @@ add_executable( HiveConnectorTest.cpp HiveConnectorUtilTest.cpp HiveConnectorSerDeTest.cpp + HiveObjectFactoryTest.cpp HivePartitionFunctionTest.cpp HivePartitionUtilTest.cpp HiveSplitTest.cpp diff --git a/velox/connectors/hive/tests/HiveObjectFactoryTest.cpp b/velox/connectors/hive/tests/HiveObjectFactoryTest.cpp new file mode 100644 index 00000000000..cdc68498db2 --- /dev/null +++ b/velox/connectors/hive/tests/HiveObjectFactoryTest.cpp @@ -0,0 +1,242 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include + +#include "velox/common/base/Exceptions.h" +#include "velox/common/memory/MemoryAllocator.h" +#include "velox/connectors/hive/HiveConnector.h" +#include "velox/connectors/hive/HiveConnectorSplit.h" +#include "velox/exec/tests/utils/OperatorTestBase.h" +#include "velox/exec/tests/utils/PlanBuilder.h" +#include "velox/functions/prestosql/registration/RegistrationFunctions.h" + +namespace facebook::velox::connector::hive::test { + +using namespace facebook::velox; +using namespace facebook::velox::connector::hive; +using facebook::velox::connector::ConnectorLocationHandle; + +static constexpr char kConnectorId[] = "hive-test"; + +class HiveObjectFactoryTest : public exec::test::OperatorTestBase { + public: + static void SetUpTestCase() { + OperatorTestBase::SetUpTestCase(); + + memory::MemoryManager::testingSetInstance(memory::MemoryManager::Options{}); + + Type::registerSerDe(); + common::Filter::registerSerDe(); + connector::hive::HiveTableHandle::registerSerDe(); + connector::hive::LocationHandle::registerSerDe(); + connector::hive::HiveColumnHandle::registerSerDe(); + connector::hive::HiveInsertTableHandle::registerSerDe(); + connector::hive::HiveConnectorSplit::registerSerDe(); + connector::hive::HiveInsertFileNameGenerator::registerSerDe(); + core::PlanNode::registerSerDe(); + core::ITypedExpr::registerSerDe(); + Type::registerSerDe(); + } + + void TearDown() override { + factory_.reset(); + } + + std::shared_ptr makeHiveColumnHandle( + const std::string& name, + HiveColumnHandle::ColumnType columnType, + const TypePtr& dataType, + const TypePtr& hiveType, + std::vector requiredSubfields) { + folly::dynamic opts = folly::dynamic::object; + + // columnType would be serialized as int32_t + opts["columnType"] = (int32_t)columnType; + opts["hiveType"] = hiveType->serialize(); + + opts["requiredSubfields"] = folly::dynamic::array(); + for (const auto& subfield : requiredSubfields) { + opts["requiredSubfields"].push_back(subfield); + } + + auto colHandle = + factory_->makeColumnHandle(kConnectorId, "colX", dataType, opts); + + return colHandle; + } + + protected: + std::unique_ptr factory_; +}; + +TEST_F(HiveObjectFactoryTest, MakeConnectorSplitDefaults) { + // No options: only filePath/start/length/connectorId are set + auto splitPtr = factory_->makeConnectorSplit( + kConnectorId, "s3://bucket/path/file.orc", 123, 456); + auto* split = dynamic_cast(splitPtr.get()); + ASSERT_NE(split, nullptr); + EXPECT_EQ(split->filePath, "s3://bucket/path/file.orc"); + EXPECT_EQ(split->start, 123); + EXPECT_EQ(split->length, 456); + EXPECT_EQ(split->connectorId, kConnectorId); + + // Defaults: PARQUET format, weight=0, cacheable=true + // TODO: use constant literal + EXPECT_EQ(split->fileFormat, dwio::common::FileFormat::PARQUET); + EXPECT_EQ(split->splitWeight, 0); + EXPECT_TRUE(split->cacheable); +} + +TEST_F(HiveObjectFactoryTest, MakeConnectorSplitWithOptions) { + folly::dynamic opts = folly::dynamic::object; + + // Basic configuration + opts["fileFormat"] = static_cast(dwio::common::FileFormat::PARQUET); + opts["splitWeight"] = 42; + opts["cacheable"] = true; + + // Info columns map: column name -> info string + opts["infoColumns"] = folly::dynamic::object; + opts["infoColumns"]["colA"] = "infoA"; + opts["infoColumns"]["colB"] = "infoB"; + + // Partition keys map: key -> value + opts["partitionKeys"] = folly::dynamic::object; + opts["partitionKeys"]["p1"] = "v1"; + + auto splitPtr = + factory_->makeConnectorSplit(kConnectorId, "/tmp/f.p", 0, 10, opts); + auto* split = dynamic_cast(splitPtr.get()); + ASSERT_NE(split, nullptr); + + EXPECT_EQ(split->fileFormat, dwio::common::FileFormat::PARQUET); + EXPECT_EQ(split->splitWeight, 42); + EXPECT_TRUE(split->cacheable); + + // infoColumns + auto info = split->infoColumns; + EXPECT_EQ(info.at("colA"), "infoA"); + EXPECT_EQ(info.at("colB"), "infoB"); + + // partitionKeys + auto parts = split->partitionKeys; + ASSERT_EQ(parts.size(), 1); + EXPECT_EQ(parts.count("p1"), 1); + EXPECT_EQ(parts["p1"], "v1"); +} + +TEST_F(HiveObjectFactoryTest, MakeTableHandleWithOptions) { + // Build a RowType for data columns: two ints + auto rowType = ROW({"c0", "c1"}, {INTEGER(), BIGINT()}); + + // ColumnHandles + auto c0 = makeHiveColumnHandle( + "c0", + HiveColumnHandle::ColumnType::kRegular, + INTEGER(), + INTEGER(), + {"c0"}); + auto c1 = makeHiveColumnHandle( + "c1", HiveColumnHandle::ColumnType::kRegular, BIGINT(), BIGINT(), {"c1"}); + + // Options: disable filter pushdown, add subfield filter & remaining filter & + // tableParameters + folly::dynamic opts = folly::dynamic::object; + + // Basic configuration + opts["filterPushdownEnabled"] = false; + + // common::SubfieldFilters : std::unordered_map; + auto filter = std::make_unique(-100, 100, false); + opts["subfieldFilters"] = folly::dynamic::object; + opts["subfieldFilters"]["c0"] = filter->serialize(); + + // core::TypedExprPtr remainingFilter : std::shared_ptr; + auto remainingFilter = parseExpr("c1 + 1 > 1", rowType); + opts["remainingFilter"] = remainingFilter->serialize(); + + // Arbitrary table parameters + opts["tableParameters"] = folly::dynamic::object; + opts["tableParameters"]["pA"] = "vA"; + + auto handlePtr = + factory_->makeTableHandle(kConnectorId, "tbl", {c0, c1}, opts); + auto* hiveHandle = dynamic_cast(handlePtr.get()); + ASSERT_NE(hiveHandle, nullptr); + + EXPECT_FALSE(hiveHandle->isFilterPushdownEnabled()); + + const auto& subfieldFilters = hiveHandle->subfieldFilters(); + ASSERT_EQ(subfieldFilters.count(common::Subfield("c0")), 1); + subfieldFilters.at(common::Subfield("c0"))->testingEquals(*filter); + + auto remainingFilterCreated = hiveHandle->remainingFilter(); + ASSERT_NE(remainingFilterCreated, nullptr); + ASSERT_EQ(*remainingFilterCreated, *remainingFilter); + + auto params = hiveHandle->tableParameters(); + ASSERT_EQ(params.size(), 1); + EXPECT_EQ(params.at("pA"), "vA"); +} + +TEST_F(HiveObjectFactoryTest, MakeColumnHandle) { + auto colHandle = makeHiveColumnHandle( + "colX", + HiveColumnHandle::ColumnType::kRegular, + BIGINT(), + BIGINT(), + std::vector({"f1", "f2"})); + + auto* hiveColumnHandle = dynamic_cast(colHandle.get()); + ASSERT_NE(hiveColumnHandle, nullptr); + + EXPECT_EQ(hiveColumnHandle->name(), "colX"); + EXPECT_EQ( + hiveColumnHandle->columnType(), HiveColumnHandle::ColumnType::kRegular); + EXPECT_EQ(hiveColumnHandle->dataType(), BIGINT()); + EXPECT_EQ(hiveColumnHandle->hiveType(), BIGINT()); + + std::vector requiredSubfields; + requiredSubfields.emplace_back("f1"); + requiredSubfields.emplace_back("f2"); + EXPECT_EQ(hiveColumnHandle->requiredSubfields(), requiredSubfields); +} + +TEST_F(HiveObjectFactoryTest, MakeLocationHandle) { + folly::dynamic opts = folly::dynamic::object; + + // Basic configuration + opts["targetPath"] = "/tmp/out1"; + opts["writePath"] = "/tmp/out2"; + opts["targetFileName"] = "test.parquet"; + + // Default: writeDirectory == targetDirectory + auto locationHandle1 = factory_->makeLocationHandle( + kConnectorId, ConnectorLocationHandle::TableType::kNew, opts); + + auto* hiveLocationHandle = + dynamic_cast(locationHandle1.get()); + ASSERT_NE(hiveLocationHandle, nullptr); + + EXPECT_EQ(hiveLocationHandle->targetPath(), "/tmp/out1"); + EXPECT_EQ(hiveLocationHandle->writePath(), "/tmp/out2"); + EXPECT_EQ(hiveLocationHandle->targetFileName(), "test.parquet"); + EXPECT_EQ( + locationHandle1->tableType(), ConnectorLocationHandle::TableType::kNew); +} + +} // namespace facebook::velox::connector::hive::test diff --git a/velox/connectors/tpcds/TpcdsConnector.cpp b/velox/connectors/tpcds/TpcdsConnector.cpp index 5902f49da4c..e5ca847ef8e 100644 --- a/velox/connectors/tpcds/TpcdsConnector.cpp +++ b/velox/connectors/tpcds/TpcdsConnector.cpp @@ -16,6 +16,7 @@ #include +#include "velox/connectors/ConnectorNames.h" #include "velox/connectors/tpcds/TpcdsConnector.h" #include "velox/tpcds/gen/DSDGenIterator.h" @@ -138,4 +139,14 @@ std::optional TpcdsDataSource::next( return projectOutputColumns(outputVector); } + +bool registerTpcdsConnectorFactory( + std::unique_ptr factory) { + connector::registerConnectorFactory(std::move(factory)); +} + +bool unregisterTpcdsConnectorFactory() { + connector::unregisterConnectorFactory(connector::kTpcdsConnectorName); +} + } // namespace facebook::velox::connector::tpcds diff --git a/velox/connectors/tpcds/TpcdsConnector.h b/velox/connectors/tpcds/TpcdsConnector.h index 88ad845ff39..99870fc31d8 100644 --- a/velox/connectors/tpcds/TpcdsConnector.h +++ b/velox/connectors/tpcds/TpcdsConnector.h @@ -179,6 +179,13 @@ class TpcdsConnectorFactory : public ConnectorFactory { folly::Executor* cpuExecutor = nullptr) override { return std::make_shared(id, config, ioExecutor); } + + // TODO: Add object makers like makeTableHandle, makeColumnHandle, etc. }; +bool registerTpchConnectorFactory( + std::unique_ptr factory); + +bool unregisterTpcdsConnectorFactory(); + } // namespace facebook::velox::connector::tpcds diff --git a/velox/connectors/tpch/TpchConnector.cpp b/velox/connectors/tpch/TpchConnector.cpp index cf70061aada..f1c627809df 100644 --- a/velox/connectors/tpch/TpchConnector.cpp +++ b/velox/connectors/tpch/TpchConnector.cpp @@ -15,6 +15,8 @@ */ #include "velox/connectors/tpch/TpchConnector.h" + +#include "velox/connectors/ConnectorNames.h" #include "velox/exec/OperatorUtils.h" #include "velox/expression/Expr.h" #include "velox/tpch/gen/TpchGen.h" @@ -29,6 +31,15 @@ TpchConnector::TpchConnector( folly::Executor* /*executor*/) : Connector(id, std::move(config)) {} +bool registerTpchConnectorFactory( + std::unique_ptr factory) { + connector::registerConnectorFactory(std::move(factory)); +} + +bool unregisterTpcdsConnectorFactory() { + connector::unregisterConnectorFactory(connector::kTpchConnectorName); +} + namespace { RowVectorPtr getTpchData( diff --git a/velox/connectors/tpch/TpchConnector.h b/velox/connectors/tpch/TpchConnector.h index 5d006490bad..7eeca9bbafd 100644 --- a/velox/connectors/tpch/TpchConnector.h +++ b/velox/connectors/tpch/TpchConnector.h @@ -34,7 +34,7 @@ class TpchColumnHandle : public ColumnHandle { public: explicit TpchColumnHandle(const std::string& name) : name_(name) {} - const std::string& name() const { + const std::string& name() const override { return name_; } @@ -196,7 +196,14 @@ class TpchConnectorFactory : public ConnectorFactory { folly::Executor* ioExecutor = nullptr, folly::Executor* cpuExecutor = nullptr) override { return std::make_shared(id, config, ioExecutor); + + // TODO: Add object makers like makeTableHandle, makeColumnHandle, etc. } }; +bool registerTpchConnectorFactory( + std::unique_ptr factory); + +bool unregisterTpcdsConnectorFactory(); + } // namespace facebook::velox::connector::tpch diff --git a/velox/core/PartitionFunction.h b/velox/core/PartitionFunction.h new file mode 100644 index 00000000000..da342ec860f --- /dev/null +++ b/velox/core/PartitionFunction.h @@ -0,0 +1,80 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/vector/ComplexVector.h" + +namespace facebook::velox::core { + +/// Calculates partition number for each row of the specified vector. +class PartitionFunction { + public: + virtual ~PartitionFunction() = default; + + /// @param input RowVector to split into partitions. + /// @param [out] partitions Computed partition numbers for each row in + /// 'input'. + /// @return Returns partition number in case all rows of 'input' are + /// assigned to the same partition. In this case 'partitions' vector is left + /// unchanged. Used to optimize round-robin partitioning in local exchange. + virtual std::optional partition( + const RowVector& input, + std::vector& partitions) = 0; +}; + +/// Factory class for creating PartitionFunction instances. +class PartitionFunctionSpec : public ISerializable { + public: + /// If 'localExchange' is true, the partition function is used for local + /// exchange within a velox task. + virtual std::unique_ptr create( + int numPartitions, + bool localExchange = false) const = 0; + + virtual ~PartitionFunctionSpec() = default; + + virtual std::string toString() const = 0; +}; + +using PartitionFunctionSpecPtr = std::shared_ptr; + +class GatherPartitionFunctionSpec : public PartitionFunctionSpec { + public: + std::unique_ptr create( + int /*numPartitions*/, + bool /*localExchange*/) const override { + VELOX_UNREACHABLE(); + } + + std::string toString() const override { + return "gather"; + } + + folly::dynamic serialize() const override { + folly::dynamic obj = folly::dynamic::object; + obj["name"] = "GatherPartitionFunctionSpec"; + return obj; + } + + static PartitionFunctionSpecPtr deserialize( + const folly::dynamic& /* obj */, + void* /* context */) { + return std::make_shared(); + } +}; + +} // namespace facebook::velox::core diff --git a/velox/core/PlanNode.h b/velox/core/PlanNode.h index 20800bbe30c..248eda3aa6d 100644 --- a/velox/core/PlanNode.h +++ b/velox/core/PlanNode.h @@ -22,6 +22,7 @@ #include "velox/common/Enums.h" #include "velox/connectors/Connector.h" #include "velox/core/Expressions.h" +#include "velox/core/PartitionFunction.h" #include "velox/core/QueryConfig.h" #include "velox/vector/VectorStream.h" @@ -36,6 +37,7 @@ using PlanNodeId = std::string; /// Generic representation of InsertTable struct InsertTableHandle { + // TODO: Merge into connectors/Connector.h public: InsertTableHandle( const std::string& connectorId, @@ -2340,63 +2342,6 @@ class LocalMergeNode : public PlanNode { using LocalMergeNodePtr = std::shared_ptr; -/// Calculates partition number for each row of the specified vector. -class PartitionFunction { - public: - virtual ~PartitionFunction() = default; - - /// @param input RowVector to split into partitions. - /// @param [out] partitions Computed partition numbers for each row in - /// 'input'. - /// @return Returns partition number in case all rows of 'input' are - /// assigned to the same partition. In this case 'partitions' vector is left - /// unchanged. Used to optimize round-robin partitioning in local exchange. - virtual std::optional partition( - const RowVector& input, - std::vector& partitions) = 0; -}; - -/// Factory class for creating PartitionFunction instances. -class PartitionFunctionSpec : public ISerializable { - public: - /// If 'localExchange' is true, the partition function is used for local - /// exchange within a velox task. - virtual std::unique_ptr create( - int numPartitions, - bool localExchange = false) const = 0; - - virtual ~PartitionFunctionSpec() = default; - - virtual std::string toString() const = 0; -}; - -using PartitionFunctionSpecPtr = std::shared_ptr; - -class GatherPartitionFunctionSpec : public PartitionFunctionSpec { - public: - std::unique_ptr create( - int /*numPartitions*/, - bool /*localExchange*/) const override { - VELOX_UNREACHABLE(); - } - - std::string toString() const override { - return "gather"; - } - - folly::dynamic serialize() const override { - folly::dynamic obj = folly::dynamic::object; - obj["name"] = "GatherPartitionFunctionSpec"; - return obj; - } - - static PartitionFunctionSpecPtr deserialize( - const folly::dynamic& /* obj */, - void* /* context */) { - return std::make_shared(); - } -}; - /// Partitions data using specified partition function. The number of /// partitions is determined by the parallelism of the upstream pipeline. Can /// be used to gather data from multiple sources. diff --git a/velox/dwio/common/Options.cpp b/velox/dwio/common/Options.cpp index 33946f71159..10abb94e9fe 100644 --- a/velox/dwio/common/Options.cpp +++ b/velox/dwio/common/Options.cpp @@ -15,6 +15,7 @@ */ #include "velox/dwio/common/Options.h" +#include "velox/common/compression/Compression.h" namespace facebook::velox::dwio::common { @@ -77,4 +78,88 @@ ColumnReaderOptions makeColumnReaderOptions(const ReaderOptions& options) { return columnReaderOptions; } +folly::dynamic WriterOptions::serialize() const { + folly::dynamic obj = folly::dynamic::object; + + // 1) Schema + if (schema) { + obj["schema"] = schema->serialize(); + } + + // 2) compressionKind + if (compressionKind) { + obj["compressionKind"] = static_cast(*compressionKind); + } + + // 3) serdeParameters + if (!serdeParameters.empty()) { + folly::dynamic mapObj = folly::dynamic::object; + for (auto& [k, v] : serdeParameters) { + mapObj[k] = v; + } + obj["serdeParameters"] = std::move(mapObj); + } + + // 4) sessionTimezoneName + if (!sessionTimezoneName.empty()) { + obj["sessionTimezoneName"] = sessionTimezoneName; + } + + // 5) adjustTimestampToTimezone + obj["adjustTimestampToTimezone"] = adjustTimestampToTimezone; + + // (We do *not* serialize pool, spillConfig, nonReclaimableSection, + // or the factory functions—they must be re‐injected by the host.) + + return obj; +} + +std::shared_ptr WriterOptions::deserialize( + const folly::dynamic& obj) { + auto opts = std::make_shared(); + + // 1) schema + if (auto p = obj.get_ptr("schema")) { + opts->schema = ISerializable::deserialize(*p, nullptr); + } + + // 2) compressionKind + if (auto p = obj.get_ptr("compressionKind")) { + opts->compressionKind = + static_cast(p->asInt()); + } + + // 3) serdeParameters + if (auto p = obj.get_ptr("serdeParameters")) { + opts->serdeParameters.clear(); + for (auto& kv : p->items()) { + opts->serdeParameters.emplace(kv.first.asString(), kv.second.asString()); + } + } + + // 4) sessionTimezoneName + if (auto p = obj.get_ptr("sessionTimezoneName")) { + opts->sessionTimezoneName = p->asString(); + } + + // 5) adjustTimestampToTimezone + if (auto p = obj.get_ptr("adjustTimestampToTimezone")) { + opts->adjustTimestampToTimezone = p->asBool(); + } + + // pool, spillConfig, nonReclaimableSection, factories remain at default + return opts; +} + +void WriterOptions::registerSerDe() { + auto& registry = DeserializationRegistryForSharedPtr(); + registry.Register("WriterOptions", WriterOptions::deserialize); +} + +// force registration at load‐time +static bool _writerOptionsSerdeRegistered = []() { + WriterOptions::registerSerDe(); + return true; +}(); + } // namespace facebook::velox::dwio::common diff --git a/velox/dwio/common/Options.h b/velox/dwio/common/Options.h index ef70cb72379..2de86d4e858 100644 --- a/velox/dwio/common/Options.h +++ b/velox/dwio/common/Options.h @@ -684,7 +684,7 @@ class ReaderOptions : public io::ReaderOptions { bool allowEmptyFile_{false}; }; -struct WriterOptions { +struct WriterOptions : public ISerializable { TypePtr schema{nullptr}; velox::memory::MemoryPool* memoryPool{nullptr}; const velox::common::SpillConfig* spillConfig{nullptr}; @@ -713,6 +713,10 @@ struct WriterOptions { const config::ConfigBase& session) {} virtual ~WriterOptions() = default; + + folly::dynamic serialize() const override; + static std::shared_ptr deserialize(const folly::dynamic& obj); + static void registerSerDe(); }; // Options for creating a column reader. diff --git a/velox/exec/fuzzer/WriterFuzzer.cpp b/velox/exec/fuzzer/WriterFuzzer.cpp index 9c2132c7403..eae1791d284 100644 --- a/velox/exec/fuzzer/WriterFuzzer.cpp +++ b/velox/exec/fuzzer/WriterFuzzer.cpp @@ -26,6 +26,7 @@ #include "velox/common/file/tests/FaultyFileSystem.h" #include "velox/connectors/hive/HiveConnector.h" #include "velox/connectors/hive/HiveConnectorSplit.h" +#include "velox/connectors/hive/HiveDataSink.h" #include "velox/connectors/hive/TableHandle.h" #include "velox/dwio/dwrf/reader/DwrfReader.h" #include "velox/exec/fuzzer/FuzzerUtil.h" @@ -145,7 +146,10 @@ class WriterFuzzer { int32_t bucketCount, const std::vector& bucketColumns, int32_t sortColumnOffset, - const std::vector>& sortBy, + const std::vector& sortColumnNames, + const std::vector& sortOrders, + // const std::vector>& + // sortBy, const std::shared_ptr& outputDirectoryPath); // Generates table column handles based on table column properties @@ -342,7 +346,9 @@ void WriterFuzzer::go() { int32_t bucketCount = 0; std::vector bucketColumns; int32_t sortColumnOffset = 0; - std::vector> sortBy; + // std::vector> sortBy; + std::vector sortColumnNames; + std::vector sortOrders; // Regular table columns generateColumns(5, "c", kRegularColumnTypes_, 2, names, types); @@ -362,13 +368,11 @@ void WriterFuzzer::go() { auto [sortColumns, offset] = generateSortColumns(3, bucketColumns, names, types); sortColumnOffset -= offset; - sortBy.reserve(sortColumns.size()); - for (const auto& sortByColumn : sortColumns) { - sortBy.push_back(std::make_shared( - sortByColumn, - kSortOrderTypes_.at( - boost::random::uniform_int_distribution( - 0, 1)(rng_)))); + sortColumnNames = std::move(sortColumns); + sortOrders.reserve(sortColumnNames.size()); + for (const auto& sortByColumn : sortColumnNames) { + sortOrders.push_back(kSortOrderTypes_.at( + boost::random::uniform_int_distribution(0, 1)(rng_))); } } } @@ -391,7 +395,8 @@ void WriterFuzzer::go() { bucketCount, bucketColumns, sortColumnOffset, - sortBy, + sortColumnNames, + sortOrders, outputDirPath); LOG(INFO) << "==============================> Done with iteration " @@ -499,7 +504,9 @@ void WriterFuzzer::verifyWriter( const int32_t bucketCount, const std::vector& bucketColumns, const int32_t sortColumnOffset, - const std::vector>& sortBy, + const std::vector& sortColumnNames, + const std::vector& sortOrders, + // const std::vector>& sortBy, const std::shared_ptr& outputDirectoryPath) { const auto plan = PlanBuilder() .values(input) @@ -508,7 +515,8 @@ void WriterFuzzer::verifyWriter( partitionKeys, bucketCount, bucketColumns, - sortBy) + sortColumnNames, + sortOrders) .planNode(); const auto maxDrivers = @@ -588,13 +596,20 @@ void WriterFuzzer::verifyWriter( } // 4. Verifies sorting. - if (sortBy.size() > 0) { - const std::vector sortColumnNames = { - names.begin() + sortColumnOffset, - names.begin() + sortColumnOffset + sortBy.size()}; + if (sortColumnNames.size() > 0) { + // const std::vector sortColumnNames = { + // names.begin() + sortColumnOffset, + // names.begin() + sortColumnOffset + sortBy.size()}; const std::vector sortColumnTypes = { types.begin() + sortColumnOffset, - types.begin() + sortColumnOffset + sortBy.size()}; + types.begin() + sortColumnOffset + sortColumnNames.size()}; + + std::vector> sortBy; + sortBy.reserve(sortColumnNames.size()); + for (size_t i = 0; i < sortColumnNames.size(); ++i) { + sortBy.emplace_back(std::make_shared( + sortColumnNames[i], sortOrders[i])); + } // Read from each file and check if data is sorted as presto sorted // result. diff --git a/velox/exec/tests/PlanBuilderTest.cpp b/velox/exec/tests/PlanBuilderTest.cpp index ca2a2c073f7..11212b193c5 100644 --- a/velox/exec/tests/PlanBuilderTest.cpp +++ b/velox/exec/tests/PlanBuilderTest.cpp @@ -315,6 +315,7 @@ TEST_F(PlanBuilderTest, commitStrategyParameter) { 0, {}, {}, + {}, dwio::common::FileFormat::DWRF, {}, PlanBuilder::kHiveDefaultConnectorId, diff --git a/velox/exec/tests/TableEvolutionFuzzer.cpp b/velox/exec/tests/TableEvolutionFuzzer.cpp index a353cbe5057..9fb7559d8c3 100644 --- a/velox/exec/tests/TableEvolutionFuzzer.cpp +++ b/velox/exec/tests/TableEvolutionFuzzer.cpp @@ -846,7 +846,8 @@ std::unique_ptr TableEvolutionFuzzer::makeWriteTask( /*partitionBy=*/{}, /*bucketCount=*/0, /*bucketedBy=*/{}, - /*sortBy=*/{}, + /*sortColumns=*/{}, + /*sortBys=*/{}, setup.fileFormat, /*aggregates=*/{}, /*connectorId=*/PlanBuilder::kHiveDefaultConnectorId, @@ -866,7 +867,8 @@ std::unique_ptr TableEvolutionFuzzer::makeWriteTask( /*partitionBy=*/{}, setup.bucketCount(), bucketColumnNames, - /*sortBy=*/{}, + /*sortColumns=*/{}, + /*sortBys=*/{}, setup.fileFormat, /*aggregates=*/{}, /*connectorId=*/PlanBuilder::kHiveDefaultConnectorId, @@ -877,7 +879,8 @@ std::unique_ptr TableEvolutionFuzzer::makeWriteTask( /*partitionBy=*/{}, setup.bucketCount(), bucketColumnNames, - /*sortBy=*/{}, + /*sortColumns=*/{}, + /*sortBys=*/{}, setup.fileFormat); } } diff --git a/velox/exec/tests/TableWriterTest.cpp b/velox/exec/tests/TableWriterTest.cpp index 2b91f729c37..4c5b248701d 100644 --- a/velox/exec/tests/TableWriterTest.cpp +++ b/velox/exec/tests/TableWriterTest.cpp @@ -130,6 +130,7 @@ TEST_F(BasicTableWriterTestBase, structAsMap) { 0, {}, {}, + {}, dwio::common::FileFormat::DWRF, {}, PlanBuilder::kHiveDefaultConnectorId, @@ -2932,10 +2933,8 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, reclaimFromSortTableWriter) { {"c0"}, 4, {"c1"}, - { - std::make_shared( - "c2", core::SortOrder{false, false}), - }) + {"c2"}, + {core::SortOrder{false, false}}) .project({TableWriteTraits::rowCountColumnName()}) .singleAggregation( {}, @@ -3330,10 +3329,8 @@ DEBUG_ONLY_TEST_F( {"c0"}, 4, {"c1"}, - { - std::make_shared( - "c2", core::SortOrder{false, false}), - }) + {"c2"}, + {core::SortOrder{false, false}}) .project({TableWriteTraits::rowCountColumnName()}) .singleAggregation( {}, diff --git a/velox/exec/tests/VeloxIn10MinDemo.cpp b/velox/exec/tests/VeloxIn10MinDemo.cpp index 87d571a1788..4dd899c1a3b 100644 --- a/velox/exec/tests/VeloxIn10MinDemo.cpp +++ b/velox/exec/tests/VeloxIn10MinDemo.cpp @@ -14,7 +14,9 @@ * limitations under the License. */ #include + #include "velox/common/memory/Memory.h" +#include "velox/connectors/ConnectorNames.h" #include "velox/connectors/tpch/TpchConnector.h" #include "velox/connectors/tpch/TpchConnectorSplit.h" #include "velox/core/Expressions.h" @@ -53,8 +55,7 @@ class VeloxIn10MinDemo : public VectorTestBase { // Create and register a TPC-H connector. auto tpchConnector = - connector::getConnectorFactory( - connector::tpch::TpchConnectorFactory::kTpchConnectorName) + connector::getConnectorFactory(connector::kTpchConnectorName) ->newConnector( kTpchConnectorId, std::make_shared( @@ -64,8 +65,7 @@ class VeloxIn10MinDemo : public VectorTestBase { ~VeloxIn10MinDemo() { connector::unregisterConnector(kTpchConnectorId); - connector::unregisterConnectorFactory( - connector::tpch::TpchConnectorFactory::kTpchConnectorName); + connector::unregisterConnectorFactory(connector::kTpchConnectorName); } /// Parse SQL expression into a typed expression tree using DuckDB SQL parser. diff --git a/velox/exec/tests/utils/CMakeLists.txt b/velox/exec/tests/utils/CMakeLists.txt index cf3ef9dbb10..d6b9b858ddc 100644 --- a/velox/exec/tests/utils/CMakeLists.txt +++ b/velox/exec/tests/utils/CMakeLists.txt @@ -43,6 +43,8 @@ target_link_libraries( velox_vector_test_lib velox_vector_fuzzer velox_temp_path + velox_connector + velox_registered_connector_factories velox_cursor velox_core velox_exception diff --git a/velox/exec/tests/utils/PlanBuilder.cpp b/velox/exec/tests/utils/PlanBuilder.cpp index 63995ab948a..984d702f05b 100644 --- a/velox/exec/tests/utils/PlanBuilder.cpp +++ b/velox/exec/tests/utils/PlanBuilder.cpp @@ -15,10 +15,9 @@ */ #include "velox/exec/tests/utils/PlanBuilder.h" -#include "velox/connectors/hive/HiveConnector.h" -#include "velox/connectors/hive/TableHandle.h" #include "velox/connectors/tpcds/TpcdsConnector.h" #include "velox/connectors/tpch/TpchConnector.h" +#include "velox/core/PartitionFunction.h" #include "velox/duckdb/conversion/DuckParser.h" #include "velox/exec/Aggregate.h" #include "velox/exec/HashPartitionFunction.h" @@ -36,7 +35,6 @@ using namespace facebook::velox; using namespace facebook::velox::connector; -using namespace facebook::velox::connector::hive; namespace facebook::velox::exec::test { namespace { @@ -50,23 +48,6 @@ core::TypedExprPtr parseExpr( return core::Expressions::inferTypes(untyped, rowType, pool); } -std::shared_ptr buildHiveBucketProperty( - const RowTypePtr rowType, - int32_t bucketCount, - const std::vector& bucketColumns, - const std::vector>& sortBy) { - std::vector bucketTypes; - bucketTypes.reserve(bucketColumns.size()); - for (const auto& bucketColumn : bucketColumns) { - bucketTypes.push_back(rowType->childAt(rowType->getChildIdx(bucketColumn))); - } - return std::make_shared( - HiveBucketProperty::Kind::kHiveCompatible, - bucketCount, - bucketColumns, - bucketTypes, - sortBy); -} } // namespace PlanBuilder& PlanBuilder::tableScan( @@ -74,8 +55,11 @@ PlanBuilder& PlanBuilder::tableScan( const std::vector& subfieldFilters, const std::string& remainingFilter, const RowTypePtr& dataColumns, - const connector::ColumnHandleMap& assignments) { - return TableScanBuilder(*this) + const connector::ColumnHandleMap& assignments, + const std::string& connectorName, + const std::string& connectorId) { + return TableScanBuilder(*this, connectorName) + .connectorId(connectorId) .filtersAsNode(filtersAsNode_ ? planNodeIdGenerator_ : nullptr) .outputType(outputType) .assignments(assignments) @@ -92,14 +76,16 @@ PlanBuilder& PlanBuilder::tableScan( const std::vector& subfieldFilters, const std::string& remainingFilter, const RowTypePtr& dataColumns, - const connector::ColumnHandleMap& assignments) { - return TableScanBuilder(*this) + const connector::ColumnHandleMap& assignments, + const std::string& connectorName, + const std::string& connectorId) { + return TableScanBuilder(*this, connectorName) + .connectorId(connectorId) .filtersAsNode(filtersAsNode_ ? planNodeIdGenerator_ : nullptr) .tableName(tableName) .outputType(outputType) .columnAliases(columnAliases) .dataColumns(dataColumns) - .subfieldFilters(subfieldFilters) .remainingFilter(remainingFilter) .assignments(assignments) @@ -110,8 +96,11 @@ PlanBuilder& PlanBuilder::tableScanWithPushDown( const RowTypePtr& outputType, const PushdownConfig& pushdownConfig, const RowTypePtr& dataColumns, - const connector::ColumnHandleMap& assignments) { - return TableScanBuilder(*this) + const connector::ColumnHandleMap& assignments, + const std::string& connectorName, + const std::string& connectorId) { + return TableScanBuilder(*this, connectorName) + .connectorId(connectorId) .filtersAsNode(filtersAsNode_ ? planNodeIdGenerator_ : nullptr) .outputType(outputType) .assignments(assignments) @@ -259,29 +248,28 @@ void addConjunct( core::PlanNodePtr PlanBuilder::TableScanBuilder::build(core::PlanNodeId id) { VELOX_CHECK_NOT_NULL(outputType_, "outputType must be specified"); + VELOX_CHECK_NOT_NULL( + connectorFactory_, "connectorFactory_ must be specified"); std::unordered_map typedMapping; + std::vector> columnHandles; bool hasAssignments = !(assignments_.empty()); for (uint32_t i = 0; i < outputType_->size(); ++i) { const auto& name = outputType_->nameOf(i); const auto& type = outputType_->childAt(i); - std::string hiveColumnName = name; + std::string columnName = name; auto it = columnAliases_.find(name); if (it != columnAliases_.end()) { - hiveColumnName = it->second; + columnName = it->second; typedMapping.emplace( - name, - std::make_shared(type, hiveColumnName)); + name, std::make_shared(type, columnName)); } if (!hasAssignments) { - assignments_.insert( - {name, - std::make_shared( - hiveColumnName, - HiveColumnHandle::ColumnType::kRegular, - type, - type)}); + auto columnHandle = + connectorFactory_->makeColumnHandle(connectorId_, columnName, type); + columnHandles.push_back(columnHandle); + assignments_.insert({name, columnHandle}); } } @@ -312,13 +300,29 @@ core::PlanNodePtr PlanBuilder::TableScanBuilder::build(core::PlanNodeId id) { } if (!tableHandle_) { - tableHandle_ = std::make_shared( - connectorId_, - tableName_, - true, - std::move(subfieldFiltersMap_), - remainingFilterExpr, - dataColumns_); + // Build connector-specific table handle through the ObjectFactory. + folly::dynamic options = folly::dynamic::object(); + options["filterPushdownEnabled"] = true; + + // Serialize subfield filters + if (!subfieldFiltersMap_.empty()) { + folly::dynamic sfs = folly::dynamic::object(); + for (const auto& entry : subfieldFiltersMap_) { + const auto& subfield = entry.first; + const auto& filter = entry.second; + sfs[subfield.toString()] = velox::ISerializable::serialize(*filter); + } + options["subfieldFilters"] = std::move(sfs); + } + + // Serialize remaining filters + if (remainingFilterExpr) { + options["remainingFilter"] = + velox::ISerializable::serialize(*remainingFilterExpr); + } + + tableHandle_ = connectorFactory_->makeTableHandle( + connectorId_, tableName_, columnHandles, options); } core::PlanNodePtr result = std::make_shared( id, outputType_, tableHandle_, assignments_); @@ -339,51 +343,69 @@ core::PlanNodePtr PlanBuilder::TableWriterBuilder::build(core::PlanNodeId id) { // upstream operator. auto outputType = outputType_ ? outputType_ : upstreamNode->outputType(); - // If insertHandle_ is not specified, build a HiveInsertTableHandle along with - // columnHandles, bucketProperty and locationHandle. + // If insertHandle_ is not specified, build a ConnectorInsertTableHandle along + // with columnHandles, bucketProperty and locationHandle. if (!insertHandle_) { // Create column handles. - std::vector> - columnHandles; + std::vector> columnHandles; for (auto i = 0; i < outputType->size(); ++i) { const auto column = outputType->nameOf(i); + folly::dynamic options = folly::dynamic::object(); + const bool isPartitionKey = std::find(partitionBy_.begin(), partitionBy_.end(), column) != partitionBy_.end(); - columnHandles.push_back( - std::make_shared( - column, - isPartitionKey - ? connector::hive::HiveColumnHandle::ColumnType::kPartitionKey - : connector::hive::HiveColumnHandle::ColumnType::kRegular, - outputType->childAt(i), - outputType->childAt(i))); + options["columnType"] = isPartitionKey ? "partitionkey" : "regular"; + auto columnHandle = connectorFactory_->makeColumnHandle( + connectorId_, column, outputType->childAt(i), options); + columnHandles.push_back(columnHandle); } - auto locationHandle = std::make_shared( - outputDirectoryPath_, - outputDirectoryPath_, - connector::hive::LocationHandle::TableType::kNew, - outputFileName_); + // 2) Create a connector-specific LocationHandle (e.g. HiveLocationHandle). + folly::dynamic locationHandleOptions = folly::dynamic::object(); + locationHandleOptions["targetPath"] = outputDirectoryPath_; + locationHandleOptions["writePath"] = outputDirectoryPath_; + locationHandleOptions["targetFileName"] = outputFileName_; + auto locationHandle = connectorFactory_->makeLocationHandle( + connectorId_, + connector::ConnectorLocationHandle::TableType::kNew, + locationHandleOptions); + + folly::dynamic insertTableOptions = folly::dynamic::object(); + insertTableOptions["storageFormat"] = static_cast(fileFormat_); + insertTableOptions["compressionKind"] = static_cast(compressionKind_); + if (!serdeParameters_.empty()) { + insertTableOptions["serdeParameters"] = + ISerializable::serialize(serdeParameters_); + } + if (options_) { + insertTableOptions["writerOptions"] = options_->serialize(); + } + insertTableOptions["ensureFiles"] = ensureFiles_; - std::shared_ptr bucketProperty; if (bucketCount_ != 0) { - bucketProperty = buildHiveBucketProperty( - outputType, bucketCount_, bucketedBy_, sortBy_); + folly::dynamic bucketProperty = folly::dynamic::object(); + std::vector bucketTypes; + bucketTypes.reserve(bucketedBy_.size()); + for (const auto& bucketColumn : bucketedBy_) { + bucketTypes.push_back( + outputType->childAt(outputType->getChildIdx(bucketColumn))); + } + bucketProperty["kind"] = 0l; + bucketProperty["bucketCount"] = bucketCount_; + bucketProperty["bucketedBy"] = ISerializable::serialize(bucketedBy_); + bucketProperty["bucketedTypes"] = ISerializable::serialize(bucketTypes); + bucketProperty["sortColumns"] = ISerializable::serialize(sortColumns_); + bucketProperty["sortOrders"] = ISerializable::serialize(sortOrders_); + + insertTableOptions["bucketProperty"] = std::move(bucketProperty); } - auto hiveHandle = std::make_shared( - columnHandles, - locationHandle, - fileFormat_, - bucketProperty, - compressionKind_, - serdeParameters_, - options_, - ensureFiles_); + auto connectorInsertTableHandle = connectorFactory_->makeInsertTableHandle( + connectorId_, columnHandles, locationHandle, insertTableOptions); - insertHandle_ = - std::make_shared(connectorId_, hiveHandle); + insertHandle_ = std::make_shared( + connectorId_, connectorInsertTableHandle); } std::optional columnStatsSpec; @@ -708,7 +730,8 @@ PlanBuilder& PlanBuilder::tableWrite( const std::vector& partitionBy, int32_t bucketCount, const std::vector& bucketedBy, - const std::vector>& sortBy, + const std::vector& sortColumns, + const std::vector& sortOrders, const dwio::common::FileFormat fileFormat, const std::vector& aggregates, const std::string_view& connectorId, @@ -726,7 +749,8 @@ PlanBuilder& PlanBuilder::tableWrite( .partitionBy(partitionBy) .bucketCount(bucketCount) .bucketedBy(bucketedBy) - .sortBy(sortBy) + .sortColumns(sortColumns) + .sortOrders(sortOrders) .fileFormat(fileFormat) .aggregates(aggregates) .connectorId(connectorId) @@ -1298,6 +1322,35 @@ core::PartitionFunctionSpecPtr createPartitionFunctionSpec( } } +core::PartitionFunctionSpecPtr createConnectorPartitionFunctionSpec( + int numBuckets, + const std::vector& channels, + const std::vector& constValues, + const std::string& connectorName = connector::kHiveConnectorName, + const std::string& connectorId = + std::string(PlanBuilder::kHiveDefaultConnectorId)) { + folly::dynamic opts = folly::dynamic::object(); + opts["numBuckets"] = numBuckets; + opts["keyChannels"] = ISerializable::serialize(channels); + + if (!constValues.empty()) { + std::vector constExprs; + constExprs.reserve(constValues.size()); + for (const auto& v : constValues) { + VELOX_CHECK_NOT_NULL(v); + constExprs.emplace_back(v); // wrap VectorPtr as ConstantTypedExpr + } + opts["constants"] = velox::ISerializable::serialize(constExprs); + } + + // Ask the connector factory to build the spec. + auto connectorFactory = connector::getConnectorFactory(connectorName); + auto connectorPartitionSpec = + connectorFactory->makePartitionFunctionSpec(connectorId, opts); + + return connectorPartitionSpec; +} + RowTypePtr concat(const RowTypePtr& a, const RowTypePtr& b) { std::vector names = a->names(); std::vector types = a->children(); @@ -1463,15 +1516,16 @@ PlanBuilder& PlanBuilder::scaleWriterlocalPartition( for (const auto& key : keys) { keyIndices.push_back(planNode_->outputType()->getChildIdx(key)); } - auto hivePartitionFunctionFactory = - std::make_shared( - 1009, keyIndices, std::vector{}); + + auto connectorPartitionSpec = createConnectorPartitionFunctionSpec( + 1009, keyIndices, std::vector{}); planNode_ = std::make_shared( nextPlanNodeId(), core::LocalPartitionNode::Type::kRepartition, true, - hivePartitionFunctionFactory, + connectorPartitionSpec, std::vector{planNode_}); + VELOX_CHECK(!planNode_->supportsBarrier()); return *this; } @@ -1479,39 +1533,17 @@ PlanBuilder& PlanBuilder::scaleWriterlocalPartition( PlanBuilder& PlanBuilder::localPartition( int numBuckets, const std::vector& bucketChannels, - const std::vector& constValues) { - auto hivePartitionFunctionFactory = - std::make_shared( - numBuckets, bucketChannels, constValues); - planNode_ = std::make_shared( - nextPlanNodeId(), - core::LocalPartitionNode::Type::kRepartition, - /*scaleWriter=*/false, - std::move(hivePartitionFunctionFactory), - std::vector{planNode_}); - VELOX_CHECK(planNode_->supportsBarrier()); - return *this; -} + const std::vector& constValues, + const std::string& connectorName, + const std::string& connectorId) { + auto connectorPartitionSpec = createConnectorPartitionFunctionSpec( + numBuckets, bucketChannels, constValues, connectorName, connectorId); -PlanBuilder& PlanBuilder::localPartitionByBucket( - const std::shared_ptr& - bucketProperty) { - VELOX_CHECK_NOT_NULL(planNode_, "LocalPartition cannot be the source node"); - std::vector bucketChannels; - for (const auto& bucketColumn : bucketProperty->bucketedBy()) { - bucketChannels.push_back( - planNode_->outputType()->getChildIdx(bucketColumn)); - } - auto hivePartitionFunctionFactory = - std::make_shared( - bucketProperty->bucketCount(), - bucketChannels, - std::vector{}); planNode_ = std::make_shared( nextPlanNodeId(), core::LocalPartitionNode::Type::kRepartition, /*scaleWriter=*/false, - std::move(hivePartitionFunctionFactory), + std::move(connectorPartitionSpec), std::vector{planNode_}); VELOX_CHECK(planNode_->supportsBarrier()); return *this; diff --git a/velox/exec/tests/utils/PlanBuilder.h b/velox/exec/tests/utils/PlanBuilder.h index dbf27c6b86e..fd34680847b 100644 --- a/velox/exec/tests/utils/PlanBuilder.h +++ b/velox/exec/tests/utils/PlanBuilder.h @@ -20,7 +20,10 @@ #include #include #include -#include "velox/connectors/hive/HiveDataSink.h" +#include "velox/connectors/Connector.h" +#include "velox/connectors/ConnectorNames.h" +#include "velox/connectors/RegisterConnectorFactories.h" +#include "velox/dwio/common/Options.h" #include "velox/parse/ExpressionsParser.h" #include "velox/parse/IExpr.h" #include "velox/parse/PlanNodeIdGenerator.h" @@ -95,7 +98,9 @@ class PlanBuilder { explicit PlanBuilder( std::shared_ptr planNodeIdGenerator, memory::MemoryPool* pool = nullptr) - : planNodeIdGenerator_{std::move(planNodeIdGenerator)}, pool_{pool} {} + : planNodeIdGenerator_{std::move(planNodeIdGenerator)}, pool_{pool} { + connector::registerConnectorFactories(); + } /// Constructor with no required parameters suitable for creating /// straight-line (e.g. no joins) query plans. @@ -111,9 +116,13 @@ class PlanBuilder { memory::MemoryPool* pool = nullptr) : planNode_(std::move(initialPlanNode)), planNodeIdGenerator_{std::move(planNodeIdGenerator)}, - pool_{pool} {} + pool_{pool} { + connector::registerConnectorFactories(); + } - virtual ~PlanBuilder() = default; + virtual ~PlanBuilder() { + connector::unregisterConnectorFactories(); + } static constexpr const std::string_view kHiveDefaultConnectorId{"test-hive"}; static constexpr const std::string_view kTpchDefaultConnectorId{"test-tpch"}; @@ -146,7 +155,9 @@ class PlanBuilder { const std::vector& subfieldFilters = {}, const std::string& remainingFilter = "", const RowTypePtr& dataColumns = nullptr, - const connector::ColumnHandleMap& assignments = {}); + const connector::ColumnHandleMap& assignments = {}, + const std::string& connectorName = connector::kHiveConnectorName, + const std::string& connectorId = std::string(kHiveDefaultConnectorId)); /// Add a TableScanNode to scan a Hive table. /// @@ -176,7 +187,9 @@ class PlanBuilder { const std::vector& subfieldFilters = {}, const std::string& remainingFilter = "", const RowTypePtr& dataColumns = nullptr, - const connector::ColumnHandleMap& assignments = {}); + const connector::ColumnHandleMap& assignments = {}, + const std::string& connectorName = connector::kHiveConnectorName, + const std::string& connectorId = std::string(kHiveDefaultConnectorId)); /// Add a TableScanNode to scan a Hive table with direct SubfieldFilters. /// @@ -189,7 +202,9 @@ class PlanBuilder { const RowTypePtr& outputType, const PushdownConfig& pushdownConfig, const RowTypePtr& dataColumns = nullptr, - const connector::ColumnHandleMap& assignments = {}); + const connector::ColumnHandleMap& assignments = {}, + const std::string& connectorName = connector::kHiveConnectorName, + const std::string& connectorId = std::string(kHiveDefaultConnectorId)); /// Add a TableScanNode to scan a TPC-H table. /// @@ -229,7 +244,15 @@ class PlanBuilder { /// builder arguments will be ignored. class TableScanBuilder { public: - TableScanBuilder(PlanBuilder& builder) : planBuilder_(builder) {} + TableScanBuilder( + PlanBuilder& builder, + const std::string& connectorName = connector::kHiveConnectorName, + const std::string& connectorId = std::string(kHiveDefaultConnectorId)) + : planBuilder_(builder), + connectorName_(connectorName), + connectorId_(connectorId) { + connectorFactory_ = connector::getConnectorFactory(connectorName); + } /// @param tableName The name of the table to scan. TableScanBuilder& tableName(std::string tableName) { @@ -337,7 +360,9 @@ class PlanBuilder { PlanBuilder& planBuilder_; std::string tableName_{"hive_table"}; - std::string connectorId_{kHiveDefaultConnectorId}; + std::string connectorName_; + std::string connectorId_; + std::shared_ptr connectorFactory_; RowTypePtr outputType_; core::ExprPtr remainingFilter_; RowTypePtr dataColumns_; @@ -449,7 +474,15 @@ class PlanBuilder { /// Uses the Hive connector by default. class TableWriterBuilder { public: - explicit TableWriterBuilder(PlanBuilder& builder) : planBuilder_(builder) {} + explicit TableWriterBuilder( + PlanBuilder& builder, + const std::string& connectorName = connector::kHiveConnectorName, + const std::string& connectorId = std::string(kHiveDefaultConnectorId)) + : planBuilder_(builder), + connectorName_(connectorName), + connectorId_(connectorId) { + connectorFactory_ = connector::getConnectorFactory(connectorName); + } /// @param outputType The schema that will be written to the output file. It /// may reference a subset or change the order of columns from the input @@ -513,11 +546,15 @@ class PlanBuilder { return *this; } - /// @param sortBy Specifies the sort by columns. - TableWriterBuilder& sortBy( - std::vector> - sortBy) { - sortBy_ = std::move(sortBy); + TableWriterBuilder& sortColumns( + const std::vector& sortColumns) { + sortColumns_ = sortColumns; + return *this; + } + + TableWriterBuilder& sortOrders( + const std::vector& sortOrders) { + sortOrders_ = sortOrders; return *this; } @@ -579,15 +616,17 @@ class PlanBuilder { RowTypePtr outputType_; std::string outputDirectoryPath_; std::string outputFileName_; - std::string connectorId_{kHiveDefaultConnectorId}; + std::string connectorName_; + std::string connectorId_; + std::shared_ptr connectorFactory_; std::shared_ptr insertHandle_; std::vector partitionBy_; int32_t bucketCount_{0}; std::vector bucketedBy_; std::vector aggregates_; - std::vector> - sortBy_; + std::vector sortColumns_; + std::vector sortOrders_; std::unordered_map serdeParameters_; std::shared_ptr options_; @@ -822,8 +861,8 @@ class PlanBuilder { const std::vector& partitionBy, int32_t bucketCount, const std::vector& bucketedBy, - const std::vector< - std::shared_ptr>& sortBy, + const std::vector& sortColumns, + const std::vector& sortOrders, const dwio::common::FileFormat fileFormat = dwio::common::FileFormat::DWRF, const std::vector& aggregates = {}, @@ -1182,18 +1221,14 @@ class PlanBuilder { /// current plan node). PlanBuilder& localPartition(const std::vector& keys); - /// A convenience method to add a LocalPartitionNode with hive partition + /// A convenience method to add a LocalPartitionNode with connector partition /// function. PlanBuilder& localPartition( int numBuckets, const std::vector& channels, - const std::vector& constValues); - - /// A convenience method to add a LocalPartitionNode with a single source (the - /// current plan node) and hive bucket property. - PlanBuilder& localPartitionByBucket( - const std::shared_ptr& - bucketProperty); + const std::vector& constValues, + const std::string& connectorName = connector::kHiveConnectorName, + const std::string& connectorId = std::string(kHiveDefaultConnectorId)); /// Add a LocalPartitionNode to partition the input using batch-level /// round-robin. Number of partitions is determined at runtime based on diff --git a/velox/exec/tests/utils/TableWriterTestBase.cpp b/velox/exec/tests/utils/TableWriterTestBase.cpp index 34304d94ab2..c65f9a3bef3 100644 --- a/velox/exec/tests/utils/TableWriterTestBase.cpp +++ b/velox/exec/tests/utils/TableWriterTestBase.cpp @@ -672,36 +672,36 @@ PlanNodePtr TableWriterTestBase::createInsertPlanForBucketTable( std::optional columnStatsSpec) { // Since we might do column rename, so generate bucket property based on // the data type from 'inputPlan'. - std::vector bucketColumns; - bucketColumns.reserve(bucketProperty->bucketedBy().size()); + std::vector bucketColumns(bucketProperty->bucketedBy().size()); + std::vector bucketChannels( + bucketProperty->bucketedBy().size()); for (int i = 0; i < bucketProperty->bucketedBy().size(); ++i) { - bucketColumns.push_back(inputRowType->names()[tableRowType->getChildIdx( - bucketProperty->bucketedBy()[i])]); + auto index = tableRowType->getChildIdx(bucketProperty->bucketedBy()[i]); + bucketColumns.push_back(inputRowType->names()[index]); + bucketChannels.push_back(index); } - auto localPartitionBucketProperty = std::make_shared( - bucketProperty->kind(), - bucketProperty->bucketCount(), - bucketColumns, - bucketProperty->bucketedTypes(), - bucketProperty->sortedBy()); - auto insertPlan = - inputPlan.localPartitionByBucket(localPartitionBucketProperty) - .addNode(addTableWriter( - inputRowType, - tableRowType->names(), - std::nullopt, - createInsertTableHandle( - tableRowType, - outputTableType, - outputDirectoryPath, - partitionedBy, - bucketProperty, - compressionKind), - false, - outputCommitStrategy)) - .capturePlanNodeId(tableWriteNodeId_) - .localPartition({}) - .tableWriteMerge(); + + auto insertPlan = inputPlan + .localPartition( + bucketProperty->bucketCount(), + bucketChannels, + std::vector{}) + .addNode(addTableWriter( + inputRowType, + tableRowType->names(), + std::nullopt, + createInsertTableHandle( + tableRowType, + outputTableType, + outputDirectoryPath, + partitionedBy, + bucketProperty, + compressionKind), + false, + outputCommitStrategy)) + .capturePlanNodeId(tableWriteNodeId_) + .localPartition({}) + .tableWriteMerge(); if (aggregateResult) { insertPlan.project({TableWriteTraits::rowCountColumnName()}) .singleAggregation( diff --git a/velox/tool/trace/TableWriterReplayer.cpp b/velox/tool/trace/TableWriterReplayer.cpp index d17511f73bb..e1a09c50d0a 100644 --- a/velox/tool/trace/TableWriterReplayer.cpp +++ b/velox/tool/trace/TableWriterReplayer.cpp @@ -16,6 +16,7 @@ #include +#include "velox/connectors/hive/HiveDataSink.h" #include "velox/exec/TableWriter.h" #include "velox/exec/TraceUtil.h" #include "velox/exec/tests/utils/PlanBuilder.h"