From 05022875cba7b59792d201e69ecb5e4d59d83731 Mon Sep 17 00:00:00 2001 From: Ping Liu Date: Tue, 11 Nov 2025 09:53:13 +0000 Subject: [PATCH] Add IcebergConnector --- velox/connectors/hive/HiveConnector.cpp | 11 --- velox/connectors/hive/HiveConnector.h | 2 +- velox/connectors/hive/iceberg/CMakeLists.txt | 1 + .../hive/iceberg/IcebergConnector.cpp | 72 +++++++++++++++ .../hive/iceberg/IcebergConnector.h | 88 +++++++++++++++++++ .../hive/iceberg/IcebergDataSink.cpp | 41 ++++----- .../connectors/hive/iceberg/IcebergDataSink.h | 12 +-- .../hive/iceberg/tests/CMakeLists.txt | 1 + .../iceberg/tests/IcebergConnectorTest.cpp | 70 +++++++++++++++ .../hive/iceberg/tests/IcebergInsertTest.cpp | 8 +- .../hive/iceberg/tests/IcebergTestBase.cpp | 21 +++-- .../hive/iceberg/tests/IcebergTestBase.h | 3 +- .../hive/iceberg/tests/PartitionNameTest.cpp | 4 +- .../hive/iceberg/tests/TransformTest.cpp | 4 +- 14 files changed, 281 insertions(+), 57 deletions(-) create mode 100644 velox/connectors/hive/iceberg/IcebergConnector.cpp create mode 100644 velox/connectors/hive/iceberg/IcebergConnector.h create mode 100644 velox/connectors/hive/iceberg/tests/IcebergConnectorTest.cpp diff --git a/velox/connectors/hive/HiveConnector.cpp b/velox/connectors/hive/HiveConnector.cpp index 950a84d0011..e04828e83aa 100644 --- a/velox/connectors/hive/HiveConnector.cpp +++ b/velox/connectors/hive/HiveConnector.cpp @@ -20,7 +20,6 @@ #include "velox/connectors/hive/HiveDataSink.h" #include "velox/connectors/hive/HiveDataSource.h" #include "velox/connectors/hive/HivePartitionFunction.h" -#include "velox/connectors/hive/iceberg/IcebergDataSink.h" #include #include @@ -74,16 +73,6 @@ std::unique_ptr HiveConnector::createDataSink( ConnectorInsertTableHandlePtr connectorInsertTableHandle, ConnectorQueryCtx* connectorQueryCtx, CommitStrategy commitStrategy) { - if (auto icebergInsertHandle = - std::dynamic_pointer_cast( - connectorInsertTableHandle)) { - return std::make_unique( - inputType, - icebergInsertHandle, - connectorQueryCtx, - commitStrategy, - hiveConfig_); - } auto hiveInsertHandle = std::dynamic_pointer_cast( connectorInsertTableHandle); diff --git a/velox/connectors/hive/HiveConnector.h b/velox/connectors/hive/HiveConnector.h index 7a4330b3eeb..845461ea412 100644 --- a/velox/connectors/hive/HiveConnector.h +++ b/velox/connectors/hive/HiveConnector.h @@ -89,7 +89,7 @@ class HiveConnectorFactory : public ConnectorFactory { const std::string& id, std::shared_ptr config, folly::Executor* ioExecutor = nullptr, - folly::Executor* cpuExecutor = nullptr) override { + [[maybe_unused]] folly::Executor* cpuExecutor = nullptr) override { return std::make_shared(id, config, ioExecutor); } }; diff --git a/velox/connectors/hive/iceberg/CMakeLists.txt b/velox/connectors/hive/iceberg/CMakeLists.txt index 218eb593161..079bcba3081 100644 --- a/velox/connectors/hive/iceberg/CMakeLists.txt +++ b/velox/connectors/hive/iceberg/CMakeLists.txt @@ -14,6 +14,7 @@ velox_add_library( velox_hive_iceberg_splitreader + IcebergConnector.cpp IcebergDataSink.cpp IcebergPartitionName.cpp IcebergSplit.cpp diff --git a/velox/connectors/hive/iceberg/IcebergConnector.cpp b/velox/connectors/hive/iceberg/IcebergConnector.cpp new file mode 100644 index 00000000000..14b88688cb6 --- /dev/null +++ b/velox/connectors/hive/iceberg/IcebergConnector.cpp @@ -0,0 +1,72 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/connectors/hive/iceberg/IcebergConnector.h" + +#include "velox/connectors/hive/HiveConnector.h" +#include "velox/connectors/hive/iceberg/IcebergDataSink.h" + +namespace facebook::velox::connector::hive::iceberg { + +const std::string_view kIcebergFunctionPrefixConfig{"presto.iceberg-namespace"}; +const std::string_view kDefaultIcebergFunctionPrefix{"$internal$.iceberg."}; + +namespace { + +// Registers Iceberg partition transform functions with prefix. +// NOTE: These functions are registered for internal transform usage only. +// Upstream engines such as Prestissimo and Gluten should register the same +// functions with different prefixes to avoid conflicts. +void registerIcebergInternalFunctions(const std::string& prefix) { + static std::once_flag registerFlag; + + std::call_once(registerFlag, [prefix]() { + functions::iceberg::registerFunctions(prefix); + }); +} + +} // namespace + +IcebergConnector::IcebergConnector( + const std::string& id, + std::shared_ptr config, + folly::Executor* ioExecutor) + : HiveConnector(id, config, ioExecutor), + functionPrefix_(config->get( + std::string(kIcebergFunctionPrefixConfig), + std::string(kDefaultIcebergFunctionPrefix))) { + registerIcebergInternalFunctions(functionPrefix_); +} + +std::unique_ptr IcebergConnector::createDataSink( + RowTypePtr inputType, + ConnectorInsertTableHandlePtr connectorInsertTableHandle, + ConnectorQueryCtx* connectorQueryCtx, + CommitStrategy commitStrategy) { + auto icebergInsertHandle = + checked_pointer_cast( + connectorInsertTableHandle); + + return std::make_unique( + inputType, + icebergInsertHandle, + connectorQueryCtx, + commitStrategy, + hiveConfig_, + functionPrefix_); +} + +} // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/IcebergConnector.h b/velox/connectors/hive/iceberg/IcebergConnector.h new file mode 100644 index 00000000000..ffe0397fa64 --- /dev/null +++ b/velox/connectors/hive/iceberg/IcebergConnector.h @@ -0,0 +1,88 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "velox/connectors/hive/HiveConnector.h" + +namespace facebook::velox::connector::hive::iceberg { + +/// TODO Add IcebergConfig class and Move these configuration properties to +/// IcebergConfig.h +extern const std::string_view kIcebergFunctionPrefixConfig; +extern const std::string_view kDefaultIcebergFunctionPrefix; + +/// Provides Iceberg table format support. +/// - Creates HiveDataSource instances that use IcebergSplitReader for reading +/// Iceberg tables with support for delete files and schema evolution. +/// - Creates IcebergDataSink instances for writing data with Iceberg-specific +/// partition transforms and commit metadata. +class IcebergConnector final : public HiveConnector { + public: + IcebergConnector( + const std::string& id, + std::shared_ptr config, + folly::Executor* ioExecutor); + + /// Creates IcebergDataSink for writing to Iceberg tables. + /// + /// @param inputType The schema of the input data to write. + /// @param connectorInsertTableHandle Must be an IcebergInsertTableHandle + /// containing Iceberg-specific write configuration. + /// @param connectorQueryCtx Query context for the write operation. + /// @param commitStrategy Strategy for committing the write operation. Only + /// CommitStrategy::kNoCommit is supported for Iceberg tables. Files + /// are written directly with their final names and commit metadata is + /// returned for the coordinator to update the Iceberg metadata tables. + /// @return IcebergDataSink instance configured for the write operation. + std::unique_ptr createDataSink( + RowTypePtr inputType, + ConnectorInsertTableHandlePtr connectorInsertTableHandle, + ConnectorQueryCtx* connectorQueryCtx, + CommitStrategy commitStrategy) override; + + private: + const std::string functionPrefix_; +}; + +class IcebergConnectorFactory final : public ConnectorFactory { + public: + static constexpr const char* kIcebergConnectorName = "iceberg"; + + IcebergConnectorFactory() : ConnectorFactory(kIcebergConnectorName) {} + + /// Creates a new IcebergConnector instance. + /// + /// @param id Unique identifier for this connector instance (typically the + /// catalog name). + /// @param config Connector configuration properties + /// @param ioExecutor Optional executor for asynchronous I/O operations such + /// as split preloading and file prefetching. When provided, enables + /// background file operations off the main driver thread. If nullptr, I/O + /// operations run synchronously. + /// @param cpuExecutor ConnectorFactory interface to support other connector + /// types that may need CPU-bound async work. Currently unused by + /// IcebergConnector. + /// @return Shared pointer to the newly created IcebergConnector instance + std::shared_ptr newConnector( + const std::string& id, + std::shared_ptr config, + folly::Executor* ioExecutor = nullptr, + [[maybe_unused]] folly::Executor* cpuExecutor = nullptr) override { + return std::make_shared(id, config, ioExecutor); + } +}; + +} // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/IcebergDataSink.cpp b/velox/connectors/hive/iceberg/IcebergDataSink.cpp index 59ccbd777bf..28c1a4f1318 100644 --- a/velox/connectors/hive/iceberg/IcebergDataSink.cpp +++ b/velox/connectors/hive/iceberg/IcebergDataSink.cpp @@ -22,17 +22,6 @@ namespace facebook::velox::connector::hive::iceberg { -static constexpr std::string_view kDefaultIcebergFunctionPrefix{ - "$internal$.iceberg."}; - -void registerIcebergInternalFunctions(const std::string_view& prefix) { - static std::once_flag registerFlag; - - std::call_once(registerFlag, [prefix]() { - functions::iceberg::registerFunctions(std::string(prefix)); - }); -} - IcebergInsertTableHandle::IcebergInsertTableHandle( std::vector inputColumns, LocationHandlePtr locationHandle, @@ -148,7 +137,8 @@ IcebergDataSink::IcebergDataSink( IcebergInsertTableHandlePtr insertTableHandle, const ConnectorQueryCtx* connectorQueryCtx, CommitStrategy commitStrategy, - const std::shared_ptr& hiveConfig) + const std::shared_ptr& hiveConfig, + const std::string& functionPrefix) : IcebergDataSink( std::move(inputType), insertTableHandle, @@ -158,7 +148,8 @@ IcebergDataSink::IcebergDataSink( createPartitionChannels( insertTableHandle->inputColumns(), insertTableHandle->partitionSpec()), - createPartitionRowType(insertTableHandle->partitionSpec())) {} + createPartitionRowType(insertTableHandle->partitionSpec()), + functionPrefix) {} IcebergDataSink::IcebergDataSink( RowTypePtr inputType, @@ -167,7 +158,8 @@ IcebergDataSink::IcebergDataSink( CommitStrategy commitStrategy, const std::shared_ptr& hiveConfig, const std::vector& partitionChannels, - RowTypePtr partitionRowType) + RowTypePtr partitionRowType, + const std::string& functionPrefix) : HiveDataSink( inputType, insertTableHandle, @@ -195,22 +187,19 @@ IcebergDataSink::IcebergDataSink( : nullptr), partitionSpec_(insertTableHandle->partitionSpec()), transformEvaluator_( - !partitionChannels.empty() - ? std::make_unique( - TransformExprBuilder::toExpressions( - partitionSpec_, - partitionChannels_, - inputType_, - std::string(kDefaultIcebergFunctionPrefix)), - connectorQueryCtx_) - : nullptr), + !partitionChannels.empty() ? std::make_unique( + TransformExprBuilder::toExpressions( + partitionSpec_, + partitionChannels_, + inputType_, + functionPrefix), + connectorQueryCtx_) + : nullptr), icebergPartitionName_( partitionSpec_ != nullptr ? std::make_unique(partitionSpec_) : nullptr), - partitionRowType_(std::move(partitionRowType)) { - registerIcebergInternalFunctions(std::string(kDefaultIcebergFunctionPrefix)); -} + partitionRowType_(std::move(partitionRowType)) {} std::vector IcebergDataSink::commitMessage() const { std::vector commitTasks; diff --git a/velox/connectors/hive/iceberg/IcebergDataSink.h b/velox/connectors/hive/iceberg/IcebergDataSink.h index dd0f149ffd6..db3372e1028 100644 --- a/velox/connectors/hive/iceberg/IcebergDataSink.h +++ b/velox/connectors/hive/iceberg/IcebergDataSink.h @@ -24,12 +24,6 @@ namespace facebook::velox::connector::hive::iceberg { -/// Registers Iceberg partition transform functions with prefix. -/// NOTE: These functions are registered for internal transform usage only. -/// Upstream engines such as Prestissimo and Gluten should register the same -/// functions with different prefixes to avoid conflicts. -void registerIcebergInternalFunctions(const std::string_view& prefix); - /// Represents a request for Iceberg write. class IcebergInsertTableHandle final : public HiveInsertTableHandle { public: @@ -93,7 +87,8 @@ class IcebergDataSink : public HiveDataSink { IcebergInsertTableHandlePtr insertTableHandle, const ConnectorQueryCtx* connectorQueryCtx, CommitStrategy commitStrategy, - const std::shared_ptr& hiveConfig); + const std::shared_ptr& hiveConfig, + const std::string& functionPrefix); /// Generates Iceberg-specific commit messages for all writers containing /// metadata about written files. Creates a JSON object for each writer @@ -125,7 +120,8 @@ class IcebergDataSink : public HiveDataSink { CommitStrategy commitStrategy, const std::shared_ptr& hiveConfig, const std::vector& partitionChannels, - RowTypePtr partitionRowType); + RowTypePtr partitionRowType, + const std::string& functionPrefix); void computePartitionAndBucketIds(const RowVectorPtr& input) override; diff --git a/velox/connectors/hive/iceberg/tests/CMakeLists.txt b/velox/connectors/hive/iceberg/tests/CMakeLists.txt index df6fef214df..af795dd907a 100644 --- a/velox/connectors/hive/iceberg/tests/CMakeLists.txt +++ b/velox/connectors/hive/iceberg/tests/CMakeLists.txt @@ -59,6 +59,7 @@ if(NOT VELOX_DISABLE_GOOGLETEST) add_executable( velox_hive_iceberg_insert_test + IcebergConnectorTest.cpp IcebergInsertTest.cpp IcebergTestBase.cpp Main.cpp diff --git a/velox/connectors/hive/iceberg/tests/IcebergConnectorTest.cpp b/velox/connectors/hive/iceberg/tests/IcebergConnectorTest.cpp new file mode 100644 index 00000000000..3966ccd8307 --- /dev/null +++ b/velox/connectors/hive/iceberg/tests/IcebergConnectorTest.cpp @@ -0,0 +1,70 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/connectors/hive/iceberg/IcebergConnector.h" +#include +#include "velox/connectors/hive/HiveConfig.h" +#include "velox/connectors/hive/iceberg/tests/IcebergTestBase.h" + +namespace facebook::velox::connector::hive::iceberg { + +namespace { + +class IcebergConnectorTest : public test::IcebergTestBase { + protected: + static void resetIcebergConnector( + const std::shared_ptr& config) { + unregisterConnector(test::kIcebergConnectorId); + + IcebergConnectorFactory factory; + auto icebergConnector = + factory.newConnector(test::kIcebergConnectorId, config); + registerConnector(icebergConnector); + } +}; + +TEST_F(IcebergConnectorTest, connectorConfiguration) { + auto customConfig = std::make_shared( + std::unordered_map{ + {hive::HiveConfig::kEnableFileHandleCache, "true"}, + {hive::HiveConfig::kNumCacheFileHandles, "1000"}}); + + resetIcebergConnector(customConfig); + + // Verify connector was registered successfully with custom config. + auto icebergConnector = getConnector(test::kIcebergConnectorId); + ASSERT_NE(icebergConnector, nullptr); + + auto config = icebergConnector->connectorConfig(); + ASSERT_NE(config, nullptr); + + hive::HiveConfig hiveConfig(config); + ASSERT_TRUE(hiveConfig.isFileHandleCacheEnabled()); + ASSERT_EQ(hiveConfig.numCacheFileHandles(), 1000); +} + +TEST_F(IcebergConnectorTest, connectorProperties) { + auto icebergConnector = getConnector(test::kIcebergConnectorId); + ASSERT_NE(icebergConnector, nullptr); + + ASSERT_TRUE(icebergConnector->canAddDynamicFilter()); + ASSERT_TRUE(icebergConnector->supportsSplitPreload()); + ASSERT_NE(icebergConnector->ioExecutor(), nullptr); +} + +} // namespace + +} // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/tests/IcebergInsertTest.cpp b/velox/connectors/hive/iceberg/tests/IcebergInsertTest.cpp index e7cf026830a..7c7c05cc0bb 100644 --- a/velox/connectors/hive/iceberg/tests/IcebergInsertTest.cpp +++ b/velox/connectors/hive/iceberg/tests/IcebergInsertTest.cpp @@ -14,6 +14,7 @@ * limitations under the License. */ +#include "velox/connectors/hive/iceberg/IcebergConnector.h" #include "velox/connectors/hive/iceberg/tests/IcebergTestBase.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/PlanBuilder.h" @@ -36,7 +37,12 @@ class IcebergInsertTest : public test::IcebergTestBase { auto splits = createSplitsForDirectory(dataPath); ASSERT_EQ(splits.size(), commitTasks.size()); - auto plan = exec::test::PlanBuilder().tableScan(rowType).planNode(); + auto plan = exec::test::PlanBuilder() + .startTableScan() + .connectorId(test::kIcebergConnectorId) + .outputType(rowType) + .endTableScan() + .planNode(); exec::test::AssertQueryBuilder(plan).splits(splits).assertResults(vectors); } }; diff --git a/velox/connectors/hive/iceberg/tests/IcebergTestBase.cpp b/velox/connectors/hive/iceberg/tests/IcebergTestBase.cpp index 4f388fe03c7..12c0cd4dbcc 100644 --- a/velox/connectors/hive/iceberg/tests/IcebergTestBase.cpp +++ b/velox/connectors/hive/iceberg/tests/IcebergTestBase.cpp @@ -17,14 +17,15 @@ #include "velox/connectors/hive/iceberg/tests/IcebergTestBase.h" #include - +#include "velox/connectors/hive/iceberg/IcebergConnector.h" #include "velox/connectors/hive/iceberg/IcebergSplit.h" #include "velox/connectors/hive/iceberg/PartitionSpec.h" #include "velox/expression/Expr.h" -#include "velox/functions/iceberg/Register.h" namespace facebook::velox::connector::hive::iceberg::test { +const std::string kIcebergConnectorId{"test-iceberg"}; + void IcebergTestBase::SetUp() { HiveConnectorTestBase::SetUp(); #ifdef VELOX_ENABLE_PARQUET @@ -33,8 +34,14 @@ void IcebergTestBase::SetUp() { #endif Type::registerSerDe(); - functions::iceberg::registerFunctions( - std::string(kDefaultTestIcebergFunctionNamePrefix)); + // Register IcebergConnector. + IcebergConnectorFactory icebergFactory; + auto icebergConnector = icebergFactory.newConnector( + kIcebergConnectorId, + std::make_shared( + std::unordered_map()), + ioExecutor_.get()); + registerConnector(icebergConnector); connectorSessionProperties_ = std::make_shared( std::unordered_map(), true); @@ -57,6 +64,7 @@ void IcebergTestBase::TearDown() { opPool_.reset(); root_.reset(); queryCtx_.reset(); + unregisterConnector(kIcebergConnectorId); HiveConnectorTestBase::TearDown(); } @@ -187,7 +195,8 @@ std::shared_ptr IcebergTestBase::createDataSink( tableHandle, connectorQueryCtx_.get(), CommitStrategy::kNoCommit, - connectorConfig_); + connectorConfig_, + std::string(kDefaultIcebergFunctionPrefix)); } std::shared_ptr IcebergTestBase::createDataSinkAndAppendData( @@ -232,7 +241,7 @@ IcebergTestBase::createSplitsForDirectory(const std::string& directory) { ->openFileForRead(filePath); splits.push_back( std::make_shared( - exec::test::kHiveConnectorId, + kIcebergConnectorId, filePath, fileFormat_, 0, diff --git a/velox/connectors/hive/iceberg/tests/IcebergTestBase.h b/velox/connectors/hive/iceberg/tests/IcebergTestBase.h index 6c79231419d..9b2612e1438 100644 --- a/velox/connectors/hive/iceberg/tests/IcebergTestBase.h +++ b/velox/connectors/hive/iceberg/tests/IcebergTestBase.h @@ -29,8 +29,7 @@ namespace facebook::velox::connector::hive::iceberg::test { -constexpr std::string_view kDefaultTestIcebergFunctionNamePrefix{ - "$internal$.test_iceberg."}; +extern const std::string kIcebergConnectorId; struct PartitionField { // 0-based column index. diff --git a/velox/connectors/hive/iceberg/tests/PartitionNameTest.cpp b/velox/connectors/hive/iceberg/tests/PartitionNameTest.cpp index 36f98149d7c..30b74fb5ad6 100644 --- a/velox/connectors/hive/iceberg/tests/PartitionNameTest.cpp +++ b/velox/connectors/hive/iceberg/tests/PartitionNameTest.cpp @@ -14,6 +14,8 @@ * limitations under the License. */ +#include "velox/connectors/hive/iceberg/IcebergConnector.h" + #include #include "velox/common/encode/Base64.h" @@ -59,7 +61,7 @@ class PartitionNameTest : public test::IcebergTestBase { partitionSpec, partitionChannels, rowType, - std::string(test::kDefaultTestIcebergFunctionNamePrefix)); + std::string(kDefaultIcebergFunctionPrefix)); auto transformEvaluator = std::make_unique( transformExpressions, connectorQueryCtx_.get()); diff --git a/velox/connectors/hive/iceberg/tests/TransformTest.cpp b/velox/connectors/hive/iceberg/tests/TransformTest.cpp index 420eea89d8b..a0e19a916e6 100644 --- a/velox/connectors/hive/iceberg/tests/TransformTest.cpp +++ b/velox/connectors/hive/iceberg/tests/TransformTest.cpp @@ -14,6 +14,8 @@ * limitations under the License. */ +#include "velox/connectors/hive/iceberg/IcebergConnector.h" + #include "velox/common/encode/Base64.h" #include "velox/connectors/hive/iceberg/PartitionSpec.h" #include "velox/connectors/hive/iceberg/TransformEvaluator.h" @@ -39,7 +41,7 @@ class TransformTest : public test::IcebergTestBase { spec, partitionChannels, input->rowType(), - std::string(test::kDefaultTestIcebergFunctionNamePrefix)); + std::string(kDefaultIcebergFunctionPrefix)); auto transformEvaluator = std::make_unique( transformExprs, connectorQueryCtx_.get()); auto result = transformEvaluator->evaluate(input);