From 014b832ed2fc9559ddfee6a2b46e9381feca790a Mon Sep 17 00:00:00 2001 From: yingsu00 Date: Mon, 23 Jun 2025 23:46:46 +0800 Subject: [PATCH 1/6] feat: Make WriterOptions serializable --- velox/dwio/common/Options.cpp | 85 +++++++++++++++++++++++++++++++++++ velox/dwio/common/Options.h | 6 ++- 2 files changed, 90 insertions(+), 1 deletion(-) diff --git a/velox/dwio/common/Options.cpp b/velox/dwio/common/Options.cpp index 33946f71159..10abb94e9fe 100644 --- a/velox/dwio/common/Options.cpp +++ b/velox/dwio/common/Options.cpp @@ -15,6 +15,7 @@ */ #include "velox/dwio/common/Options.h" +#include "velox/common/compression/Compression.h" namespace facebook::velox::dwio::common { @@ -77,4 +78,88 @@ ColumnReaderOptions makeColumnReaderOptions(const ReaderOptions& options) { return columnReaderOptions; } +folly::dynamic WriterOptions::serialize() const { + folly::dynamic obj = folly::dynamic::object; + + // 1) Schema + if (schema) { + obj["schema"] = schema->serialize(); + } + + // 2) compressionKind + if (compressionKind) { + obj["compressionKind"] = static_cast(*compressionKind); + } + + // 3) serdeParameters + if (!serdeParameters.empty()) { + folly::dynamic mapObj = folly::dynamic::object; + for (auto& [k, v] : serdeParameters) { + mapObj[k] = v; + } + obj["serdeParameters"] = std::move(mapObj); + } + + // 4) sessionTimezoneName + if (!sessionTimezoneName.empty()) { + obj["sessionTimezoneName"] = sessionTimezoneName; + } + + // 5) adjustTimestampToTimezone + obj["adjustTimestampToTimezone"] = adjustTimestampToTimezone; + + // (We do *not* serialize pool, spillConfig, nonReclaimableSection, + // or the factory functions—they must be re‐injected by the host.) + + return obj; +} + +std::shared_ptr WriterOptions::deserialize( + const folly::dynamic& obj) { + auto opts = std::make_shared(); + + // 1) schema + if (auto p = obj.get_ptr("schema")) { + opts->schema = ISerializable::deserialize(*p, nullptr); + } + + // 2) compressionKind + if (auto p = obj.get_ptr("compressionKind")) { + opts->compressionKind = + static_cast(p->asInt()); + } + + // 3) serdeParameters + if (auto p = obj.get_ptr("serdeParameters")) { + opts->serdeParameters.clear(); + for (auto& kv : p->items()) { + opts->serdeParameters.emplace(kv.first.asString(), kv.second.asString()); + } + } + + // 4) sessionTimezoneName + if (auto p = obj.get_ptr("sessionTimezoneName")) { + opts->sessionTimezoneName = p->asString(); + } + + // 5) adjustTimestampToTimezone + if (auto p = obj.get_ptr("adjustTimestampToTimezone")) { + opts->adjustTimestampToTimezone = p->asBool(); + } + + // pool, spillConfig, nonReclaimableSection, factories remain at default + return opts; +} + +void WriterOptions::registerSerDe() { + auto& registry = DeserializationRegistryForSharedPtr(); + registry.Register("WriterOptions", WriterOptions::deserialize); +} + +// force registration at load‐time +static bool _writerOptionsSerdeRegistered = []() { + WriterOptions::registerSerDe(); + return true; +}(); + } // namespace facebook::velox::dwio::common diff --git a/velox/dwio/common/Options.h b/velox/dwio/common/Options.h index ef70cb72379..2de86d4e858 100644 --- a/velox/dwio/common/Options.h +++ b/velox/dwio/common/Options.h @@ -684,7 +684,7 @@ class ReaderOptions : public io::ReaderOptions { bool allowEmptyFile_{false}; }; -struct WriterOptions { +struct WriterOptions : public ISerializable { TypePtr schema{nullptr}; velox::memory::MemoryPool* memoryPool{nullptr}; const velox::common::SpillConfig* spillConfig{nullptr}; @@ -713,6 +713,10 @@ struct WriterOptions { const config::ConfigBase& session) {} virtual ~WriterOptions() = default; + + folly::dynamic serialize() const override; + static std::shared_ptr deserialize(const folly::dynamic& obj); + static void registerSerDe(); }; // Options for creating a column reader. From bc0579ae4487f31310e5848cec5536cfc7b4eac8 Mon Sep 17 00:00:00 2001 From: yingsu00 Date: Tue, 9 Sep 2025 18:51:58 -0700 Subject: [PATCH 2/6] refactor: Introduce ConnectorNames.h in velox/connectors In order to decouple Hive and other specific connectors from exec and core modules, we need to put the connector names in a central location velox/connectors/ConnectorNames.h. The idea is similar to facebook::velox::dwio::common::FileFormat where all file formats are specified. Modules outside of the connectors module can just reference this central and connector-agnostic header instead of connector specific headers like HiveConnector.h. --- velox/connectors/ConnectorNames.h | 29 +++++++++++++++++++++++++++ velox/exec/tests/VeloxIn10MinDemo.cpp | 8 ++++---- 2 files changed, 33 insertions(+), 4 deletions(-) create mode 100644 velox/connectors/ConnectorNames.h diff --git a/velox/connectors/ConnectorNames.h b/velox/connectors/ConnectorNames.h new file mode 100644 index 00000000000..05f1dd7f79c --- /dev/null +++ b/velox/connectors/ConnectorNames.h @@ -0,0 +1,29 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +namespace facebook::velox::connector { + +// TODO: Add a demo connector + +constexpr const char* kFuzzerConnectorName = "fuzzer"; +constexpr const char* kHiveConnectorName = "hive"; +constexpr const char* kIcebergConnectorName = "iceberg"; +constexpr const char* kTpchConnectorName = "tpch"; +constexpr const char* kTpcdsConnectorName = "tpcds"; + +} // namespace facebook::velox::connector diff --git a/velox/exec/tests/VeloxIn10MinDemo.cpp b/velox/exec/tests/VeloxIn10MinDemo.cpp index 87d571a1788..4dd899c1a3b 100644 --- a/velox/exec/tests/VeloxIn10MinDemo.cpp +++ b/velox/exec/tests/VeloxIn10MinDemo.cpp @@ -14,7 +14,9 @@ * limitations under the License. */ #include + #include "velox/common/memory/Memory.h" +#include "velox/connectors/ConnectorNames.h" #include "velox/connectors/tpch/TpchConnector.h" #include "velox/connectors/tpch/TpchConnectorSplit.h" #include "velox/core/Expressions.h" @@ -53,8 +55,7 @@ class VeloxIn10MinDemo : public VectorTestBase { // Create and register a TPC-H connector. auto tpchConnector = - connector::getConnectorFactory( - connector::tpch::TpchConnectorFactory::kTpchConnectorName) + connector::getConnectorFactory(connector::kTpchConnectorName) ->newConnector( kTpchConnectorId, std::make_shared( @@ -64,8 +65,7 @@ class VeloxIn10MinDemo : public VectorTestBase { ~VeloxIn10MinDemo() { connector::unregisterConnector(kTpchConnectorId); - connector::unregisterConnectorFactory( - connector::tpch::TpchConnectorFactory::kTpchConnectorName); + connector::unregisterConnectorFactory(connector::kTpchConnectorName); } /// Parse SQL expression into a typed expression tree using DuckDB SQL parser. From cb3ce83abb6f3e7458a24195cbf99f1dc55eda95 Mon Sep 17 00:00:00 2001 From: yingsu00 Date: Mon, 23 Jun 2025 23:47:41 +0800 Subject: [PATCH 3/6] mis: Enhance ConnectorFactory interface This commit is the first part of the effort to decouple Hive from exec tests, which aims to make VELOX_ENABLE_HIVE_CONNECTOR=OFF build without errors. The content of this commit include: - Enhance velox::connector::ConnectorFactory, adding abstract methods for creating ConnectoSplits, TableHandles, InsertTableHandles,etc. - Enhance HiveConnectorFactory in velox/connectors/hive that implements the newly added function interface. - Add a new HiveObjectFactoryTest suite to verify that dynamic options yield correct Hive-specific objects without leaking connector internals into core or exec tests. --- velox/connectors/Connector.cpp | 6 +- velox/connectors/Connector.h | 83 ++++ velox/connectors/fuzzer/FuzzerConnector.h | 2 + velox/connectors/hive/HiveConnector.cpp | 396 ++++++++++++++++++ velox/connectors/hive/HiveConnector.h | 42 ++ velox/connectors/hive/HiveConnectorSplit.h | 1 + velox/connectors/hive/HiveDataSink.cpp | 42 +- velox/connectors/hive/HiveDataSink.h | 39 +- velox/connectors/hive/tests/CMakeLists.txt | 1 + .../hive/tests/HiveObjectFactoryTest.cpp | 242 +++++++++++ velox/connectors/tpcds/TpcdsConnector.h | 2 + velox/connectors/tpch/TpchConnector.h | 4 +- velox/core/PlanNode.h | 1 + 13 files changed, 832 insertions(+), 29 deletions(-) create mode 100644 velox/connectors/hive/tests/HiveObjectFactoryTest.cpp diff --git a/velox/connectors/Connector.cpp b/velox/connectors/Connector.cpp index e7c72af478b..f345de03cb7 100644 --- a/velox/connectors/Connector.cpp +++ b/velox/connectors/Connector.cpp @@ -58,8 +58,8 @@ bool hasConnectorFactory(const std::string& connectorName) { } bool unregisterConnectorFactory(const std::string& connectorName) { - auto count = connectorFactories().erase(connectorName); - return count == 1; + auto factoryCount = connectorFactories().erase(connectorName); + return factoryCount == 1; } std::shared_ptr getConnectorFactory( @@ -169,4 +169,6 @@ folly::dynamic ConnectorTableHandle::serialize() const { return serializeBase("ConnectorTableHandle"); } +ConnectorLocationHandle::~ConnectorLocationHandle() = default; + } // namespace facebook::velox::connector diff --git a/velox/connectors/Connector.h b/velox/connectors/Connector.h index 10cf82b342c..0a6d091b769 100644 --- a/velox/connectors/Connector.h +++ b/velox/connectors/Connector.h @@ -27,6 +27,7 @@ #include "velox/common/file/TokenProvider.h" #include "velox/common/future/VeloxPromise.h" #include "velox/core/ExpressionEvaluator.h" +#include "velox/core/PartitionFunction.h" #include "velox/type/Filter.h" #include "velox/vector/ComplexVector.h" @@ -93,6 +94,8 @@ struct ConnectorSplit : public ISerializable { class ColumnHandle : public ISerializable { public: + enum class ColumnType { kPartitionKey, kRegular, kSynthesized }; + virtual ~ColumnHandle() = default; virtual const std::string& name() const = 0; @@ -165,6 +168,33 @@ class ConnectorInsertTableHandle : public ISerializable { } }; +class ConnectorLocationHandle : public ISerializable { + public: + enum class TableType { kNew, kExisting, kTemp }; + + ConnectorLocationHandle(const std::string& connectorId, TableType tableType) + : connectorId_{connectorId}, tableType_{tableType} {} + + virtual ~ConnectorLocationHandle(); + + const std::string& connectorId() const { + return connectorId_; + } + + /// New vs existing vs temp. + TableType tableType() const { + return tableType_; + } + + virtual std::string toString() const = 0; + + virtual folly::dynamic serialize() const = 0; + + private: + const std::string connectorId_; + const TableType tableType_; +}; + using ConnectorInsertTableHandlePtr = std::shared_ptr; @@ -688,6 +718,59 @@ class ConnectorFactory { folly::Executor* ioExecutor = nullptr, folly::Executor* cpuExecutor = nullptr) = 0; + virtual std::shared_ptr makeConnectorSplit( + const std::string& connectorId, + const std::string& filePath, + uint64_t start, + uint64_t length, + const folly::dynamic& options = {}) const { + VELOX_UNSUPPORTED("ConnectorSplit not supported by connector", connectorId); + } + + virtual std::shared_ptr makeColumnHandle( + const std::string& connectorId, + const std::string& name, + const TypePtr& type, + const folly::dynamic& options = {}) const { + VELOX_UNSUPPORTED( + "connector::ColumnHandle not supported by connector", connectorId); + } + + virtual std::shared_ptr makeTableHandle( + const std::string& connectorId, + const std::string& tableName, + std::vector> columnHandles, + const folly::dynamic& options) const { + VELOX_UNSUPPORTED( + "ConnectorTableHandle not supported by connector", connectorId); + } + + virtual std::shared_ptr makeInsertTableHandle( + const std::string& connectorId, + std::vector> inputColumns, + std::shared_ptr locationHandle, + const folly::dynamic& options = {}) const { + VELOX_UNSUPPORTED( + "ConnectorInsertTableHandle not supported by connector", connectorId); + } + + virtual std::shared_ptr makeLocationHandle( + const std::string& connectorId, + ConnectorLocationHandle::TableType tableType = + ConnectorLocationHandle::TableType::kNew, + const folly::dynamic& options = {}) const { + VELOX_UNSUPPORTED( + "ConnectorLocationHandle not supported by connector", connectorId); + } + + virtual std::shared_ptr + makePartitionFunctionSpec( + const std::string& connectorId, + const folly::dynamic& options = {}) const { + VELOX_UNSUPPORTED( + "PartitionFunctionSpec not supported by connector: {}", connectorId); + } + private: const std::string name_; }; diff --git a/velox/connectors/fuzzer/FuzzerConnector.h b/velox/connectors/fuzzer/FuzzerConnector.h index 53e94b5f638..1b06234ed52 100644 --- a/velox/connectors/fuzzer/FuzzerConnector.h +++ b/velox/connectors/fuzzer/FuzzerConnector.h @@ -141,6 +141,8 @@ class FuzzerConnectorFactory : public ConnectorFactory { folly::Executor* cpuExecutor = nullptr) override { return std::make_shared(id, config, ioExecutor); } + + // TODO: Add object makers like makeTableHandle, makeColumnHandle, etc. }; } // namespace facebook::velox::connector::fuzzer diff --git a/velox/connectors/hive/HiveConnector.cpp b/velox/connectors/hive/HiveConnector.cpp index e04828e83aa..9ba39add07f 100644 --- a/velox/connectors/hive/HiveConnector.cpp +++ b/velox/connectors/hive/HiveConnector.cpp @@ -25,6 +25,7 @@ #include using namespace facebook::velox::exec; +using namespace facebook::velox::common; namespace facebook::velox::connector::hive { @@ -99,6 +100,401 @@ void HiveConnector::registerSerDe() { HivePartitionFunctionSpec::registerSerDe(); } +std::shared_ptr HiveConnectorFactory::makeConnectorSplit( + const std::string& connectorId, + const std::string& filePath, + uint64_t start, + uint64_t length, + const folly::dynamic& options) const { + auto builder = HiveConnectorSplitBuilder(filePath) + .start(start) + .length(length) + .connectorId(connectorId); + + dwio::common::FileFormat fileFormat = + (!options.isNull() && options.count("fileFormat")) + ? static_cast(options["fileFormat"].asInt()) + : defaultFileFormat_; + builder.fileFormat(fileFormat); + + int64_t splitWeight = (!options.isNull() && options.count("splitWeight")) + ? options["splitWeight"].asInt() + : 0; + builder.splitWeight(splitWeight); + + bool cacheable = (!options.isNull() && options.count("cacheable")) + ? options["cacheable"].asBool() + : true; + builder.cacheable(cacheable); + + if (!options.isNull() && options.count("infoColumns")) { + for (auto& kv : options["infoColumns"].items()) { + builder.infoColumn(kv.first.asString(), kv.second.asString()); + } + } + + if (!options.isNull() && options.count("partitionKeys")) { + for (auto& kv : options["partitionKeys"].items()) { + builder.partitionKey( + kv.first.asString(), + kv.second.isNull() + ? std::nullopt + : std::optional(kv.second.asString())); + } + } + + if (!options.isNull() && options.count("tableBucketNumber")) { + builder.tableBucketNumber(options["tableBucketNumber"].asInt()); + } + + if (!options.isNull() && options.count("bucketConversion")) { + HiveBucketConversion bucketConversion; + const auto& bucketConversionOption = options["bucketConversion"]; + bucketConversion.tableBucketCount = + bucketConversionOption["tableBucketCount"].asInt(); + bucketConversion.partitionBucketCount = + bucketConversionOption["partitionBucketCount"].asInt(); + for (auto& bucketColumnHandlesOption : + bucketConversionOption["bucketColumnHandles"]) { + bucketConversion.bucketColumnHandles.push_back( + std::const_pointer_cast( + ISerializable::deserialize( + bucketColumnHandlesOption))); + } + builder.bucketConversion(bucketConversion); + } + + if (!options.isNull() && options.count("customSplitInfo")) { + std::unordered_map info; + for (auto& kv : options["customSplitInfo"].items()) { + info[kv.first.asString()] = kv.second.asString(); + } + builder.customSplitInfo(info); + } + + if (!options.isNull() && options.count("extraFileInfo")) { + auto extra = options["extraFileInfo"].isNull() + ? std::shared_ptr() + : std::make_shared(options["extraFileInfo"].asString()); + builder.extraFileInfo(extra); + } + + if (!options.isNull() && options.count("serdeParameters")) { + std::unordered_map serde; + for (auto& kv : options["serdeParameters"].items()) { + serde[kv.first.asString()] = kv.second.asString(); + } + builder.serdeParameters(serde); + } + + if (!options.isNull() && options.count("fileProperties")) { + FileProperties props; + const auto& propertiesOption = options["fileProperties"]; + if (propertiesOption.count("fileSize") && + !propertiesOption["fileSize"].isNull()) { + props.fileSize = propertiesOption["fileSize"].asInt(); + } + if (propertiesOption.count("modificationTime") && + !propertiesOption["modificationTime"].isNull()) { + props.modificationTime = propertiesOption["modificationTime"].asInt(); + } + builder.fileProperties(props); + } + + if (!options.isNull() && options.count("rowIdProperties")) { + RowIdProperties rowIdProperties; + const auto& rowIdPropertiesOption = options["rowIdProperties"]; + rowIdProperties.metadataVersion = + rowIdPropertiesOption["metadataVersion"].asInt(); + rowIdProperties.partitionId = rowIdPropertiesOption["partitionId"].asInt(); + rowIdProperties.tableGuid = rowIdPropertiesOption["tableGuid"].asString(); + builder.rowIdProperties(rowIdProperties); + } + + return builder.build(); +} + +std::shared_ptr HiveConnectorFactory::makeColumnHandle( + const std::string& connectorId, + const std::string& name, + const TypePtr& dataType, + const folly::dynamic& options) const { + using HiveColumnType = hive::HiveColumnHandle::ColumnType; + HiveColumnType hiveColumnType = HiveColumnType::kRegular; + + if (options.isNull()) { + return std::make_shared( + name, hiveColumnType, dataType, dataType); + } + + if (options.count("columnType") && options["columnType"].isString()) { + std::string columnType = options["columnType"].asString(); + // Accept a few friendly spellings, case-insensitive. + folly::toLowerAscii(columnType); + + if (columnType == "kpartitionkey" || columnType == "partition_key" || + columnType == "partitionkey") { + hiveColumnType = HiveColumnType::kPartitionKey; + } else if (columnType == "kregular" || columnType == "regular") { + hiveColumnType = HiveColumnType::kRegular; + } else if ( + columnType == "ksynthesized" || columnType == "synthesized" || + columnType == "synthetic") { + hiveColumnType = HiveColumnType::kSynthesized; + } else if ( + columnType == "krowindex" || columnType == "row_index" || + columnType == "rowindex") { + hiveColumnType = HiveColumnType::kRowIndex; + } else if ( + columnType == "krowid" || columnType == "row_id" || + columnType == "rowid") { + hiveColumnType = HiveColumnType::kRowId; + } + } + + TypePtr hiveType = + options.count("hiveType") ? Type::create(options["hiveType"]) : dataType; + + // subfields would be serialized as a vector of strings; + std::vector subfields; + if (auto rs = options.get_ptr("requiredSubfields")) { + subfields.reserve(rs->size()); + for (auto& v : *rs) { + subfields.emplace_back(v.asString()); + } + } + + HiveColumnHandle::ColumnParseParameters parseParams{ + HiveColumnHandle::ColumnParseParameters::kISO8601}; + if (auto cp = options.get_ptr("columnParseParameters")) { + auto formatName = + cp->getDefault("partitionDateValueFormat", "ISO8601").asString(); + if (formatName == "DaysSinceEpoch" || formatName == "kDaysSinceEpoch") { + parseParams.partitionDateValueFormat = + HiveColumnHandle::ColumnParseParameters::kDaysSinceEpoch; + } else { + parseParams.partitionDateValueFormat = + HiveColumnHandle::ColumnParseParameters::kISO8601; + } + } + + return std::make_shared( + name, + hiveColumnType, + dataType, + hiveType, + std::move(subfields), + parseParams); +} + +std::shared_ptr HiveConnectorFactory::makeTableHandle( + const std::string& connectorId, + const std::string& tableName, + std::vector> columnHandles, + const folly::dynamic& options) const { + bool filterPushdownEnabled = + options.getDefault("filterPushdownEnabled", true).asBool(); + + common::SubfieldFilters subfieldFilters; + if (auto sf = options.get_ptr("subfieldFilters")) { + subfieldFilters.reserve(sf->size()); + + for (auto& kv : sf->items()) { + // 1) Parse the key string into a Subfield + // (uses Subfield(const std::string&) and default separators) + Subfield subfield(kv.first.asString()); + + // 2) Deserialize the Filter from its dynamic form. + // Assumes every Filter subclass registered a SerDe entry in + // Filter::registerSerDe(). + auto filter = ISerializable::deserialize( + kv.second, /* context = */ nullptr); + + subfieldFilters.emplace( + std::move(subfield), std::const_pointer_cast(filter)); + } + } + + core::TypedExprPtr remainingFilter = nullptr; + if (auto rf = options.get_ptr("remainingFilter")) { + // assuming rf["expr"] holds the serialized expression + remainingFilter = ISerializable::deserialize(*rf); + } + + std::unordered_map tableParameters; + if (auto tp = options.get_ptr("tableParameters")) { + for (auto& kv : tp->items()) { + tableParameters.emplace(kv.first.asString(), kv.second.asString()); + } + } + + // build RowTypePtr from columnHandles + std::vector names; + std::vector types; + names.reserve(columnHandles.size()); + types.reserve(columnHandles.size()); + for (auto& col : columnHandles) { + auto hiveCol = std::static_pointer_cast(col); + names.push_back(hiveCol->name()); + types.push_back(hiveCol->dataType()); + } + auto dataColumns = ROW(std::move(names), std::move(types)); + + return std::make_shared( + connectorId, + tableName, + filterPushdownEnabled, + std::move(subfieldFilters), + std::move(remainingFilter), + dataColumns, + std::move(tableParameters)); +} + +std::shared_ptr +HiveConnectorFactory::makeInsertTableHandle( + const std::string& connectorId, + std::vector> inputColumns, + std::shared_ptr locationHandle, + const folly::dynamic& options) const { + // Convert inputColumns + std::vector> inputHiveColumns; + inputHiveColumns.reserve(inputColumns.size()); + for (const auto& handle : inputColumns) { + inputHiveColumns.push_back( + std::static_pointer_cast(handle)); + } + + auto hiveLoc = + std::dynamic_pointer_cast(locationHandle); + VELOX_CHECK( + hiveLoc, + "HiveConnectorFactory::makeInsertTableHandle: " + "expected HiveLocationHandle"); + + auto fmt = + options + .getDefault( + "storageFormat", static_cast(dwio::common::FileFormat::DWRF)) + .asInt(); + auto storageFormat = static_cast(fmt); + + std::shared_ptr bucketProperty = nullptr; + if (auto bp = options.get_ptr("bucketProperty")) { + bucketProperty = HiveBucketProperty::deserialize(*bp, nullptr); + } + + std::optional compressionKind; + if (auto ck = options.get_ptr("compressionKind")) { + compressionKind = static_cast(ck->asInt()); + } + + std::unordered_map serdeParameters; + if (auto sp = options.get_ptr("serdeParameters")) { + for (auto& kv : sp->items()) { + serdeParameters.emplace(kv.first.asString(), kv.second.asString()); + } + } + + std::shared_ptr writerOptions = nullptr; + if (auto wo = options.get_ptr("writerOptions")) { + writerOptions = dwio::common::WriterOptions::deserialize(*wo); + } + + bool ensureFiles = options.getDefault("ensureFiles", false).asBool(); + + auto fileNameGen = HiveInsertFileNameGenerator::deserialize( + *options.get_ptr("fileNameGenerator"), nullptr); + + return std::make_shared( + std::move(inputHiveColumns), + hiveLoc, + storageFormat, + std::move(bucketProperty), + compressionKind, + std::move(serdeParameters), + std::move(writerOptions), + ensureFiles, + std::move(fileNameGen)); +} + +std::shared_ptr +HiveConnectorFactory::makeLocationHandle( + const std::string& connectorId, + LocationHandle::TableType tableType, + const folly::dynamic& options) const { + VELOX_CHECK(options.isObject(), "Expected options to be a dynamic object"); + + // Required fields + auto targetPath = options.at("targetPath").asString(); + auto writePath = options.at("writePath").asString(); + + // Optional field: targetFileName + std::string targetFileName; + if (options.count("targetFileName") && + !options.at("targetFileName").isNull()) { + targetFileName = options.at("targetFileName").asString(); + } + + return std::make_shared( + std::move(targetPath), + std::move(writePath), + tableType, + std::move(targetFileName), + connectorId); +} + +core::PartitionFunctionSpecPtr HiveConnectorFactory::makePartitionFunctionSpec( + const std::string& /*connectorId*/, + const folly::dynamic& options) const { + VELOX_CHECK(options.isObject(), "Expected options to be a dynamic object"); + + // Required: number of buckets + const int numBuckets = options.at("numBuckets").asInt(); + + // Optional: explicit bucket-to-partition mapping + std::vector bucketToPartition; + if (options.count("bucketToPartition") && + !options["bucketToPartition"].isNull()) { + bucketToPartition = ISerializable::deserialize>( + options["bucketToPartition"]); + } + + // Required: key channels (by column index) for the bucket function + // NOTE: Keep the key name consistent with HashPartitionFunctionSpec + // ("keyChannels") + std::vector channels = + ISerializable::deserialize>( + options.at("keyChannels")); + + // Optional: constants used as key(s) (serialized ConstantTypedExpr[]) + std::vector constValues; + if (options.count("constants") && !options["constants"].isNull()) { + const auto typedConsts = + ISerializable::deserialize>( + options["constants"]); + constValues.reserve(typedConsts.size()); + + for (const auto& c : typedConsts) { + VELOX_CHECK_NOT_NULL(c); + constValues.emplace_back(c->toConstantVector(pool_.get())); + } + } + + if (bucketToPartition.empty()) { + // Spec form where mapping is derived at create() time (round-robin and + // optional local shuffle). + return std::make_shared( + numBuckets, std::move(channels), std::move(constValues)); + } + + // Spec with explicit mapping. + return std::make_shared( + numBuckets, + std::move(bucketToPartition), + std::move(channels), + std::move(constValues)); +} + std::unique_ptr HivePartitionFunctionSpec::create( int numPartitions, bool localExchange) const { diff --git a/velox/connectors/hive/HiveConnector.h b/velox/connectors/hive/HiveConnector.h index c6b91392976..1c34de0a364 100644 --- a/velox/connectors/hive/HiveConnector.h +++ b/velox/connectors/hive/HiveConnector.h @@ -19,6 +19,7 @@ #include "velox/connectors/hive/FileHandle.h" #include "velox/connectors/hive/HiveConfig.h" #include "velox/core/PlanNode.h" +#include "velox/dwio/common/Options.h" namespace facebook::velox::dwio::common { class DataSink; @@ -98,6 +99,47 @@ class HiveConnectorFactory : public ConnectorFactory { folly::Executor* cpuExecutor = nullptr) override { return std::make_shared(id, config, ioExecutor); } + + std::shared_ptr makeConnectorSplit( + const std::string& connectorId, + const std::string& filePath, + uint64_t start, + uint64_t length, + const folly::dynamic& options = {}) const override; + + std::shared_ptr makeColumnHandle( + const std::string& connectorId, + const std::string& name, + const TypePtr& type, + const folly::dynamic& options = {}) const override; + + std::shared_ptr makeTableHandle( + const std::string& connectorId, + const std::string& tableName, + std::vector> columnHandles, + const folly::dynamic& options) const override; + + std::shared_ptr makeInsertTableHandle( + const std::string& connectorId, + std::vector> inputColumns, + std::shared_ptr locationHandle, + const folly::dynamic& options = {}) const override; + + std::shared_ptr makeLocationHandle( + const std::string& connectorId, + connector::ConnectorLocationHandle::TableType tableType = + connector::ConnectorLocationHandle::TableType::kNew, + const folly::dynamic& options = {}) const override; + + core::PartitionFunctionSpecPtr makePartitionFunctionSpec( + const std::string& connectorId, + const folly::dynamic& options = {}) const override; + + private: + std::shared_ptr pool_{ + memory::memoryManager()->addLeafPool()}; + dwio::common::FileFormat defaultFileFormat_{ + dwio::common::FileFormat::PARQUET}; }; class HivePartitionFunctionSpec : public core::PartitionFunctionSpec { diff --git a/velox/connectors/hive/HiveConnectorSplit.h b/velox/connectors/hive/HiveConnectorSplit.h index 3485c2330fa..bd68b431b91 100644 --- a/velox/connectors/hive/HiveConnectorSplit.h +++ b/velox/connectors/hive/HiveConnectorSplit.h @@ -17,6 +17,7 @@ #include #include + #include "velox/connectors/Connector.h" #include "velox/connectors/hive/FileProperties.h" #include "velox/connectors/hive/TableHandle.h" diff --git a/velox/connectors/hive/HiveDataSink.cpp b/velox/connectors/hive/HiveDataSink.cpp index 36c797e4eaf..9abc1e4f72b 100644 --- a/velox/connectors/hive/HiveDataSink.cpp +++ b/velox/connectors/hive/HiveDataSink.cpp @@ -291,6 +291,25 @@ HiveBucketProperty::HiveBucketProperty( validate(); } +HiveBucketProperty::HiveBucketProperty( + Kind kind, + int32_t bucketCount, + const std::vector& bucketedBy, + const std::vector& bucketTypes, + const std::vector& sortColumns, + const std::vector& sortOrders) + : kind_(kind), + bucketCount_(bucketCount), + bucketedBy_(bucketedBy), + bucketTypes_(bucketTypes) { + VELOX_CHECK_EQ(sortColumns.size(), sortOrders.size()); + for (size_t i = 0; i < sortColumns.size(); ++i) { + sortedBy_.push_back( + std::make_shared(sortColumns[i], sortOrders[i])); + } + validate(); +} + void HiveBucketProperty::validate() const { VELOX_USER_CHECK_GT(bucketCount_, 0, "Hive bucket count can't be zero"); VELOX_USER_CHECK(!bucketedBy_.empty(), "Hive bucket columns must be set"); @@ -332,9 +351,22 @@ std::shared_ptr HiveBucketProperty::deserialize( ISerializable::deserialize>(obj["bucketedBy"]); const auto bucketedTypes = ISerializable::deserialize>( obj["bucketedTypes"], context); - const auto sortedBy = - ISerializable::deserialize>( - obj["sortedBy"], context); + + std::vector> sortedBy; + if (auto sb = obj.get_ptr("sortedBy")) { + sortedBy = ISerializable::deserialize>(*sb); + } else if (auto sc = obj.get_ptr("sortColumns")) { + auto sortColumns = + ISerializable::deserialize>(*sc); + if (auto so = obj.get_ptr("sortOrders")) { + VELOX_CHECK_EQ(sortColumns.size(), so->size()); + for (int i = 0; i < so->size(); i++) { + auto sortOrder = core::SortOrder::deserialize((*so)[i]); + sortedBy.emplace_back(std::make_shared( + sortColumns[i], sortOrder)); + } + } + } return std::make_shared( kind, bucketCount, buckectedBy, bucketedTypes, sortedBy); } @@ -1201,7 +1233,7 @@ std::string LocationHandle::toString() const { "LocationHandle [targetPath: {}, writePath: {}, tableType: {}, tableFileName: {}]", targetPath_, writePath_, - tableTypeName(tableType_), + tableTypeName(tableType()), targetFileName_); } @@ -1215,7 +1247,7 @@ folly::dynamic LocationHandle::serialize() const { obj["name"] = "LocationHandle"; obj["targetPath"] = targetPath_; obj["writePath"] = writePath_; - obj["tableType"] = tableTypeName(tableType_); + obj["tableType"] = tableTypeName(tableType()); obj["targetFileName"] = targetFileName_; return obj; } diff --git a/velox/connectors/hive/HiveDataSink.h b/velox/connectors/hive/HiveDataSink.h index 8c305b7595d..d955ee5ff62 100644 --- a/velox/connectors/hive/HiveDataSink.h +++ b/velox/connectors/hive/HiveDataSink.h @@ -17,6 +17,7 @@ #include "velox/common/compression/Compression.h" #include "velox/connectors/Connector.h" +#include "velox/connectors/ConnectorNames.h" #include "velox/connectors/hive/HiveConfig.h" #include "velox/connectors/hive/PartitionIdGenerator.h" #include "velox/connectors/hive/TableHandle.h" @@ -31,24 +32,18 @@ class LocationHandle; using LocationHandlePtr = std::shared_ptr; /// Location related properties of the Hive table to be written. -class LocationHandle : public ISerializable { +class LocationHandle : public ConnectorLocationHandle { public: - enum class TableType { - /// Write to a new table to be created. - kNew, - /// Write to an existing table. - kExisting, - }; - LocationHandle( std::string targetPath, std::string writePath, TableType tableType, - std::string targetFileName = "") - : targetPath_(std::move(targetPath)), + std::string targetFileName = "", + const std::string& connectorId = kHiveConnectorName) + : ConnectorLocationHandle(connectorId, tableType), + targetPath_(std::move(targetPath)), targetFileName_(std::move(targetFileName)), - writePath_(std::move(writePath)), - tableType_(tableType) {} + writePath_(std::move(writePath)) {} const std::string& targetPath() const { return targetPath_; @@ -62,16 +57,12 @@ class LocationHandle : public ISerializable { return writePath_; } - TableType tableType() const { - return tableType_; - } + std::string toString() const override; - std::string toString() const; + folly::dynamic serialize() const override; static void registerSerDe(); - folly::dynamic serialize() const override; - static LocationHandlePtr create(const folly::dynamic& obj); static const std::string tableTypeName(LocationHandle::TableType type); @@ -85,8 +76,6 @@ class LocationHandle : public ISerializable { const std::string targetFileName_; // Staging directory path. const std::string writePath_; - // Whether the table to be written is new, already existing or temporary. - const TableType tableType_; }; class HiveSortingColumn : public ISerializable { @@ -129,6 +118,14 @@ class HiveBucketProperty : public ISerializable { const std::vector& bucketedTypes, const std::vector>& sortedBy); + HiveBucketProperty( + Kind kind, + int32_t bucketCount, + const std::vector& bucketedBy, + const std::vector& bucketedTypes, + const std::vector& sortColumns, + const std::vector& sortOrders); + Kind kind() const { return kind_; } @@ -177,7 +174,7 @@ class HiveBucketProperty : public ISerializable { const int32_t bucketCount_; const std::vector bucketedBy_; const std::vector bucketTypes_; - const std::vector> sortedBy_; + std::vector> sortedBy_; }; FOLLY_ALWAYS_INLINE std::ostream& operator<<( diff --git a/velox/connectors/hive/tests/CMakeLists.txt b/velox/connectors/hive/tests/CMakeLists.txt index 0a24b6b10c4..a6fbb5e2266 100644 --- a/velox/connectors/hive/tests/CMakeLists.txt +++ b/velox/connectors/hive/tests/CMakeLists.txt @@ -19,6 +19,7 @@ add_executable( HiveConnectorTest.cpp HiveConnectorUtilTest.cpp HiveConnectorSerDeTest.cpp + HiveObjectFactoryTest.cpp HivePartitionFunctionTest.cpp HivePartitionUtilTest.cpp HiveSplitTest.cpp diff --git a/velox/connectors/hive/tests/HiveObjectFactoryTest.cpp b/velox/connectors/hive/tests/HiveObjectFactoryTest.cpp new file mode 100644 index 00000000000..cdc68498db2 --- /dev/null +++ b/velox/connectors/hive/tests/HiveObjectFactoryTest.cpp @@ -0,0 +1,242 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include + +#include "velox/common/base/Exceptions.h" +#include "velox/common/memory/MemoryAllocator.h" +#include "velox/connectors/hive/HiveConnector.h" +#include "velox/connectors/hive/HiveConnectorSplit.h" +#include "velox/exec/tests/utils/OperatorTestBase.h" +#include "velox/exec/tests/utils/PlanBuilder.h" +#include "velox/functions/prestosql/registration/RegistrationFunctions.h" + +namespace facebook::velox::connector::hive::test { + +using namespace facebook::velox; +using namespace facebook::velox::connector::hive; +using facebook::velox::connector::ConnectorLocationHandle; + +static constexpr char kConnectorId[] = "hive-test"; + +class HiveObjectFactoryTest : public exec::test::OperatorTestBase { + public: + static void SetUpTestCase() { + OperatorTestBase::SetUpTestCase(); + + memory::MemoryManager::testingSetInstance(memory::MemoryManager::Options{}); + + Type::registerSerDe(); + common::Filter::registerSerDe(); + connector::hive::HiveTableHandle::registerSerDe(); + connector::hive::LocationHandle::registerSerDe(); + connector::hive::HiveColumnHandle::registerSerDe(); + connector::hive::HiveInsertTableHandle::registerSerDe(); + connector::hive::HiveConnectorSplit::registerSerDe(); + connector::hive::HiveInsertFileNameGenerator::registerSerDe(); + core::PlanNode::registerSerDe(); + core::ITypedExpr::registerSerDe(); + Type::registerSerDe(); + } + + void TearDown() override { + factory_.reset(); + } + + std::shared_ptr makeHiveColumnHandle( + const std::string& name, + HiveColumnHandle::ColumnType columnType, + const TypePtr& dataType, + const TypePtr& hiveType, + std::vector requiredSubfields) { + folly::dynamic opts = folly::dynamic::object; + + // columnType would be serialized as int32_t + opts["columnType"] = (int32_t)columnType; + opts["hiveType"] = hiveType->serialize(); + + opts["requiredSubfields"] = folly::dynamic::array(); + for (const auto& subfield : requiredSubfields) { + opts["requiredSubfields"].push_back(subfield); + } + + auto colHandle = + factory_->makeColumnHandle(kConnectorId, "colX", dataType, opts); + + return colHandle; + } + + protected: + std::unique_ptr factory_; +}; + +TEST_F(HiveObjectFactoryTest, MakeConnectorSplitDefaults) { + // No options: only filePath/start/length/connectorId are set + auto splitPtr = factory_->makeConnectorSplit( + kConnectorId, "s3://bucket/path/file.orc", 123, 456); + auto* split = dynamic_cast(splitPtr.get()); + ASSERT_NE(split, nullptr); + EXPECT_EQ(split->filePath, "s3://bucket/path/file.orc"); + EXPECT_EQ(split->start, 123); + EXPECT_EQ(split->length, 456); + EXPECT_EQ(split->connectorId, kConnectorId); + + // Defaults: PARQUET format, weight=0, cacheable=true + // TODO: use constant literal + EXPECT_EQ(split->fileFormat, dwio::common::FileFormat::PARQUET); + EXPECT_EQ(split->splitWeight, 0); + EXPECT_TRUE(split->cacheable); +} + +TEST_F(HiveObjectFactoryTest, MakeConnectorSplitWithOptions) { + folly::dynamic opts = folly::dynamic::object; + + // Basic configuration + opts["fileFormat"] = static_cast(dwio::common::FileFormat::PARQUET); + opts["splitWeight"] = 42; + opts["cacheable"] = true; + + // Info columns map: column name -> info string + opts["infoColumns"] = folly::dynamic::object; + opts["infoColumns"]["colA"] = "infoA"; + opts["infoColumns"]["colB"] = "infoB"; + + // Partition keys map: key -> value + opts["partitionKeys"] = folly::dynamic::object; + opts["partitionKeys"]["p1"] = "v1"; + + auto splitPtr = + factory_->makeConnectorSplit(kConnectorId, "/tmp/f.p", 0, 10, opts); + auto* split = dynamic_cast(splitPtr.get()); + ASSERT_NE(split, nullptr); + + EXPECT_EQ(split->fileFormat, dwio::common::FileFormat::PARQUET); + EXPECT_EQ(split->splitWeight, 42); + EXPECT_TRUE(split->cacheable); + + // infoColumns + auto info = split->infoColumns; + EXPECT_EQ(info.at("colA"), "infoA"); + EXPECT_EQ(info.at("colB"), "infoB"); + + // partitionKeys + auto parts = split->partitionKeys; + ASSERT_EQ(parts.size(), 1); + EXPECT_EQ(parts.count("p1"), 1); + EXPECT_EQ(parts["p1"], "v1"); +} + +TEST_F(HiveObjectFactoryTest, MakeTableHandleWithOptions) { + // Build a RowType for data columns: two ints + auto rowType = ROW({"c0", "c1"}, {INTEGER(), BIGINT()}); + + // ColumnHandles + auto c0 = makeHiveColumnHandle( + "c0", + HiveColumnHandle::ColumnType::kRegular, + INTEGER(), + INTEGER(), + {"c0"}); + auto c1 = makeHiveColumnHandle( + "c1", HiveColumnHandle::ColumnType::kRegular, BIGINT(), BIGINT(), {"c1"}); + + // Options: disable filter pushdown, add subfield filter & remaining filter & + // tableParameters + folly::dynamic opts = folly::dynamic::object; + + // Basic configuration + opts["filterPushdownEnabled"] = false; + + // common::SubfieldFilters : std::unordered_map; + auto filter = std::make_unique(-100, 100, false); + opts["subfieldFilters"] = folly::dynamic::object; + opts["subfieldFilters"]["c0"] = filter->serialize(); + + // core::TypedExprPtr remainingFilter : std::shared_ptr; + auto remainingFilter = parseExpr("c1 + 1 > 1", rowType); + opts["remainingFilter"] = remainingFilter->serialize(); + + // Arbitrary table parameters + opts["tableParameters"] = folly::dynamic::object; + opts["tableParameters"]["pA"] = "vA"; + + auto handlePtr = + factory_->makeTableHandle(kConnectorId, "tbl", {c0, c1}, opts); + auto* hiveHandle = dynamic_cast(handlePtr.get()); + ASSERT_NE(hiveHandle, nullptr); + + EXPECT_FALSE(hiveHandle->isFilterPushdownEnabled()); + + const auto& subfieldFilters = hiveHandle->subfieldFilters(); + ASSERT_EQ(subfieldFilters.count(common::Subfield("c0")), 1); + subfieldFilters.at(common::Subfield("c0"))->testingEquals(*filter); + + auto remainingFilterCreated = hiveHandle->remainingFilter(); + ASSERT_NE(remainingFilterCreated, nullptr); + ASSERT_EQ(*remainingFilterCreated, *remainingFilter); + + auto params = hiveHandle->tableParameters(); + ASSERT_EQ(params.size(), 1); + EXPECT_EQ(params.at("pA"), "vA"); +} + +TEST_F(HiveObjectFactoryTest, MakeColumnHandle) { + auto colHandle = makeHiveColumnHandle( + "colX", + HiveColumnHandle::ColumnType::kRegular, + BIGINT(), + BIGINT(), + std::vector({"f1", "f2"})); + + auto* hiveColumnHandle = dynamic_cast(colHandle.get()); + ASSERT_NE(hiveColumnHandle, nullptr); + + EXPECT_EQ(hiveColumnHandle->name(), "colX"); + EXPECT_EQ( + hiveColumnHandle->columnType(), HiveColumnHandle::ColumnType::kRegular); + EXPECT_EQ(hiveColumnHandle->dataType(), BIGINT()); + EXPECT_EQ(hiveColumnHandle->hiveType(), BIGINT()); + + std::vector requiredSubfields; + requiredSubfields.emplace_back("f1"); + requiredSubfields.emplace_back("f2"); + EXPECT_EQ(hiveColumnHandle->requiredSubfields(), requiredSubfields); +} + +TEST_F(HiveObjectFactoryTest, MakeLocationHandle) { + folly::dynamic opts = folly::dynamic::object; + + // Basic configuration + opts["targetPath"] = "/tmp/out1"; + opts["writePath"] = "/tmp/out2"; + opts["targetFileName"] = "test.parquet"; + + // Default: writeDirectory == targetDirectory + auto locationHandle1 = factory_->makeLocationHandle( + kConnectorId, ConnectorLocationHandle::TableType::kNew, opts); + + auto* hiveLocationHandle = + dynamic_cast(locationHandle1.get()); + ASSERT_NE(hiveLocationHandle, nullptr); + + EXPECT_EQ(hiveLocationHandle->targetPath(), "/tmp/out1"); + EXPECT_EQ(hiveLocationHandle->writePath(), "/tmp/out2"); + EXPECT_EQ(hiveLocationHandle->targetFileName(), "test.parquet"); + EXPECT_EQ( + locationHandle1->tableType(), ConnectorLocationHandle::TableType::kNew); +} + +} // namespace facebook::velox::connector::hive::test diff --git a/velox/connectors/tpcds/TpcdsConnector.h b/velox/connectors/tpcds/TpcdsConnector.h index 88ad845ff39..1990a682042 100644 --- a/velox/connectors/tpcds/TpcdsConnector.h +++ b/velox/connectors/tpcds/TpcdsConnector.h @@ -179,6 +179,8 @@ class TpcdsConnectorFactory : public ConnectorFactory { folly::Executor* cpuExecutor = nullptr) override { return std::make_shared(id, config, ioExecutor); } + + // TODO: Add object makers like makeTableHandle, makeColumnHandle, etc. }; } // namespace facebook::velox::connector::tpcds diff --git a/velox/connectors/tpch/TpchConnector.h b/velox/connectors/tpch/TpchConnector.h index 5d006490bad..608b6a62f13 100644 --- a/velox/connectors/tpch/TpchConnector.h +++ b/velox/connectors/tpch/TpchConnector.h @@ -34,7 +34,7 @@ class TpchColumnHandle : public ColumnHandle { public: explicit TpchColumnHandle(const std::string& name) : name_(name) {} - const std::string& name() const { + const std::string& name() const override { return name_; } @@ -196,6 +196,8 @@ class TpchConnectorFactory : public ConnectorFactory { folly::Executor* ioExecutor = nullptr, folly::Executor* cpuExecutor = nullptr) override { return std::make_shared(id, config, ioExecutor); + + // TODO: Add object makers like makeTableHandle, makeColumnHandle, etc. } }; diff --git a/velox/core/PlanNode.h b/velox/core/PlanNode.h index 20800bbe30c..9b5935e8c7f 100644 --- a/velox/core/PlanNode.h +++ b/velox/core/PlanNode.h @@ -36,6 +36,7 @@ using PlanNodeId = std::string; /// Generic representation of InsertTable struct InsertTableHandle { + // TODO: Merge into connectors/Connector.h public: InsertTableHandle( const std::string& connectorId, From 64f0960adc5e26f95a621ee060dc9c57d1607e20 Mon Sep 17 00:00:00 2001 From: yingsu00 Date: Wed, 3 Sep 2025 16:42:18 -0700 Subject: [PATCH 4/6] refactor: Move PartitionFunctionSpecs out of PlanNode.h PartitionFunction and PartitionFunctionSpec are core concepts used in various places like exec and connectors. To make exec PlanBuilder test utilility class connector agnostic, this commit moves them out of PlanNode.h to a standalone file in the same folder, making it a first class interface in velox/core. --- velox/core/PartitionFunction.h | 80 ++++++++++++++++++++++++++ velox/core/PlanNode.h | 58 +------------------ velox/exec/tests/utils/PlanBuilder.cpp | 1 + 3 files changed, 82 insertions(+), 57 deletions(-) create mode 100644 velox/core/PartitionFunction.h diff --git a/velox/core/PartitionFunction.h b/velox/core/PartitionFunction.h new file mode 100644 index 00000000000..da342ec860f --- /dev/null +++ b/velox/core/PartitionFunction.h @@ -0,0 +1,80 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/vector/ComplexVector.h" + +namespace facebook::velox::core { + +/// Calculates partition number for each row of the specified vector. +class PartitionFunction { + public: + virtual ~PartitionFunction() = default; + + /// @param input RowVector to split into partitions. + /// @param [out] partitions Computed partition numbers for each row in + /// 'input'. + /// @return Returns partition number in case all rows of 'input' are + /// assigned to the same partition. In this case 'partitions' vector is left + /// unchanged. Used to optimize round-robin partitioning in local exchange. + virtual std::optional partition( + const RowVector& input, + std::vector& partitions) = 0; +}; + +/// Factory class for creating PartitionFunction instances. +class PartitionFunctionSpec : public ISerializable { + public: + /// If 'localExchange' is true, the partition function is used for local + /// exchange within a velox task. + virtual std::unique_ptr create( + int numPartitions, + bool localExchange = false) const = 0; + + virtual ~PartitionFunctionSpec() = default; + + virtual std::string toString() const = 0; +}; + +using PartitionFunctionSpecPtr = std::shared_ptr; + +class GatherPartitionFunctionSpec : public PartitionFunctionSpec { + public: + std::unique_ptr create( + int /*numPartitions*/, + bool /*localExchange*/) const override { + VELOX_UNREACHABLE(); + } + + std::string toString() const override { + return "gather"; + } + + folly::dynamic serialize() const override { + folly::dynamic obj = folly::dynamic::object; + obj["name"] = "GatherPartitionFunctionSpec"; + return obj; + } + + static PartitionFunctionSpecPtr deserialize( + const folly::dynamic& /* obj */, + void* /* context */) { + return std::make_shared(); + } +}; + +} // namespace facebook::velox::core diff --git a/velox/core/PlanNode.h b/velox/core/PlanNode.h index 9b5935e8c7f..248eda3aa6d 100644 --- a/velox/core/PlanNode.h +++ b/velox/core/PlanNode.h @@ -22,6 +22,7 @@ #include "velox/common/Enums.h" #include "velox/connectors/Connector.h" #include "velox/core/Expressions.h" +#include "velox/core/PartitionFunction.h" #include "velox/core/QueryConfig.h" #include "velox/vector/VectorStream.h" @@ -2341,63 +2342,6 @@ class LocalMergeNode : public PlanNode { using LocalMergeNodePtr = std::shared_ptr; -/// Calculates partition number for each row of the specified vector. -class PartitionFunction { - public: - virtual ~PartitionFunction() = default; - - /// @param input RowVector to split into partitions. - /// @param [out] partitions Computed partition numbers for each row in - /// 'input'. - /// @return Returns partition number in case all rows of 'input' are - /// assigned to the same partition. In this case 'partitions' vector is left - /// unchanged. Used to optimize round-robin partitioning in local exchange. - virtual std::optional partition( - const RowVector& input, - std::vector& partitions) = 0; -}; - -/// Factory class for creating PartitionFunction instances. -class PartitionFunctionSpec : public ISerializable { - public: - /// If 'localExchange' is true, the partition function is used for local - /// exchange within a velox task. - virtual std::unique_ptr create( - int numPartitions, - bool localExchange = false) const = 0; - - virtual ~PartitionFunctionSpec() = default; - - virtual std::string toString() const = 0; -}; - -using PartitionFunctionSpecPtr = std::shared_ptr; - -class GatherPartitionFunctionSpec : public PartitionFunctionSpec { - public: - std::unique_ptr create( - int /*numPartitions*/, - bool /*localExchange*/) const override { - VELOX_UNREACHABLE(); - } - - std::string toString() const override { - return "gather"; - } - - folly::dynamic serialize() const override { - folly::dynamic obj = folly::dynamic::object; - obj["name"] = "GatherPartitionFunctionSpec"; - return obj; - } - - static PartitionFunctionSpecPtr deserialize( - const folly::dynamic& /* obj */, - void* /* context */) { - return std::make_shared(); - } -}; - /// Partitions data using specified partition function. The number of /// partitions is determined by the parallelism of the upstream pipeline. Can /// be used to gather data from multiple sources. diff --git a/velox/exec/tests/utils/PlanBuilder.cpp b/velox/exec/tests/utils/PlanBuilder.cpp index 63995ab948a..4d693999924 100644 --- a/velox/exec/tests/utils/PlanBuilder.cpp +++ b/velox/exec/tests/utils/PlanBuilder.cpp @@ -19,6 +19,7 @@ #include "velox/connectors/hive/TableHandle.h" #include "velox/connectors/tpcds/TpcdsConnector.h" #include "velox/connectors/tpch/TpchConnector.h" +#include "velox/core/PartitionFunction.h" #include "velox/duckdb/conversion/DuckParser.h" #include "velox/exec/Aggregate.h" #include "velox/exec/HashPartitionFunction.h" From 923cce85dec5c5f7c14acc17f57abaed25e79d39 Mon Sep 17 00:00:00 2001 From: yingsu00 Date: Tue, 9 Sep 2025 21:17:37 -0700 Subject: [PATCH 5/6] build: Make ConnectorFactory registration connector agnostic --- velox/connectors/CMakeLists.txt | 6 ++ .../connectors/RegisterConnectorFactories.cpp | 64 +++++++++++++++++++ velox/connectors/RegisterConnectorFactories.h | 25 ++++++++ velox/connectors/fuzzer/FuzzerConnector.cpp | 10 +++ velox/connectors/fuzzer/FuzzerConnector.h | 5 ++ velox/connectors/hive/HiveConnector.cpp | 7 ++ velox/connectors/hive/HiveConnector.h | 4 ++ velox/connectors/tpcds/TpcdsConnector.cpp | 11 ++++ velox/connectors/tpcds/TpcdsConnector.h | 5 ++ velox/connectors/tpch/TpchConnector.cpp | 11 ++++ velox/connectors/tpch/TpchConnector.h | 5 ++ 11 files changed, 153 insertions(+) create mode 100644 velox/connectors/RegisterConnectorFactories.cpp create mode 100644 velox/connectors/RegisterConnectorFactories.h diff --git a/velox/connectors/CMakeLists.txt b/velox/connectors/CMakeLists.txt index 3fd17dde2d2..492acb57959 100644 --- a/velox/connectors/CMakeLists.txt +++ b/velox/connectors/CMakeLists.txt @@ -32,3 +32,9 @@ endif() if(${VELOX_BUILD_TESTING}) add_subdirectory(tests) endif() + +add_library(velox_registered_connector_factories STATIC RegisterConnectorFactories.cpp) +target_link_libraries( + velox_registered_connector_factories + PUBLIC $<$:velox_dwio_parquet_reader> velox_dwio_common +) diff --git a/velox/connectors/RegisterConnectorFactories.cpp b/velox/connectors/RegisterConnectorFactories.cpp new file mode 100644 index 00000000000..cbb645253ce --- /dev/null +++ b/velox/connectors/RegisterConnectorFactories.cpp @@ -0,0 +1,64 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/connectors/RegisterConnectorFactories.h" + +#ifdef VELOX_ENABLE_HIVE_CONNECTOR +#include "velox/connectors/hive/HiveConnector.h" +#endif +#ifdef VELOX_ENABLE_TPCDS_CONNECTOR +#include "velox/connectors/tpcds/TpcdsConnector.h" +#endif +#ifdef VELOX_ENABLE_TPCH_CONNECTOR +#include "velox/connectors/tpch/TpchConnector.h" +#endif +#ifdef VELOX_ENABLE_FUZZER_CONNECTOR +#include "velox/connectors/fuzzer/FuzzerConnector.h" +#endif + +namespace facebook::velox::connector { + +void registerConnectorFactories() { +#ifdef VELOX_ENABLE_HIVE_CONNECTOR + hive::registerHiveConnectorFactory(); +#endif +#ifdef VELOX_ENABLE_TPCDS_CONNECTOR + tpcds::registerTpcdsConnectorFactory(); +#endif +#ifdef VELOX_ENABLE_TPCH_CONNECTOR + tpch::registerTpchConnectorFactory(); +#endif +#ifdef VELOX_ENABLE_FUZZER_CONNECTOR + fuzzer::registerFuzzerConnectorFactory(); +#endif +} + +void unregisterConnectorFactories() { +#ifdef VELOX_ENABLE_HIVE_CONNECTOR + hive::registerHiveConnectorFactory(); +#endif +#ifdef VELOX_ENABLE_TPCDS_CONNECTOR + tpcds::registerTpcdsConnectorFactory(); +#endif +#ifdef VELOX_ENABLE_TPCH_CONNECTOR + tpch::registerTpchConnectorFactory(); +#endif +#ifdef VELOX_ENABLE_FUZZER_CONNECTOR + fuzzer::registerFuzzerConnectorFactory(); +#endif +} + +} // namespace facebook::velox::connector diff --git a/velox/connectors/RegisterConnectorFactories.h b/velox/connectors/RegisterConnectorFactories.h new file mode 100644 index 00000000000..f7bc3d7fa8f --- /dev/null +++ b/velox/connectors/RegisterConnectorFactories.h @@ -0,0 +1,25 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +namespace facebook::velox::connector { + +void registerConnectorFactories(); + +void unregisterConnectorFactories(); + +} // namespace facebook::velox::connector diff --git a/velox/connectors/fuzzer/FuzzerConnector.cpp b/velox/connectors/fuzzer/FuzzerConnector.cpp index f17b8faefd9..a91ea2314b7 100644 --- a/velox/connectors/fuzzer/FuzzerConnector.cpp +++ b/velox/connectors/fuzzer/FuzzerConnector.cpp @@ -15,6 +15,7 @@ */ #include "velox/connectors/fuzzer/FuzzerConnector.h" +#include "velox/connectors/ConnectorNames.h" #include "velox/vector/fuzzer/VectorFuzzer.h" namespace facebook::velox::connector::fuzzer { @@ -69,4 +70,13 @@ std::optional FuzzerDataSource::next( return outputVector; } +bool registerFuzzerConnectorFactory( + std::unique_ptr factory) { + connector::registerConnectorFactory(std::move(factory)); +} + +bool unregisterFuzzerConnectorFactory() { + connector::unregisterConnectorFactory(connector::kFuzzerConnectorName); +} + } // namespace facebook::velox::connector::fuzzer diff --git a/velox/connectors/fuzzer/FuzzerConnector.h b/velox/connectors/fuzzer/FuzzerConnector.h index 1b06234ed52..dbf117d07d7 100644 --- a/velox/connectors/fuzzer/FuzzerConnector.h +++ b/velox/connectors/fuzzer/FuzzerConnector.h @@ -145,4 +145,9 @@ class FuzzerConnectorFactory : public ConnectorFactory { // TODO: Add object makers like makeTableHandle, makeColumnHandle, etc. }; +bool registerFuzzerConnectorFactory( + std::unique_ptr factory); + +bool unregisterFuzzerConnectorFactory(); + } // namespace facebook::velox::connector::fuzzer diff --git a/velox/connectors/hive/HiveConnector.cpp b/velox/connectors/hive/HiveConnector.cpp index 9ba39add07f..266bc191149 100644 --- a/velox/connectors/hive/HiveConnector.cpp +++ b/velox/connectors/hive/HiveConnector.cpp @@ -16,6 +16,7 @@ #include "velox/connectors/hive/HiveConnector.h" +#include "velox/connectors/ConnectorNames.h" #include "velox/connectors/hive/HiveConfig.h" #include "velox/connectors/hive/HiveDataSink.h" #include "velox/connectors/hive/HiveDataSource.h" @@ -588,4 +589,10 @@ void HivePartitionFunctionSpec::registerSerDe() { "HivePartitionFunctionSpec", HivePartitionFunctionSpec::deserialize); } +bool registerHiveConnectorMetadataFactory( + std::unique_ptr factory) { + hiveConnectorMetadataFactories().push_back(std::move(factory)); + return true; +} + } // namespace facebook::velox::connector::hive diff --git a/velox/connectors/hive/HiveConnector.h b/velox/connectors/hive/HiveConnector.h index 1c34de0a364..ca6c7ffba23 100644 --- a/velox/connectors/hive/HiveConnector.h +++ b/velox/connectors/hive/HiveConnector.h @@ -193,4 +193,8 @@ class HivePartitionFunctionSpec : public core::PartitionFunctionSpec { const std::vector constValues_; }; +bool registerHiveConnectorFactory(std::unique_ptr); + +bool unregisterHiveConnectorFactory(); + } // namespace facebook::velox::connector::hive diff --git a/velox/connectors/tpcds/TpcdsConnector.cpp b/velox/connectors/tpcds/TpcdsConnector.cpp index 5902f49da4c..e5ca847ef8e 100644 --- a/velox/connectors/tpcds/TpcdsConnector.cpp +++ b/velox/connectors/tpcds/TpcdsConnector.cpp @@ -16,6 +16,7 @@ #include +#include "velox/connectors/ConnectorNames.h" #include "velox/connectors/tpcds/TpcdsConnector.h" #include "velox/tpcds/gen/DSDGenIterator.h" @@ -138,4 +139,14 @@ std::optional TpcdsDataSource::next( return projectOutputColumns(outputVector); } + +bool registerTpcdsConnectorFactory( + std::unique_ptr factory) { + connector::registerConnectorFactory(std::move(factory)); +} + +bool unregisterTpcdsConnectorFactory() { + connector::unregisterConnectorFactory(connector::kTpcdsConnectorName); +} + } // namespace facebook::velox::connector::tpcds diff --git a/velox/connectors/tpcds/TpcdsConnector.h b/velox/connectors/tpcds/TpcdsConnector.h index 1990a682042..99870fc31d8 100644 --- a/velox/connectors/tpcds/TpcdsConnector.h +++ b/velox/connectors/tpcds/TpcdsConnector.h @@ -183,4 +183,9 @@ class TpcdsConnectorFactory : public ConnectorFactory { // TODO: Add object makers like makeTableHandle, makeColumnHandle, etc. }; +bool registerTpchConnectorFactory( + std::unique_ptr factory); + +bool unregisterTpcdsConnectorFactory(); + } // namespace facebook::velox::connector::tpcds diff --git a/velox/connectors/tpch/TpchConnector.cpp b/velox/connectors/tpch/TpchConnector.cpp index cf70061aada..f1c627809df 100644 --- a/velox/connectors/tpch/TpchConnector.cpp +++ b/velox/connectors/tpch/TpchConnector.cpp @@ -15,6 +15,8 @@ */ #include "velox/connectors/tpch/TpchConnector.h" + +#include "velox/connectors/ConnectorNames.h" #include "velox/exec/OperatorUtils.h" #include "velox/expression/Expr.h" #include "velox/tpch/gen/TpchGen.h" @@ -29,6 +31,15 @@ TpchConnector::TpchConnector( folly::Executor* /*executor*/) : Connector(id, std::move(config)) {} +bool registerTpchConnectorFactory( + std::unique_ptr factory) { + connector::registerConnectorFactory(std::move(factory)); +} + +bool unregisterTpcdsConnectorFactory() { + connector::unregisterConnectorFactory(connector::kTpchConnectorName); +} + namespace { RowVectorPtr getTpchData( diff --git a/velox/connectors/tpch/TpchConnector.h b/velox/connectors/tpch/TpchConnector.h index 608b6a62f13..7eeca9bbafd 100644 --- a/velox/connectors/tpch/TpchConnector.h +++ b/velox/connectors/tpch/TpchConnector.h @@ -201,4 +201,9 @@ class TpchConnectorFactory : public ConnectorFactory { } }; +bool registerTpchConnectorFactory( + std::unique_ptr factory); + +bool unregisterTpcdsConnectorFactory(); + } // namespace facebook::velox::connector::tpch From 8ff964a10e126b62276b0700001d893d097d8355 Mon Sep 17 00:00:00 2001 From: yingsu00 Date: Tue, 2 Sep 2025 15:12:19 -0700 Subject: [PATCH 6/6] refactor: Make PlanBuilder connector agnostic: Drop Hive dependency PlanBuilder is a core test util in exec and should be connector agnostic. This commits makes the generic TableScan, TableWriter, and PartitionFunctionSpec connector agnostic. The next step would be removing Tpch and Tpcds spsecific code from PlanBuilder. --- .../gcs/tests/GcsMultipleEndpointsTest.cpp | 1 + .../s3fs/tests/S3MultipleEndpointsTest.cpp | 1 + velox/exec/fuzzer/WriterFuzzer.cpp | 49 ++-- velox/exec/tests/PlanBuilderTest.cpp | 1 + velox/exec/tests/TableEvolutionFuzzer.cpp | 9 +- velox/exec/tests/TableWriterTest.cpp | 13 +- velox/exec/tests/utils/CMakeLists.txt | 2 + velox/exec/tests/utils/PlanBuilder.cpp | 251 ++++++++++-------- velox/exec/tests/utils/PlanBuilder.h | 91 +++++-- .../exec/tests/utils/TableWriterTestBase.cpp | 56 ++-- velox/tool/trace/TableWriterReplayer.cpp | 1 + 11 files changed, 281 insertions(+), 194 deletions(-) diff --git a/velox/connectors/hive/storage_adapters/gcs/tests/GcsMultipleEndpointsTest.cpp b/velox/connectors/hive/storage_adapters/gcs/tests/GcsMultipleEndpointsTest.cpp index 00bb3310a06..0889ce7a92b 100644 --- a/velox/connectors/hive/storage_adapters/gcs/tests/GcsMultipleEndpointsTest.cpp +++ b/velox/connectors/hive/storage_adapters/gcs/tests/GcsMultipleEndpointsTest.cpp @@ -91,6 +91,7 @@ class GcsMultipleEndpointsTest : public testing::Test, 0, {}, {}, + {}, dwio::common::FileFormat::PARQUET, {}, connectorId) diff --git a/velox/connectors/hive/storage_adapters/s3fs/tests/S3MultipleEndpointsTest.cpp b/velox/connectors/hive/storage_adapters/s3fs/tests/S3MultipleEndpointsTest.cpp index cf446b12d2e..2967b1a52ac 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/tests/S3MultipleEndpointsTest.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/tests/S3MultipleEndpointsTest.cpp @@ -93,6 +93,7 @@ class S3MultipleEndpoints : public S3Test, public ::test::VectorTestBase { 0, {}, {}, + {}, dwio::common::FileFormat::PARQUET, {}, connectorId) diff --git a/velox/exec/fuzzer/WriterFuzzer.cpp b/velox/exec/fuzzer/WriterFuzzer.cpp index 9c2132c7403..eae1791d284 100644 --- a/velox/exec/fuzzer/WriterFuzzer.cpp +++ b/velox/exec/fuzzer/WriterFuzzer.cpp @@ -26,6 +26,7 @@ #include "velox/common/file/tests/FaultyFileSystem.h" #include "velox/connectors/hive/HiveConnector.h" #include "velox/connectors/hive/HiveConnectorSplit.h" +#include "velox/connectors/hive/HiveDataSink.h" #include "velox/connectors/hive/TableHandle.h" #include "velox/dwio/dwrf/reader/DwrfReader.h" #include "velox/exec/fuzzer/FuzzerUtil.h" @@ -145,7 +146,10 @@ class WriterFuzzer { int32_t bucketCount, const std::vector& bucketColumns, int32_t sortColumnOffset, - const std::vector>& sortBy, + const std::vector& sortColumnNames, + const std::vector& sortOrders, + // const std::vector>& + // sortBy, const std::shared_ptr& outputDirectoryPath); // Generates table column handles based on table column properties @@ -342,7 +346,9 @@ void WriterFuzzer::go() { int32_t bucketCount = 0; std::vector bucketColumns; int32_t sortColumnOffset = 0; - std::vector> sortBy; + // std::vector> sortBy; + std::vector sortColumnNames; + std::vector sortOrders; // Regular table columns generateColumns(5, "c", kRegularColumnTypes_, 2, names, types); @@ -362,13 +368,11 @@ void WriterFuzzer::go() { auto [sortColumns, offset] = generateSortColumns(3, bucketColumns, names, types); sortColumnOffset -= offset; - sortBy.reserve(sortColumns.size()); - for (const auto& sortByColumn : sortColumns) { - sortBy.push_back(std::make_shared( - sortByColumn, - kSortOrderTypes_.at( - boost::random::uniform_int_distribution( - 0, 1)(rng_)))); + sortColumnNames = std::move(sortColumns); + sortOrders.reserve(sortColumnNames.size()); + for (const auto& sortByColumn : sortColumnNames) { + sortOrders.push_back(kSortOrderTypes_.at( + boost::random::uniform_int_distribution(0, 1)(rng_))); } } } @@ -391,7 +395,8 @@ void WriterFuzzer::go() { bucketCount, bucketColumns, sortColumnOffset, - sortBy, + sortColumnNames, + sortOrders, outputDirPath); LOG(INFO) << "==============================> Done with iteration " @@ -499,7 +504,9 @@ void WriterFuzzer::verifyWriter( const int32_t bucketCount, const std::vector& bucketColumns, const int32_t sortColumnOffset, - const std::vector>& sortBy, + const std::vector& sortColumnNames, + const std::vector& sortOrders, + // const std::vector>& sortBy, const std::shared_ptr& outputDirectoryPath) { const auto plan = PlanBuilder() .values(input) @@ -508,7 +515,8 @@ void WriterFuzzer::verifyWriter( partitionKeys, bucketCount, bucketColumns, - sortBy) + sortColumnNames, + sortOrders) .planNode(); const auto maxDrivers = @@ -588,13 +596,20 @@ void WriterFuzzer::verifyWriter( } // 4. Verifies sorting. - if (sortBy.size() > 0) { - const std::vector sortColumnNames = { - names.begin() + sortColumnOffset, - names.begin() + sortColumnOffset + sortBy.size()}; + if (sortColumnNames.size() > 0) { + // const std::vector sortColumnNames = { + // names.begin() + sortColumnOffset, + // names.begin() + sortColumnOffset + sortBy.size()}; const std::vector sortColumnTypes = { types.begin() + sortColumnOffset, - types.begin() + sortColumnOffset + sortBy.size()}; + types.begin() + sortColumnOffset + sortColumnNames.size()}; + + std::vector> sortBy; + sortBy.reserve(sortColumnNames.size()); + for (size_t i = 0; i < sortColumnNames.size(); ++i) { + sortBy.emplace_back(std::make_shared( + sortColumnNames[i], sortOrders[i])); + } // Read from each file and check if data is sorted as presto sorted // result. diff --git a/velox/exec/tests/PlanBuilderTest.cpp b/velox/exec/tests/PlanBuilderTest.cpp index ca2a2c073f7..11212b193c5 100644 --- a/velox/exec/tests/PlanBuilderTest.cpp +++ b/velox/exec/tests/PlanBuilderTest.cpp @@ -315,6 +315,7 @@ TEST_F(PlanBuilderTest, commitStrategyParameter) { 0, {}, {}, + {}, dwio::common::FileFormat::DWRF, {}, PlanBuilder::kHiveDefaultConnectorId, diff --git a/velox/exec/tests/TableEvolutionFuzzer.cpp b/velox/exec/tests/TableEvolutionFuzzer.cpp index a353cbe5057..9fb7559d8c3 100644 --- a/velox/exec/tests/TableEvolutionFuzzer.cpp +++ b/velox/exec/tests/TableEvolutionFuzzer.cpp @@ -846,7 +846,8 @@ std::unique_ptr TableEvolutionFuzzer::makeWriteTask( /*partitionBy=*/{}, /*bucketCount=*/0, /*bucketedBy=*/{}, - /*sortBy=*/{}, + /*sortColumns=*/{}, + /*sortBys=*/{}, setup.fileFormat, /*aggregates=*/{}, /*connectorId=*/PlanBuilder::kHiveDefaultConnectorId, @@ -866,7 +867,8 @@ std::unique_ptr TableEvolutionFuzzer::makeWriteTask( /*partitionBy=*/{}, setup.bucketCount(), bucketColumnNames, - /*sortBy=*/{}, + /*sortColumns=*/{}, + /*sortBys=*/{}, setup.fileFormat, /*aggregates=*/{}, /*connectorId=*/PlanBuilder::kHiveDefaultConnectorId, @@ -877,7 +879,8 @@ std::unique_ptr TableEvolutionFuzzer::makeWriteTask( /*partitionBy=*/{}, setup.bucketCount(), bucketColumnNames, - /*sortBy=*/{}, + /*sortColumns=*/{}, + /*sortBys=*/{}, setup.fileFormat); } } diff --git a/velox/exec/tests/TableWriterTest.cpp b/velox/exec/tests/TableWriterTest.cpp index 2b91f729c37..4c5b248701d 100644 --- a/velox/exec/tests/TableWriterTest.cpp +++ b/velox/exec/tests/TableWriterTest.cpp @@ -130,6 +130,7 @@ TEST_F(BasicTableWriterTestBase, structAsMap) { 0, {}, {}, + {}, dwio::common::FileFormat::DWRF, {}, PlanBuilder::kHiveDefaultConnectorId, @@ -2932,10 +2933,8 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, reclaimFromSortTableWriter) { {"c0"}, 4, {"c1"}, - { - std::make_shared( - "c2", core::SortOrder{false, false}), - }) + {"c2"}, + {core::SortOrder{false, false}}) .project({TableWriteTraits::rowCountColumnName()}) .singleAggregation( {}, @@ -3330,10 +3329,8 @@ DEBUG_ONLY_TEST_F( {"c0"}, 4, {"c1"}, - { - std::make_shared( - "c2", core::SortOrder{false, false}), - }) + {"c2"}, + {core::SortOrder{false, false}}) .project({TableWriteTraits::rowCountColumnName()}) .singleAggregation( {}, diff --git a/velox/exec/tests/utils/CMakeLists.txt b/velox/exec/tests/utils/CMakeLists.txt index cf3ef9dbb10..d6b9b858ddc 100644 --- a/velox/exec/tests/utils/CMakeLists.txt +++ b/velox/exec/tests/utils/CMakeLists.txt @@ -43,6 +43,8 @@ target_link_libraries( velox_vector_test_lib velox_vector_fuzzer velox_temp_path + velox_connector + velox_registered_connector_factories velox_cursor velox_core velox_exception diff --git a/velox/exec/tests/utils/PlanBuilder.cpp b/velox/exec/tests/utils/PlanBuilder.cpp index 4d693999924..984d702f05b 100644 --- a/velox/exec/tests/utils/PlanBuilder.cpp +++ b/velox/exec/tests/utils/PlanBuilder.cpp @@ -15,8 +15,6 @@ */ #include "velox/exec/tests/utils/PlanBuilder.h" -#include "velox/connectors/hive/HiveConnector.h" -#include "velox/connectors/hive/TableHandle.h" #include "velox/connectors/tpcds/TpcdsConnector.h" #include "velox/connectors/tpch/TpchConnector.h" #include "velox/core/PartitionFunction.h" @@ -37,7 +35,6 @@ using namespace facebook::velox; using namespace facebook::velox::connector; -using namespace facebook::velox::connector::hive; namespace facebook::velox::exec::test { namespace { @@ -51,23 +48,6 @@ core::TypedExprPtr parseExpr( return core::Expressions::inferTypes(untyped, rowType, pool); } -std::shared_ptr buildHiveBucketProperty( - const RowTypePtr rowType, - int32_t bucketCount, - const std::vector& bucketColumns, - const std::vector>& sortBy) { - std::vector bucketTypes; - bucketTypes.reserve(bucketColumns.size()); - for (const auto& bucketColumn : bucketColumns) { - bucketTypes.push_back(rowType->childAt(rowType->getChildIdx(bucketColumn))); - } - return std::make_shared( - HiveBucketProperty::Kind::kHiveCompatible, - bucketCount, - bucketColumns, - bucketTypes, - sortBy); -} } // namespace PlanBuilder& PlanBuilder::tableScan( @@ -75,8 +55,11 @@ PlanBuilder& PlanBuilder::tableScan( const std::vector& subfieldFilters, const std::string& remainingFilter, const RowTypePtr& dataColumns, - const connector::ColumnHandleMap& assignments) { - return TableScanBuilder(*this) + const connector::ColumnHandleMap& assignments, + const std::string& connectorName, + const std::string& connectorId) { + return TableScanBuilder(*this, connectorName) + .connectorId(connectorId) .filtersAsNode(filtersAsNode_ ? planNodeIdGenerator_ : nullptr) .outputType(outputType) .assignments(assignments) @@ -93,14 +76,16 @@ PlanBuilder& PlanBuilder::tableScan( const std::vector& subfieldFilters, const std::string& remainingFilter, const RowTypePtr& dataColumns, - const connector::ColumnHandleMap& assignments) { - return TableScanBuilder(*this) + const connector::ColumnHandleMap& assignments, + const std::string& connectorName, + const std::string& connectorId) { + return TableScanBuilder(*this, connectorName) + .connectorId(connectorId) .filtersAsNode(filtersAsNode_ ? planNodeIdGenerator_ : nullptr) .tableName(tableName) .outputType(outputType) .columnAliases(columnAliases) .dataColumns(dataColumns) - .subfieldFilters(subfieldFilters) .remainingFilter(remainingFilter) .assignments(assignments) @@ -111,8 +96,11 @@ PlanBuilder& PlanBuilder::tableScanWithPushDown( const RowTypePtr& outputType, const PushdownConfig& pushdownConfig, const RowTypePtr& dataColumns, - const connector::ColumnHandleMap& assignments) { - return TableScanBuilder(*this) + const connector::ColumnHandleMap& assignments, + const std::string& connectorName, + const std::string& connectorId) { + return TableScanBuilder(*this, connectorName) + .connectorId(connectorId) .filtersAsNode(filtersAsNode_ ? planNodeIdGenerator_ : nullptr) .outputType(outputType) .assignments(assignments) @@ -260,29 +248,28 @@ void addConjunct( core::PlanNodePtr PlanBuilder::TableScanBuilder::build(core::PlanNodeId id) { VELOX_CHECK_NOT_NULL(outputType_, "outputType must be specified"); + VELOX_CHECK_NOT_NULL( + connectorFactory_, "connectorFactory_ must be specified"); std::unordered_map typedMapping; + std::vector> columnHandles; bool hasAssignments = !(assignments_.empty()); for (uint32_t i = 0; i < outputType_->size(); ++i) { const auto& name = outputType_->nameOf(i); const auto& type = outputType_->childAt(i); - std::string hiveColumnName = name; + std::string columnName = name; auto it = columnAliases_.find(name); if (it != columnAliases_.end()) { - hiveColumnName = it->second; + columnName = it->second; typedMapping.emplace( - name, - std::make_shared(type, hiveColumnName)); + name, std::make_shared(type, columnName)); } if (!hasAssignments) { - assignments_.insert( - {name, - std::make_shared( - hiveColumnName, - HiveColumnHandle::ColumnType::kRegular, - type, - type)}); + auto columnHandle = + connectorFactory_->makeColumnHandle(connectorId_, columnName, type); + columnHandles.push_back(columnHandle); + assignments_.insert({name, columnHandle}); } } @@ -313,13 +300,29 @@ core::PlanNodePtr PlanBuilder::TableScanBuilder::build(core::PlanNodeId id) { } if (!tableHandle_) { - tableHandle_ = std::make_shared( - connectorId_, - tableName_, - true, - std::move(subfieldFiltersMap_), - remainingFilterExpr, - dataColumns_); + // Build connector-specific table handle through the ObjectFactory. + folly::dynamic options = folly::dynamic::object(); + options["filterPushdownEnabled"] = true; + + // Serialize subfield filters + if (!subfieldFiltersMap_.empty()) { + folly::dynamic sfs = folly::dynamic::object(); + for (const auto& entry : subfieldFiltersMap_) { + const auto& subfield = entry.first; + const auto& filter = entry.second; + sfs[subfield.toString()] = velox::ISerializable::serialize(*filter); + } + options["subfieldFilters"] = std::move(sfs); + } + + // Serialize remaining filters + if (remainingFilterExpr) { + options["remainingFilter"] = + velox::ISerializable::serialize(*remainingFilterExpr); + } + + tableHandle_ = connectorFactory_->makeTableHandle( + connectorId_, tableName_, columnHandles, options); } core::PlanNodePtr result = std::make_shared( id, outputType_, tableHandle_, assignments_); @@ -340,51 +343,69 @@ core::PlanNodePtr PlanBuilder::TableWriterBuilder::build(core::PlanNodeId id) { // upstream operator. auto outputType = outputType_ ? outputType_ : upstreamNode->outputType(); - // If insertHandle_ is not specified, build a HiveInsertTableHandle along with - // columnHandles, bucketProperty and locationHandle. + // If insertHandle_ is not specified, build a ConnectorInsertTableHandle along + // with columnHandles, bucketProperty and locationHandle. if (!insertHandle_) { // Create column handles. - std::vector> - columnHandles; + std::vector> columnHandles; for (auto i = 0; i < outputType->size(); ++i) { const auto column = outputType->nameOf(i); + folly::dynamic options = folly::dynamic::object(); + const bool isPartitionKey = std::find(partitionBy_.begin(), partitionBy_.end(), column) != partitionBy_.end(); - columnHandles.push_back( - std::make_shared( - column, - isPartitionKey - ? connector::hive::HiveColumnHandle::ColumnType::kPartitionKey - : connector::hive::HiveColumnHandle::ColumnType::kRegular, - outputType->childAt(i), - outputType->childAt(i))); + options["columnType"] = isPartitionKey ? "partitionkey" : "regular"; + auto columnHandle = connectorFactory_->makeColumnHandle( + connectorId_, column, outputType->childAt(i), options); + columnHandles.push_back(columnHandle); } - auto locationHandle = std::make_shared( - outputDirectoryPath_, - outputDirectoryPath_, - connector::hive::LocationHandle::TableType::kNew, - outputFileName_); + // 2) Create a connector-specific LocationHandle (e.g. HiveLocationHandle). + folly::dynamic locationHandleOptions = folly::dynamic::object(); + locationHandleOptions["targetPath"] = outputDirectoryPath_; + locationHandleOptions["writePath"] = outputDirectoryPath_; + locationHandleOptions["targetFileName"] = outputFileName_; + auto locationHandle = connectorFactory_->makeLocationHandle( + connectorId_, + connector::ConnectorLocationHandle::TableType::kNew, + locationHandleOptions); + + folly::dynamic insertTableOptions = folly::dynamic::object(); + insertTableOptions["storageFormat"] = static_cast(fileFormat_); + insertTableOptions["compressionKind"] = static_cast(compressionKind_); + if (!serdeParameters_.empty()) { + insertTableOptions["serdeParameters"] = + ISerializable::serialize(serdeParameters_); + } + if (options_) { + insertTableOptions["writerOptions"] = options_->serialize(); + } + insertTableOptions["ensureFiles"] = ensureFiles_; - std::shared_ptr bucketProperty; if (bucketCount_ != 0) { - bucketProperty = buildHiveBucketProperty( - outputType, bucketCount_, bucketedBy_, sortBy_); + folly::dynamic bucketProperty = folly::dynamic::object(); + std::vector bucketTypes; + bucketTypes.reserve(bucketedBy_.size()); + for (const auto& bucketColumn : bucketedBy_) { + bucketTypes.push_back( + outputType->childAt(outputType->getChildIdx(bucketColumn))); + } + bucketProperty["kind"] = 0l; + bucketProperty["bucketCount"] = bucketCount_; + bucketProperty["bucketedBy"] = ISerializable::serialize(bucketedBy_); + bucketProperty["bucketedTypes"] = ISerializable::serialize(bucketTypes); + bucketProperty["sortColumns"] = ISerializable::serialize(sortColumns_); + bucketProperty["sortOrders"] = ISerializable::serialize(sortOrders_); + + insertTableOptions["bucketProperty"] = std::move(bucketProperty); } - auto hiveHandle = std::make_shared( - columnHandles, - locationHandle, - fileFormat_, - bucketProperty, - compressionKind_, - serdeParameters_, - options_, - ensureFiles_); + auto connectorInsertTableHandle = connectorFactory_->makeInsertTableHandle( + connectorId_, columnHandles, locationHandle, insertTableOptions); - insertHandle_ = - std::make_shared(connectorId_, hiveHandle); + insertHandle_ = std::make_shared( + connectorId_, connectorInsertTableHandle); } std::optional columnStatsSpec; @@ -709,7 +730,8 @@ PlanBuilder& PlanBuilder::tableWrite( const std::vector& partitionBy, int32_t bucketCount, const std::vector& bucketedBy, - const std::vector>& sortBy, + const std::vector& sortColumns, + const std::vector& sortOrders, const dwio::common::FileFormat fileFormat, const std::vector& aggregates, const std::string_view& connectorId, @@ -727,7 +749,8 @@ PlanBuilder& PlanBuilder::tableWrite( .partitionBy(partitionBy) .bucketCount(bucketCount) .bucketedBy(bucketedBy) - .sortBy(sortBy) + .sortColumns(sortColumns) + .sortOrders(sortOrders) .fileFormat(fileFormat) .aggregates(aggregates) .connectorId(connectorId) @@ -1299,6 +1322,35 @@ core::PartitionFunctionSpecPtr createPartitionFunctionSpec( } } +core::PartitionFunctionSpecPtr createConnectorPartitionFunctionSpec( + int numBuckets, + const std::vector& channels, + const std::vector& constValues, + const std::string& connectorName = connector::kHiveConnectorName, + const std::string& connectorId = + std::string(PlanBuilder::kHiveDefaultConnectorId)) { + folly::dynamic opts = folly::dynamic::object(); + opts["numBuckets"] = numBuckets; + opts["keyChannels"] = ISerializable::serialize(channels); + + if (!constValues.empty()) { + std::vector constExprs; + constExprs.reserve(constValues.size()); + for (const auto& v : constValues) { + VELOX_CHECK_NOT_NULL(v); + constExprs.emplace_back(v); // wrap VectorPtr as ConstantTypedExpr + } + opts["constants"] = velox::ISerializable::serialize(constExprs); + } + + // Ask the connector factory to build the spec. + auto connectorFactory = connector::getConnectorFactory(connectorName); + auto connectorPartitionSpec = + connectorFactory->makePartitionFunctionSpec(connectorId, opts); + + return connectorPartitionSpec; +} + RowTypePtr concat(const RowTypePtr& a, const RowTypePtr& b) { std::vector names = a->names(); std::vector types = a->children(); @@ -1464,15 +1516,16 @@ PlanBuilder& PlanBuilder::scaleWriterlocalPartition( for (const auto& key : keys) { keyIndices.push_back(planNode_->outputType()->getChildIdx(key)); } - auto hivePartitionFunctionFactory = - std::make_shared( - 1009, keyIndices, std::vector{}); + + auto connectorPartitionSpec = createConnectorPartitionFunctionSpec( + 1009, keyIndices, std::vector{}); planNode_ = std::make_shared( nextPlanNodeId(), core::LocalPartitionNode::Type::kRepartition, true, - hivePartitionFunctionFactory, + connectorPartitionSpec, std::vector{planNode_}); + VELOX_CHECK(!planNode_->supportsBarrier()); return *this; } @@ -1480,39 +1533,17 @@ PlanBuilder& PlanBuilder::scaleWriterlocalPartition( PlanBuilder& PlanBuilder::localPartition( int numBuckets, const std::vector& bucketChannels, - const std::vector& constValues) { - auto hivePartitionFunctionFactory = - std::make_shared( - numBuckets, bucketChannels, constValues); - planNode_ = std::make_shared( - nextPlanNodeId(), - core::LocalPartitionNode::Type::kRepartition, - /*scaleWriter=*/false, - std::move(hivePartitionFunctionFactory), - std::vector{planNode_}); - VELOX_CHECK(planNode_->supportsBarrier()); - return *this; -} + const std::vector& constValues, + const std::string& connectorName, + const std::string& connectorId) { + auto connectorPartitionSpec = createConnectorPartitionFunctionSpec( + numBuckets, bucketChannels, constValues, connectorName, connectorId); -PlanBuilder& PlanBuilder::localPartitionByBucket( - const std::shared_ptr& - bucketProperty) { - VELOX_CHECK_NOT_NULL(planNode_, "LocalPartition cannot be the source node"); - std::vector bucketChannels; - for (const auto& bucketColumn : bucketProperty->bucketedBy()) { - bucketChannels.push_back( - planNode_->outputType()->getChildIdx(bucketColumn)); - } - auto hivePartitionFunctionFactory = - std::make_shared( - bucketProperty->bucketCount(), - bucketChannels, - std::vector{}); planNode_ = std::make_shared( nextPlanNodeId(), core::LocalPartitionNode::Type::kRepartition, /*scaleWriter=*/false, - std::move(hivePartitionFunctionFactory), + std::move(connectorPartitionSpec), std::vector{planNode_}); VELOX_CHECK(planNode_->supportsBarrier()); return *this; diff --git a/velox/exec/tests/utils/PlanBuilder.h b/velox/exec/tests/utils/PlanBuilder.h index dbf27c6b86e..fd34680847b 100644 --- a/velox/exec/tests/utils/PlanBuilder.h +++ b/velox/exec/tests/utils/PlanBuilder.h @@ -20,7 +20,10 @@ #include #include #include -#include "velox/connectors/hive/HiveDataSink.h" +#include "velox/connectors/Connector.h" +#include "velox/connectors/ConnectorNames.h" +#include "velox/connectors/RegisterConnectorFactories.h" +#include "velox/dwio/common/Options.h" #include "velox/parse/ExpressionsParser.h" #include "velox/parse/IExpr.h" #include "velox/parse/PlanNodeIdGenerator.h" @@ -95,7 +98,9 @@ class PlanBuilder { explicit PlanBuilder( std::shared_ptr planNodeIdGenerator, memory::MemoryPool* pool = nullptr) - : planNodeIdGenerator_{std::move(planNodeIdGenerator)}, pool_{pool} {} + : planNodeIdGenerator_{std::move(planNodeIdGenerator)}, pool_{pool} { + connector::registerConnectorFactories(); + } /// Constructor with no required parameters suitable for creating /// straight-line (e.g. no joins) query plans. @@ -111,9 +116,13 @@ class PlanBuilder { memory::MemoryPool* pool = nullptr) : planNode_(std::move(initialPlanNode)), planNodeIdGenerator_{std::move(planNodeIdGenerator)}, - pool_{pool} {} + pool_{pool} { + connector::registerConnectorFactories(); + } - virtual ~PlanBuilder() = default; + virtual ~PlanBuilder() { + connector::unregisterConnectorFactories(); + } static constexpr const std::string_view kHiveDefaultConnectorId{"test-hive"}; static constexpr const std::string_view kTpchDefaultConnectorId{"test-tpch"}; @@ -146,7 +155,9 @@ class PlanBuilder { const std::vector& subfieldFilters = {}, const std::string& remainingFilter = "", const RowTypePtr& dataColumns = nullptr, - const connector::ColumnHandleMap& assignments = {}); + const connector::ColumnHandleMap& assignments = {}, + const std::string& connectorName = connector::kHiveConnectorName, + const std::string& connectorId = std::string(kHiveDefaultConnectorId)); /// Add a TableScanNode to scan a Hive table. /// @@ -176,7 +187,9 @@ class PlanBuilder { const std::vector& subfieldFilters = {}, const std::string& remainingFilter = "", const RowTypePtr& dataColumns = nullptr, - const connector::ColumnHandleMap& assignments = {}); + const connector::ColumnHandleMap& assignments = {}, + const std::string& connectorName = connector::kHiveConnectorName, + const std::string& connectorId = std::string(kHiveDefaultConnectorId)); /// Add a TableScanNode to scan a Hive table with direct SubfieldFilters. /// @@ -189,7 +202,9 @@ class PlanBuilder { const RowTypePtr& outputType, const PushdownConfig& pushdownConfig, const RowTypePtr& dataColumns = nullptr, - const connector::ColumnHandleMap& assignments = {}); + const connector::ColumnHandleMap& assignments = {}, + const std::string& connectorName = connector::kHiveConnectorName, + const std::string& connectorId = std::string(kHiveDefaultConnectorId)); /// Add a TableScanNode to scan a TPC-H table. /// @@ -229,7 +244,15 @@ class PlanBuilder { /// builder arguments will be ignored. class TableScanBuilder { public: - TableScanBuilder(PlanBuilder& builder) : planBuilder_(builder) {} + TableScanBuilder( + PlanBuilder& builder, + const std::string& connectorName = connector::kHiveConnectorName, + const std::string& connectorId = std::string(kHiveDefaultConnectorId)) + : planBuilder_(builder), + connectorName_(connectorName), + connectorId_(connectorId) { + connectorFactory_ = connector::getConnectorFactory(connectorName); + } /// @param tableName The name of the table to scan. TableScanBuilder& tableName(std::string tableName) { @@ -337,7 +360,9 @@ class PlanBuilder { PlanBuilder& planBuilder_; std::string tableName_{"hive_table"}; - std::string connectorId_{kHiveDefaultConnectorId}; + std::string connectorName_; + std::string connectorId_; + std::shared_ptr connectorFactory_; RowTypePtr outputType_; core::ExprPtr remainingFilter_; RowTypePtr dataColumns_; @@ -449,7 +474,15 @@ class PlanBuilder { /// Uses the Hive connector by default. class TableWriterBuilder { public: - explicit TableWriterBuilder(PlanBuilder& builder) : planBuilder_(builder) {} + explicit TableWriterBuilder( + PlanBuilder& builder, + const std::string& connectorName = connector::kHiveConnectorName, + const std::string& connectorId = std::string(kHiveDefaultConnectorId)) + : planBuilder_(builder), + connectorName_(connectorName), + connectorId_(connectorId) { + connectorFactory_ = connector::getConnectorFactory(connectorName); + } /// @param outputType The schema that will be written to the output file. It /// may reference a subset or change the order of columns from the input @@ -513,11 +546,15 @@ class PlanBuilder { return *this; } - /// @param sortBy Specifies the sort by columns. - TableWriterBuilder& sortBy( - std::vector> - sortBy) { - sortBy_ = std::move(sortBy); + TableWriterBuilder& sortColumns( + const std::vector& sortColumns) { + sortColumns_ = sortColumns; + return *this; + } + + TableWriterBuilder& sortOrders( + const std::vector& sortOrders) { + sortOrders_ = sortOrders; return *this; } @@ -579,15 +616,17 @@ class PlanBuilder { RowTypePtr outputType_; std::string outputDirectoryPath_; std::string outputFileName_; - std::string connectorId_{kHiveDefaultConnectorId}; + std::string connectorName_; + std::string connectorId_; + std::shared_ptr connectorFactory_; std::shared_ptr insertHandle_; std::vector partitionBy_; int32_t bucketCount_{0}; std::vector bucketedBy_; std::vector aggregates_; - std::vector> - sortBy_; + std::vector sortColumns_; + std::vector sortOrders_; std::unordered_map serdeParameters_; std::shared_ptr options_; @@ -822,8 +861,8 @@ class PlanBuilder { const std::vector& partitionBy, int32_t bucketCount, const std::vector& bucketedBy, - const std::vector< - std::shared_ptr>& sortBy, + const std::vector& sortColumns, + const std::vector& sortOrders, const dwio::common::FileFormat fileFormat = dwio::common::FileFormat::DWRF, const std::vector& aggregates = {}, @@ -1182,18 +1221,14 @@ class PlanBuilder { /// current plan node). PlanBuilder& localPartition(const std::vector& keys); - /// A convenience method to add a LocalPartitionNode with hive partition + /// A convenience method to add a LocalPartitionNode with connector partition /// function. PlanBuilder& localPartition( int numBuckets, const std::vector& channels, - const std::vector& constValues); - - /// A convenience method to add a LocalPartitionNode with a single source (the - /// current plan node) and hive bucket property. - PlanBuilder& localPartitionByBucket( - const std::shared_ptr& - bucketProperty); + const std::vector& constValues, + const std::string& connectorName = connector::kHiveConnectorName, + const std::string& connectorId = std::string(kHiveDefaultConnectorId)); /// Add a LocalPartitionNode to partition the input using batch-level /// round-robin. Number of partitions is determined at runtime based on diff --git a/velox/exec/tests/utils/TableWriterTestBase.cpp b/velox/exec/tests/utils/TableWriterTestBase.cpp index 34304d94ab2..c65f9a3bef3 100644 --- a/velox/exec/tests/utils/TableWriterTestBase.cpp +++ b/velox/exec/tests/utils/TableWriterTestBase.cpp @@ -672,36 +672,36 @@ PlanNodePtr TableWriterTestBase::createInsertPlanForBucketTable( std::optional columnStatsSpec) { // Since we might do column rename, so generate bucket property based on // the data type from 'inputPlan'. - std::vector bucketColumns; - bucketColumns.reserve(bucketProperty->bucketedBy().size()); + std::vector bucketColumns(bucketProperty->bucketedBy().size()); + std::vector bucketChannels( + bucketProperty->bucketedBy().size()); for (int i = 0; i < bucketProperty->bucketedBy().size(); ++i) { - bucketColumns.push_back(inputRowType->names()[tableRowType->getChildIdx( - bucketProperty->bucketedBy()[i])]); + auto index = tableRowType->getChildIdx(bucketProperty->bucketedBy()[i]); + bucketColumns.push_back(inputRowType->names()[index]); + bucketChannels.push_back(index); } - auto localPartitionBucketProperty = std::make_shared( - bucketProperty->kind(), - bucketProperty->bucketCount(), - bucketColumns, - bucketProperty->bucketedTypes(), - bucketProperty->sortedBy()); - auto insertPlan = - inputPlan.localPartitionByBucket(localPartitionBucketProperty) - .addNode(addTableWriter( - inputRowType, - tableRowType->names(), - std::nullopt, - createInsertTableHandle( - tableRowType, - outputTableType, - outputDirectoryPath, - partitionedBy, - bucketProperty, - compressionKind), - false, - outputCommitStrategy)) - .capturePlanNodeId(tableWriteNodeId_) - .localPartition({}) - .tableWriteMerge(); + + auto insertPlan = inputPlan + .localPartition( + bucketProperty->bucketCount(), + bucketChannels, + std::vector{}) + .addNode(addTableWriter( + inputRowType, + tableRowType->names(), + std::nullopt, + createInsertTableHandle( + tableRowType, + outputTableType, + outputDirectoryPath, + partitionedBy, + bucketProperty, + compressionKind), + false, + outputCommitStrategy)) + .capturePlanNodeId(tableWriteNodeId_) + .localPartition({}) + .tableWriteMerge(); if (aggregateResult) { insertPlan.project({TableWriteTraits::rowCountColumnName()}) .singleAggregation( diff --git a/velox/tool/trace/TableWriterReplayer.cpp b/velox/tool/trace/TableWriterReplayer.cpp index d17511f73bb..e1a09c50d0a 100644 --- a/velox/tool/trace/TableWriterReplayer.cpp +++ b/velox/tool/trace/TableWriterReplayer.cpp @@ -16,6 +16,7 @@ #include +#include "velox/connectors/hive/HiveDataSink.h" #include "velox/exec/TableWriter.h" #include "velox/exec/TraceUtil.h" #include "velox/exec/tests/utils/PlanBuilder.h"