diff --git a/velox/connectors/Connector.cpp b/velox/connectors/Connector.cpp index e7c72af478b..cfc2430c80d 100644 --- a/velox/connectors/Connector.cpp +++ b/velox/connectors/Connector.cpp @@ -169,4 +169,8 @@ folly::dynamic ConnectorTableHandle::serialize() const { return serializeBase("ConnectorTableHandle"); } +// Allow the compiler to emit vtable and typeinfo for ConnectorLocationHandle at +// a single point instead of every translation unit. +ConnectorLocationHandle::~ConnectorLocationHandle() = default; + } // namespace facebook::velox::connector diff --git a/velox/connectors/Connector.h b/velox/connectors/Connector.h index d2826e15336..026ffd5dee8 100644 --- a/velox/connectors/Connector.h +++ b/velox/connectors/Connector.h @@ -165,6 +165,33 @@ class ConnectorInsertTableHandle : public ISerializable { } }; +class ConnectorLocationHandle : public ISerializable { + public: + enum class TableType { kNew, kExisting, kTemp }; + + ConnectorLocationHandle(const std::string& connectorId, TableType tableType) + : connectorId_{connectorId}, tableType_{tableType} {} + + virtual ~ConnectorLocationHandle(); + + const std::string& connectorId() const { + return connectorId_; + } + + /// New vs existing vs temp. + TableType tableType() const { + return tableType_; + } + + virtual std::string toString() const = 0; + + virtual folly::dynamic serialize() const = 0; + + private: + const std::string connectorId_; + const TableType tableType_; +}; + using ConnectorInsertTableHandlePtr = std::shared_ptr; diff --git a/velox/connectors/ConnectorNames.h b/velox/connectors/ConnectorNames.h new file mode 100644 index 00000000000..a15cc60b754 --- /dev/null +++ b/velox/connectors/ConnectorNames.h @@ -0,0 +1,27 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +namespace facebook::velox::connector { + +// TODO: Add a demo connector plugin + +constexpr const char* kFuzzerConnectorName = "fuzzer"; +constexpr const char* kHiveConnectorName = "hive"; +constexpr const char* kIcebergConnectorName = "iceberg"; +constexpr const char* kTpchConnectorName = "tpch"; + +} // namespace facebook::velox::connector diff --git a/velox/connectors/ConnectorObjectFactory.cpp b/velox/connectors/ConnectorObjectFactory.cpp new file mode 100644 index 00000000000..0149b5a5b80 --- /dev/null +++ b/velox/connectors/ConnectorObjectFactory.cpp @@ -0,0 +1,61 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "velox/connectors/ConnectorObjectFactory.h" + +namespace facebook::velox::connector { + +ConnectorObjectFactory::~ConnectorObjectFactory() = default; + +std::unordered_map>& +connectorObjectFactories() { + static std:: + unordered_map> + factories; + return factories; +} + +bool registerConnectorObjectFactory( + std::shared_ptr factory) { + bool ok = + connectorObjectFactories().insert({factory->name(), factory}).second; + VELOX_CHECK( + ok, + "ConnectorObjectFactory with name '{}' is already registered", + factory->name()); + return true; +} + +bool hasConnectorObjectFactory(const std::string& connectorName) { + return connectorObjectFactories().count(connectorName) == 1; +} + +bool unregisterConnectorObjectFactory(const std::string& connectorName) { + auto count = connectorObjectFactories().erase(connectorName); + return count == 1; +} + +std::shared_ptr getConnectorObjectFactory( + const std::string& connectorName) { + auto it = connectorObjectFactories().find(connectorName); + VELOX_CHECK( + it != connectorObjectFactories().end(), + "ConnectorObjectFactory with name '{}' not registered", + connectorName); + return it->second; +} + +} // namespace facebook::velox::connector diff --git a/velox/connectors/ConnectorObjectFactory.h b/velox/connectors/ConnectorObjectFactory.h new file mode 100644 index 00000000000..d2384cdaa84 --- /dev/null +++ b/velox/connectors/ConnectorObjectFactory.h @@ -0,0 +1,91 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include + +#include "velox/connectors/Connector.h" + +namespace facebook::velox::connector { + +/// Base factory for connector-specific objects. Every make*() method throws +/// VELOX_UNSUPPORTED by default; connectors override the ones they support. +class ConnectorObjectFactory { + public: + ConnectorObjectFactory(const std::string& name) : name_(name) {} + + virtual ~ConnectorObjectFactory(); + + const std::string& name() const { + return name_; + } + + virtual std::shared_ptr makeSplit( + const std::string& connectorId, + const folly::dynamic& options = {}) const { + // '{}' is required for connectorId to appear in the message. + VELOX_UNSUPPORTED( + "ConnectorSplit not supported by connector '{}'", connectorId); + } + + virtual std::shared_ptr makeColumnHandle( + const std::string& connectorId, + const std::string& name, + const TypePtr& type, + const folly::dynamic& options = {}) const { + VELOX_UNSUPPORTED( + "connector::ColumnHandle not supported by connector '{}'", connectorId); + } + + virtual std::shared_ptr makeTableHandle( + const std::string& connectorId, + const std::string& tableName, + std::vector columnHandles, + const folly::dynamic& options) const { + VELOX_UNSUPPORTED( + "ConnectorTableHandle not supported by connector '{}'", connectorId); + } + + virtual std::shared_ptr makeInsertTableHandle( + const std::string& connectorId, + std::vector inputColumns, + std::shared_ptr locationHandle, + const folly::dynamic& options = {}) const { + VELOX_UNSUPPORTED( + "ConnectorInsertTableHandle not supported by connector '{}'", + connectorId); + } + + virtual std::shared_ptr makeLocationHandle( + const std::string& connectorId, + ConnectorLocationHandle::TableType tableType = + ConnectorLocationHandle::TableType::kNew, + const folly::dynamic& options = {}) const { + VELOX_UNSUPPORTED( + "ConnectorLocationHandle not supported by connector '{}'", connectorId); + } + + private: + const std::string name_; +}; + +bool registerConnectorObjectFactory( + std::shared_ptr factory); + +bool hasConnectorObjectFactory(const std::string& connectorName); + +bool 
unregisterConnectorObjectFactory(const std::string& connectorName); + +std::shared_ptr getConnectorObjectFactory( + const std::string& connectorName); + +} // namespace facebook::velox::connector diff --git a/velox/connectors/hive/CMakeLists.txt b/velox/connectors/hive/CMakeLists.txt index e7bbb90fe04..68a685c5d0f 100644 --- a/velox/connectors/hive/CMakeLists.txt +++ b/velox/connectors/hive/CMakeLists.txt @@ -27,6 +27,7 @@ velox_add_library( HiveConnectorSplit.cpp HiveDataSink.cpp HiveDataSource.cpp + HiveObjectFactory.cpp HivePartitionUtil.cpp PartitionIdGenerator.cpp SplitReader.cpp diff --git a/velox/connectors/hive/HiveConnectorSplit.h b/velox/connectors/hive/HiveConnectorSplit.h index 3485c2330fa..bd68b431b91 100644 --- a/velox/connectors/hive/HiveConnectorSplit.h +++ b/velox/connectors/hive/HiveConnectorSplit.h @@ -17,6 +17,7 @@ #include #include + #include "velox/connectors/Connector.h" #include "velox/connectors/hive/FileProperties.h" #include "velox/connectors/hive/TableHandle.h" diff --git a/velox/connectors/hive/HiveDataSink.cpp b/velox/connectors/hive/HiveDataSink.cpp index b9cf1da0b9e..ec84374cf67 100644 --- a/velox/connectors/hive/HiveDataSink.cpp +++ b/velox/connectors/hive/HiveDataSink.cpp @@ -1205,7 +1205,7 @@ std::string LocationHandle::toString() const { "LocationHandle [targetPath: {}, writePath: {}, tableType: {}, tableFileName: {}]", targetPath_, writePath_, - tableTypeName(tableType_), + tableTypeName(tableType()), targetFileName_); } @@ -1219,7 +1219,7 @@ folly::dynamic LocationHandle::serialize() const { obj["name"] = "LocationHandle"; obj["targetPath"] = targetPath_; obj["writePath"] = writePath_; - obj["tableType"] = tableTypeName(tableType_); + obj["tableType"] = tableTypeName(tableType()); obj["targetFileName"] = targetFileName_; return obj; } diff --git a/velox/connectors/hive/HiveDataSink.h b/velox/connectors/hive/HiveDataSink.h index 8c305b7595d..09b3820c14f 100644 --- a/velox/connectors/hive/HiveDataSink.h +++ 
b/velox/connectors/hive/HiveDataSink.h @@ -17,6 +17,7 @@ #include "velox/common/compression/Compression.h" #include "velox/connectors/Connector.h" +#include "velox/connectors/ConnectorNames.h" #include "velox/connectors/hive/HiveConfig.h" #include "velox/connectors/hive/PartitionIdGenerator.h" #include "velox/connectors/hive/TableHandle.h" @@ -31,24 +32,18 @@ class LocationHandle; using LocationHandlePtr = std::shared_ptr; /// Location related properties of the Hive table to be written. -class LocationHandle : public ISerializable { +class LocationHandle : public ConnectorLocationHandle { public: - enum class TableType { - /// Write to a new table to be created. - kNew, - /// Write to an existing table. - kExisting, - }; - LocationHandle( std::string targetPath, std::string writePath, TableType tableType, - std::string targetFileName = "") - : targetPath_(std::move(targetPath)), + std::string targetFileName = "", + const std::string& connectorId = kHiveConnectorName) + : ConnectorLocationHandle(connectorId, tableType), + targetPath_(std::move(targetPath)), targetFileName_(std::move(targetFileName)), - writePath_(std::move(writePath)), - tableType_(tableType) {} + writePath_(std::move(writePath)) {} const std::string& targetPath() const { return targetPath_; @@ -62,16 +57,12 @@ class LocationHandle : public ISerializable { return writePath_; } - TableType tableType() const { - return tableType_; - } + std::string toString() const override; - std::string toString() const; + folly::dynamic serialize() const override; static void registerSerDe(); - folly::dynamic serialize() const override; - static LocationHandlePtr create(const folly::dynamic& obj); static const std::string tableTypeName(LocationHandle::TableType type); @@ -85,8 +76,6 @@ class LocationHandle : public ISerializable { const std::string targetFileName_; // Staging directory path. const std::string writePath_; - // Whether the table to be written is new, already existing or temporary. 
- const TableType tableType_; }; class HiveSortingColumn : public ISerializable { diff --git a/velox/connectors/hive/HiveObjectFactory.cpp b/velox/connectors/hive/HiveObjectFactory.cpp new file mode 100644 index 00000000000..450eda13c72 --- /dev/null +++ b/velox/connectors/hive/HiveObjectFactory.cpp @@ -0,0 +1,380 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "velox/connectors/hive/HiveObjectFactory.h" + +#include + +#include + +#include "velox/connectors/ConnectorNames.h" +#include "velox/connectors/hive/HiveConnectorSplit.h" +#include "velox/connectors/hive/HiveDataSink.h" + +namespace facebook::velox::connector::hive { + +using namespace velox::common; +using namespace facebook::velox::connector; + +std::shared_ptr HiveObjectFactory::makeSplit( + const std::string& connectorId, + const folly::dynamic& options) const { + VELOX_CHECK(!options.isNull()); + + // Required options. + VELOX_CHECK(options.count("filePath")); + VELOX_CHECK(options.count("start")); + VELOX_CHECK(options.count("length")); + std::string filePath = options["filePath"].asString(); + uint64_t start = options["start"].asInt(); + uint64_t length = options["length"].asInt(); + + auto builder = HiveConnectorSplitBuilder(filePath) + .start(start) + .length(length) + .connectorId(connectorId); + + // Optional options. + dwio::common::FileFormat fileFormat = options.count("fileFormat") + ? 
static_cast(options["fileFormat"].asInt()) + : defaultFileFormat_; + builder.fileFormat(fileFormat); + + int64_t splitWeight = + options.count("splitWeight") ? options["splitWeight"].asInt() : 0; + builder.splitWeight(splitWeight); + + bool cacheable = + options.count("cacheable") ? options["cacheable"].asBool() : true; + builder.cacheable(cacheable); + + if (options.count("infoColumns")) { + for (auto& kv : options["infoColumns"].items()) { + builder.infoColumn(kv.first.asString(), kv.second.asString()); + } + } + + if (options.count("partitionKeys")) { + for (auto& kv : options["partitionKeys"].items()) { + builder.partitionKey( + kv.first.asString(), + kv.second.isNull() + ? std::nullopt + : std::optional(kv.second.asString())); + } + } + + if (options.count("tableBucketNumber")) { + builder.tableBucketNumber(options["tableBucketNumber"].asInt()); + } + + if (options.count("bucketConversion")) { + HiveBucketConversion bucketConversion; + const auto& bucketConversionOption = options["bucketConversion"]; + bucketConversion.tableBucketCount = + bucketConversionOption["tableBucketCount"].asInt(); + bucketConversion.partitionBucketCount = + bucketConversionOption["partitionBucketCount"].asInt(); + for (auto& bucketColumnHandlesOption : + bucketConversionOption["bucketColumnHandles"]) { + bucketConversion.bucketColumnHandles.push_back( + std::const_pointer_cast( + ISerializable::deserialize( + bucketColumnHandlesOption))); + } + builder.bucketConversion(bucketConversion); + } + + if (options.count("customSplitInfo")) { + std::unordered_map info; + for (auto& kv : options["customSplitInfo"].items()) { + info[kv.first.asString()] = kv.second.asString(); + } + builder.customSplitInfo(info); + } + + if (options.count("extraFileInfo")) { + auto extra = options["extraFileInfo"].isNull() + ? 
std::shared_ptr() + : std::make_shared(options["extraFileInfo"].asString()); + builder.extraFileInfo(extra); + } + + if (options.count("serdeParameters")) { + std::unordered_map serde; + for (auto& kv : options["serdeParameters"].items()) { + serde[kv.first.asString()] = kv.second.asString(); + } + builder.serdeParameters(serde); + } + + if (options.count("fileProperties")) { + FileProperties props; + const auto& propertiesOption = options["fileProperties"]; + if (propertiesOption.count("fileSize") && + !propertiesOption["fileSize"].isNull()) { + props.fileSize = propertiesOption["fileSize"].asInt(); + } + if (propertiesOption.count("modificationTime") && + !propertiesOption["modificationTime"].isNull()) { + props.modificationTime = propertiesOption["modificationTime"].asInt(); + } + builder.fileProperties(props); + } + + if (options.count("rowIdProperties")) { + RowIdProperties rowIdProperties; + const auto& rowIdPropertiesOption = options["rowIdProperties"]; + rowIdProperties.metadataVersion = + rowIdPropertiesOption["metadataVersion"].asInt(); + rowIdProperties.partitionId = rowIdPropertiesOption["partitionId"].asInt(); + rowIdProperties.tableGuid = rowIdPropertiesOption["tableGuid"].asString(); + builder.rowIdProperties(rowIdProperties); + } + + return builder.build(); +} + +std::shared_ptr HiveObjectFactory::makeColumnHandle( + const std::string& connectorId, + const std::string& name, + const TypePtr& dataType, + const folly::dynamic& options) const { + using HiveColumnType = hive::HiveColumnHandle::ColumnType; + // Initialized to kRegular (the same default used for a missing "columnType" + // option below) so the null-options path never reads an indeterminate value. + HiveColumnType hiveColumnType{HiveColumnType::kRegular}; + + if (options.isNull()) { + return std::make_shared( + name, hiveColumnType, dataType, dataType); + } + + // columnType would be serialized as int32_t + int32_t columnType = + options + .getDefault("columnType", static_cast(HiveColumnType::kRegular)) + .asInt(); + + switch (columnType) { + case static_cast(HiveColumnType::kRegular): + hiveColumnType = HiveColumnType::kRegular; + break; + case 
static_cast(HiveColumnType::kPartitionKey): + hiveColumnType = HiveColumnType::kPartitionKey; + break; + case static_cast(HiveColumnType::kSynthesized): + hiveColumnType = HiveColumnType::kSynthesized; + break; + case static_cast(HiveColumnType::kRowIndex): + hiveColumnType = HiveColumnType::kRowIndex; + break; + case static_cast(HiveColumnType::kRowId): + hiveColumnType = HiveColumnType::kRowId; + break; + + default: + VELOX_UNSUPPORTED("Unsupported ColumnType {}", columnType); + } + + TypePtr hiveType = ISerializable::deserialize( + options.getDefault("hiveType", dataType->serialize()), nullptr); + + // subfields would be serialized as a vector of strings; + std::vector subfields; + if (auto rs = options.get_ptr("requiredSubfields")) { + subfields.reserve(rs->size()); + for (auto& v : *rs) { + subfields.emplace_back(v.asString()); + } + } + + HiveColumnHandle::ColumnParseParameters parseParams{ + HiveColumnHandle::ColumnParseParameters::kISO8601}; + if (auto cp = options.get_ptr("columnParseParameters")) { + auto formatName = + cp->getDefault("partitionDateValueFormat", "ISO8601").asString(); + if (formatName == "DaysSinceEpoch" || formatName == "kDaysSinceEpoch") { + parseParams.partitionDateValueFormat = + HiveColumnHandle::ColumnParseParameters::kDaysSinceEpoch; + } else { + parseParams.partitionDateValueFormat = + HiveColumnHandle::ColumnParseParameters::kISO8601; + } + } + + return std::make_shared( + name, + hiveColumnType, + dataType, + hiveType, + std::move(subfields), + parseParams); +} + +std::shared_ptr HiveObjectFactory::makeTableHandle( + const std::string& connectorId, + const std::string& tableName, + std::vector columnHandles, + const folly::dynamic& options) const { + bool filterPushdownEnabled = + options.getDefault("filterPushdownEnabled", true).asBool(); + + common::SubfieldFilters subfieldFilters; + if (auto sf = options.get_ptr("subfieldFilters")) { + subfieldFilters.reserve(sf->size()); + + for (auto& kv : sf->items()) { + // 1) Parse the key 
string into a Subfield + // (uses Subfield(const std::string&) and default separators) + Subfield subfield(kv.first.asString()); + + // 2) Deserialize the Filter from its dynamic form. + // Assumes every Filter subclass registered a SerDe entry in + // Filter::registerSerDe(). + auto filter = ISerializable::deserialize( + kv.second, /* context = */ nullptr); + + subfieldFilters.emplace( + std::move(subfield), std::const_pointer_cast(filter)); + } + } + + core::TypedExprPtr remainingFilter = nullptr; + if (auto rf = options.get_ptr("remainingFilter")) { + // assuming rf["expr"] holds the serialized expression + remainingFilter = ISerializable::deserialize(*rf); + } + + std::unordered_map tableParameters; + if (auto tp = options.get_ptr("tableParameters")) { + for (auto& kv : tp->items()) { + tableParameters.emplace(kv.first.asString(), kv.second.asString()); + } + } + + // build RowTypePtr from columnHandles + std::vector names; + std::vector types; + names.reserve(columnHandles.size()); + types.reserve(columnHandles.size()); + for (auto& col : columnHandles) { + auto hiveCol = std::static_pointer_cast(col); + names.push_back(hiveCol->name()); + types.push_back(hiveCol->dataType()); + } + auto dataColumns = ROW(std::move(names), std::move(types)); + + return std::make_shared( + connectorId, + tableName, + filterPushdownEnabled, + std::move(subfieldFilters), + std::move(remainingFilter), + dataColumns, + std::move(tableParameters)); +} + +std::shared_ptr +HiveObjectFactory::makeInsertTableHandle( + const std::string& connectorId, + std::vector inputColumns, + std::shared_ptr locationHandle, + const folly::dynamic& options) const { + // Convert inputColumns + std::vector> inputHiveColumns; + inputHiveColumns.reserve(inputColumns.size()); + for (const auto& handle : inputColumns) { + inputHiveColumns.push_back( + std::static_pointer_cast(handle)); + } + + auto hiveLoc = + std::dynamic_pointer_cast(locationHandle); + VELOX_CHECK( + hiveLoc, + 
"HiveObjectFactory::makeInsertTableHandle: " + "expected HiveLocationHandle"); + + auto fmt = + options + .getDefault( + "storageFormat", static_cast(dwio::common::FileFormat::DWRF)) + .asInt(); + auto storageFormat = static_cast(fmt); + + std::shared_ptr bucketProperty = nullptr; + if (auto bp = options.get_ptr("bucketProperty")) { + bucketProperty = HiveBucketProperty::deserialize(*bp, nullptr); + } + + std::optional compressionKind; + if (auto ck = options.get_ptr("compressionKind")) { + compressionKind = static_cast(ck->asInt()); + } + + std::unordered_map serdeParameters; + if (auto sp = options.get_ptr("serdeParameters")) { + for (auto& kv : sp->items()) { + serdeParameters.emplace(kv.first.asString(), kv.second.asString()); + } + } + + std::shared_ptr writerOptions = nullptr; + if (auto wo = options.get_ptr("writerOptions")) { + writerOptions = dwio::common::WriterOptions::deserialize(*wo); + } + + bool ensureFiles = options.getDefault("ensureFiles", false).asBool(); + + auto fileNameGen = HiveInsertFileNameGenerator::deserialize( + *options.get_ptr("fileNameGenerator"), nullptr); + + return std::make_shared( + std::move(inputHiveColumns), + hiveLoc, + storageFormat, + std::move(bucketProperty), + compressionKind, + std::move(serdeParameters), + std::move(writerOptions), + ensureFiles, + std::move(fileNameGen)); +} + +std::shared_ptr HiveObjectFactory::makeLocationHandle( + const std::string& connectorId, + LocationHandle::TableType tableType, + const folly::dynamic& options) const { + VELOX_CHECK(options.isObject(), "Expected options to be a dynamic object"); + + // Required fields + auto targetPath = options.at("targetPath").asString(); + auto writePath = options.at("writePath").asString(); + + // Optional field: targetFileName + std::string targetFileName; + if (options.count("targetFileName") && + !options.at("targetFileName").isNull()) { + targetFileName = options.at("targetFileName").asString(); + } + + return std::make_shared( + std::move(targetPath), 
+ std::move(writePath), + tableType, + std::move(targetFileName), + connectorId); +} + +} // namespace facebook::velox::connector::hive diff --git a/velox/connectors/hive/HiveObjectFactory.h b/velox/connectors/hive/HiveObjectFactory.h new file mode 100644 index 00000000000..e0290b8a91c --- /dev/null +++ b/velox/connectors/hive/HiveObjectFactory.h @@ -0,0 +1,69 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "velox/connectors/ConnectorNames.h" +#include "velox/connectors/ConnectorObjectFactory.h" +#include "velox/dwio/common/Options.h" + +namespace facebook::velox::connector::hive { + +/// ConnectorObjectFactory implementation that builds Hive connector objects +/// from folly::dynamic options. +class HiveObjectFactory : public connector::ConnectorObjectFactory { + public: + // Names this factory after the Hive connector; the base class has no + // default constructor, so the name must be passed explicitly. + HiveObjectFactory(std::shared_ptr pool) + : ConnectorObjectFactory(kHiveConnectorName), pool_(std::move(pool)) {} + + ~HiveObjectFactory() override = default; + + std::shared_ptr makeSplit( + const std::string& connectorId, + const folly::dynamic& options = {}) const override; + + std::shared_ptr makeColumnHandle( + const std::string& connectorId, + const std::string& name, + const TypePtr& type, + const folly::dynamic& options = {}) const override; + + std::shared_ptr makeTableHandle( + const std::string& connectorId, + const std::string& tableName, + std::vector columnHandles, + const folly::dynamic& options) const override; + + std::shared_ptr makeInsertTableHandle( + const std::string& connectorId, + std::vector inputColumns, + std::shared_ptr locationHandle, + const folly::dynamic& options = {}) const 
override; + + std::shared_ptr makeLocationHandle( + const std::string& connectorId, + connector::ConnectorLocationHandle::TableType tableType = + connector::ConnectorLocationHandle::TableType::kNew, + const folly::dynamic& options = {}) const override; + + private: + std::shared_ptr pool_; + dwio::common::FileFormat defaultFileFormat_{ + dwio::common::FileFormat::PARQUET}; +}; + +} // namespace facebook::velox::connector::hive diff --git a/velox/connectors/hive/tests/CMakeLists.txt b/velox/connectors/hive/tests/CMakeLists.txt index 0a24b6b10c4..a6fbb5e2266 100644 --- a/velox/connectors/hive/tests/CMakeLists.txt +++ b/velox/connectors/hive/tests/CMakeLists.txt @@ -19,6 +19,7 @@ add_executable( HiveConnectorTest.cpp HiveConnectorUtilTest.cpp HiveConnectorSerDeTest.cpp + HiveObjectFactoryTest.cpp HivePartitionFunctionTest.cpp HivePartitionUtilTest.cpp HiveSplitTest.cpp diff --git a/velox/connectors/hive/tests/HiveObjectFactoryTest.cpp b/velox/connectors/hive/tests/HiveObjectFactoryTest.cpp new file mode 100644 index 00000000000..cb335cdc842 --- /dev/null +++ b/velox/connectors/hive/tests/HiveObjectFactoryTest.cpp @@ -0,0 +1,226 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include +#include + +#include "velox/common/base/Exceptions.h" +#include "velox/common/memory/MemoryAllocator.h" +#include "velox/connectors/hive/HiveConnector.h" +#include "velox/connectors/hive/HiveConnectorSplit.h" +#include "velox/connectors/hive/HiveObjectFactory.h" +#include "velox/exec/tests/utils/OperatorTestBase.h" +#include "velox/exec/tests/utils/PlanBuilder.h" +#include "velox/functions/prestosql/registration/RegistrationFunctions.h" + +namespace facebook::velox::connector::hive::test { + +using namespace facebook::velox; +using namespace facebook::velox::connector::hive; +using facebook::velox::connector::ConnectorLocationHandle; + +static constexpr char kConnectorId[] = "hive-test"; + +class HiveObjectFactoryTest : public exec::test::OperatorTestBase { + public: + static void SetUpTestCase() { + OperatorTestBase::SetUpTestCase(); + + memory::MemoryManager::testingSetInstance(memory::MemoryManager::Options{}); + + Type::registerSerDe(); + common::Filter::registerSerDe(); + connector::hive::HiveConnector::registerSerDe(); + core::PlanNode::registerSerDe(); + core::ITypedExpr::registerSerDe(); + Type::registerSerDe(); + } + + void SetUp() override { + factory_ = std::make_unique(pool_); + } + + void TearDown() override { + factory_.reset(); + } + + std::shared_ptr makeHiveColumnHandle( + const std::string& name, + HiveColumnHandle::ColumnType columnType, + const TypePtr& dataType, + const TypePtr& hiveType, + std::vector requiredSubfields) { + folly::dynamic opts = folly::dynamic::object; + + // columnType would be serialized as int32_t + opts["columnType"] = static_cast<int32_t>(columnType); + opts["hiveType"] = hiveType->serialize(); + + opts["requiredSubfields"] = folly::dynamic::array(); + for (const auto& subfield : requiredSubfields) { + opts["requiredSubfields"].push_back(subfield); + } + + auto colHandle = + factory_->makeColumnHandle(kConnectorId, name, dataType, opts); + + return colHandle; + } + + protected: + std::unique_ptr factory_; +}; + 
+TEST_F(HiveObjectFactoryTest, MakeConnectorSplitWithOptions) { + folly::dynamic opts = folly::dynamic::object; + + // Basic configuration + opts["filePath"] = "/tmp/f.p"; + opts["start"] = 10; + opts["length"] = 100; + opts["fileFormat"] = static_cast(dwio::common::FileFormat::PARQUET); + opts["splitWeight"] = 42; + opts["cacheable"] = true; + + // Info columns map: column name -> info string + opts["infoColumns"] = folly::dynamic::object; + opts["infoColumns"]["colA"] = "infoA"; + opts["infoColumns"]["colB"] = "infoB"; + + // Partition keys map: key -> value + opts["partitionKeys"] = folly::dynamic::object; + opts["partitionKeys"]["p1"] = "v1"; + + auto splitPtr = factory_->makeSplit(kConnectorId, opts); + auto* split = dynamic_cast(splitPtr.get()); + ASSERT_NE(split, nullptr); + + EXPECT_EQ(split->fileFormat, dwio::common::FileFormat::PARQUET); + EXPECT_EQ(split->splitWeight, 42); + EXPECT_TRUE(split->cacheable); + + // infoColumns + auto info = split->infoColumns; + EXPECT_EQ(info.at("colA"), "infoA"); + EXPECT_EQ(info.at("colB"), "infoB"); + + // partitionKeys + auto parts = split->partitionKeys; + ASSERT_EQ(parts.size(), 1); + EXPECT_EQ(parts.count("p1"), 1); + EXPECT_EQ(parts["p1"], "v1"); +} + +TEST_F(HiveObjectFactoryTest, MakeTableHandleWithOptions) { + // Build a RowType for data columns: two ints + auto rowType = ROW({"c0", "c1"}, {INTEGER(), BIGINT()}); + + // ColumnHandles + auto c0 = makeHiveColumnHandle( + "c0", + HiveColumnHandle::ColumnType::kRegular, + INTEGER(), + INTEGER(), + {"c0"}); + auto c1 = makeHiveColumnHandle( + "c1", HiveColumnHandle::ColumnType::kRegular, BIGINT(), BIGINT(), {"c1"}); + + // Options: disable filter pushdown, add subfield filter & remaining filter & + // tableParameters + folly::dynamic opts = folly::dynamic::object; + + // Basic configuration + opts["filterPushdownEnabled"] = false; + + // common::SubfieldFilters : std::unordered_map; + auto filter = std::make_unique(-100, 100, false); + opts["subfieldFilters"] = 
folly::dynamic::object; + opts["subfieldFilters"]["c0"] = filter->serialize(); + + // core::TypedExprPtr remainingFilter : std::shared_ptr; + auto remainingFilter = parseExpr("c1 + 1 > 1", rowType); + opts["remainingFilter"] = remainingFilter->serialize(); + + // Arbitrary table parameters + opts["tableParameters"] = folly::dynamic::object; + opts["tableParameters"]["pA"] = "vA"; + + auto handlePtr = + factory_->makeTableHandle(kConnectorId, "tbl", {c0, c1}, opts); + auto* hiveHandle = dynamic_cast(handlePtr.get()); + ASSERT_NE(hiveHandle, nullptr); + + EXPECT_FALSE(hiveHandle->isFilterPushdownEnabled()); + + const auto& subfieldFilters = hiveHandle->subfieldFilters(); + ASSERT_EQ(subfieldFilters.count(common::Subfield("c0")), 1); + // Assert the comparison result; a bare call would discard it. + ASSERT_TRUE( + subfieldFilters.at(common::Subfield("c0"))->testingEquals(*filter)); + + auto remainingFilterCreated = hiveHandle->remainingFilter(); + ASSERT_NE(remainingFilterCreated, nullptr); + ASSERT_EQ(*remainingFilterCreated, *remainingFilter); + + auto params = hiveHandle->tableParameters(); + ASSERT_EQ(params.size(), 1); + EXPECT_EQ(params.at("pA"), "vA"); +} + +TEST_F(HiveObjectFactoryTest, MakeColumnHandle) { + auto colHandle = makeHiveColumnHandle( + "colX", + HiveColumnHandle::ColumnType::kRegular, + BIGINT(), + BIGINT(), + std::vector({"f1", "f2"})); + + auto* hiveColumnHandle = dynamic_cast(colHandle.get()); + ASSERT_NE(hiveColumnHandle, nullptr); + + EXPECT_EQ(hiveColumnHandle->name(), "colX"); + EXPECT_EQ( + hiveColumnHandle->columnType(), HiveColumnHandle::ColumnType::kRegular); + EXPECT_EQ(hiveColumnHandle->dataType(), BIGINT()); + EXPECT_EQ(hiveColumnHandle->hiveType(), BIGINT()); + + std::vector requiredSubfields; + requiredSubfields.emplace_back("f1"); + requiredSubfields.emplace_back("f2"); + EXPECT_EQ(hiveColumnHandle->requiredSubfields(), requiredSubfields); +} + +TEST_F(HiveObjectFactoryTest, MakeLocationHandle) { + folly::dynamic opts = folly::dynamic::object; + + // Basic configuration + opts["targetPath"] = "/tmp/out1"; + 
opts["writePath"] = "/tmp/out2"; + opts["targetFileName"] = "test.parquet"; + + // Default: writeDirectory == targetDirectory + auto locationHandle1 = factory_->makeLocationHandle( + kConnectorId, ConnectorLocationHandle::TableType::kNew, opts); + + auto* hiveLocationHandle = + dynamic_cast(locationHandle1.get()); + ASSERT_NE(hiveLocationHandle, nullptr); + + EXPECT_EQ(hiveLocationHandle->targetPath(), "/tmp/out1"); + EXPECT_EQ(hiveLocationHandle->writePath(), "/tmp/out2"); + EXPECT_EQ(hiveLocationHandle->targetFileName(), "test.parquet"); + EXPECT_EQ( + locationHandle1->tableType(), ConnectorLocationHandle::TableType::kNew); +} + +} // namespace facebook::velox::connector::hive::test diff --git a/velox/dwio/common/Options.cpp b/velox/dwio/common/Options.cpp index 33946f71159..10abb94e9fe 100644 --- a/velox/dwio/common/Options.cpp +++ b/velox/dwio/common/Options.cpp @@ -15,6 +15,7 @@ */ #include "velox/dwio/common/Options.h" +#include "velox/common/compression/Compression.h" namespace facebook::velox::dwio::common { @@ -77,4 +78,88 @@ ColumnReaderOptions makeColumnReaderOptions(const ReaderOptions& options) { return columnReaderOptions; } +folly::dynamic WriterOptions::serialize() const { + folly::dynamic obj = folly::dynamic::object; + + // 1) Schema + if (schema) { + obj["schema"] = schema->serialize(); + } + + // 2) compressionKind + if (compressionKind) { + obj["compressionKind"] = static_cast(*compressionKind); + } + + // 3) serdeParameters + if (!serdeParameters.empty()) { + folly::dynamic mapObj = folly::dynamic::object; + for (auto& [k, v] : serdeParameters) { + mapObj[k] = v; + } + obj["serdeParameters"] = std::move(mapObj); + } + + // 4) sessionTimezoneName + if (!sessionTimezoneName.empty()) { + obj["sessionTimezoneName"] = sessionTimezoneName; + } + + // 5) adjustTimestampToTimezone + obj["adjustTimestampToTimezone"] = adjustTimestampToTimezone; + + // (We do *not* serialize pool, spillConfig, nonReclaimableSection, + // or the factory functions—they 
must be re‐injected by the host.) + + return obj; +} + +std::shared_ptr WriterOptions::deserialize( + const folly::dynamic& obj) { + auto opts = std::make_shared(); + + // 1) schema + if (auto p = obj.get_ptr("schema")) { + opts->schema = ISerializable::deserialize(*p, nullptr); + } + + // 2) compressionKind + if (auto p = obj.get_ptr("compressionKind")) { + opts->compressionKind = + static_cast(p->asInt()); + } + + // 3) serdeParameters + if (auto p = obj.get_ptr("serdeParameters")) { + opts->serdeParameters.clear(); + for (auto& kv : p->items()) { + opts->serdeParameters.emplace(kv.first.asString(), kv.second.asString()); + } + } + + // 4) sessionTimezoneName + if (auto p = obj.get_ptr("sessionTimezoneName")) { + opts->sessionTimezoneName = p->asString(); + } + + // 5) adjustTimestampToTimezone + if (auto p = obj.get_ptr("adjustTimestampToTimezone")) { + opts->adjustTimestampToTimezone = p->asBool(); + } + + // pool, spillConfig, nonReclaimableSection, factories remain at default + return opts; +} + +void WriterOptions::registerSerDe() { + auto& registry = DeserializationRegistryForSharedPtr(); + registry.Register("WriterOptions", WriterOptions::deserialize); +} + +// force registration at load‐time +static bool _writerOptionsSerdeRegistered = []() { + WriterOptions::registerSerDe(); + return true; +}(); + } // namespace facebook::velox::dwio::common diff --git a/velox/dwio/common/Options.h b/velox/dwio/common/Options.h index 851f97bee9c..5f173f440c8 100644 --- a/velox/dwio/common/Options.h +++ b/velox/dwio/common/Options.h @@ -693,7 +693,7 @@ class ReaderOptions : public io::ReaderOptions { bool allowEmptyFile_{false}; }; -struct WriterOptions { +struct WriterOptions : public ISerializable { TypePtr schema{nullptr}; velox::memory::MemoryPool* memoryPool{nullptr}; const velox::common::SpillConfig* spillConfig{nullptr}; @@ -722,6 +722,10 @@ struct WriterOptions { const config::ConfigBase& session) {} virtual ~WriterOptions() = default; + + folly::dynamic 
serialize() const override; + static std::shared_ptr deserialize(const folly::dynamic& obj); + static void registerSerDe(); }; // Options for creating a column reader.