Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ option(BOLT_ENABLE_ARROW_CONNECTOR "Build Arrow Memory connector." ON)
option(BOLT_ENABLE_PRESTO_FUNCTIONS "Build Presto SQL functions." ON)
option(BOLT_ENABLE_SPARK_FUNCTIONS "Build Spark SQL functions." ON)
option(BOLT_ENABLE_FLINK_FUNCTIONS "Build Flink SQL functions." ON)
option(BOLT_ENABLE_ICEBERG_FUNCTIONS "Build Iceberg functions." ON)
option(BOLT_ENABLE_SKETCH_FUNCTIONS "Build DataSketches SQL functions." OFF)
option(BOLT_ENABLE_COLOCATE_FUNCTIONS "Enable colocate function support" OFF)
option(ALLOW_REMOTE_COLOCATE_REGISTER "Enable remote colocate function register" OFF)
Expand Down
15 changes: 15 additions & 0 deletions bolt/connectors/hive/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ bolt_add_library(bolt_hive_config OBJECT HiveConfig.cpp)

target_link_libraries(bolt_hive_config bolt_exception)

add_subdirectory(iceberg)

bolt_add_library(
bolt_hive_connector
FileHandle.cpp
Expand All @@ -39,8 +41,20 @@ bolt_add_library(
HiveDataSink.cpp
HiveDataSource.cpp
HiveObjectFactory.cpp
HivePartitionName.cpp
HivePartitionUtil.cpp
HiveSplitReaderBase.cpp
iceberg/IcebergConfig.cpp
iceberg/IcebergColumnHandle.cpp
iceberg/IcebergConnector.cpp
iceberg/IcebergDataFileStatistics.cpp
iceberg/IcebergDataSink.cpp
iceberg/IcebergDataSource.cpp
iceberg/IcebergPartitionName.cpp
iceberg/IcebergParquetStatsCollector.cpp
iceberg/PartitionSpec.cpp
iceberg/TransformEvaluator.cpp
iceberg/TransformExprBuilder.cpp
PaimonMetadataColumn.cpp
PaimonSplitReader.cpp
paimon_merge_engines/AggregateEngine.cpp
Expand Down Expand Up @@ -85,6 +99,7 @@ target_link_libraries(
bolt_dwio_orc_reader
bolt_dwio_parquet_reader
bolt_dwio_parquet_writer
bolt_hive_iceberg_splitreader
${ENABLED_FILESYSTEMS}
${ENABLED_DWIO_FORMATS}
bolt_dwio_paimon_deletion_vector_reader
Expand Down
2 changes: 1 addition & 1 deletion bolt/connectors/hive/HiveConnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class HiveConnector : public Connector {
std::shared_ptr<ConnectorInsertTableHandle> connectorInsertTableHandle,
ConnectorQueryCtx* connectorQueryCtx,
CommitStrategy commitStrategy,
const core::QueryConfig& queryConfig) override final;
const core::QueryConfig& queryConfig) override;

folly::Executor* FOLLY_NULLABLE executor() const override {
return executor_;
Expand Down
11 changes: 10 additions & 1 deletion bolt/connectors/hive/HiveConnectorSplit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,14 @@ std::shared_ptr<HiveConnectorSplit> HiveConnectorSplit::create(
serdeParameters[key.asString()] = value.asString();
}

std::unordered_map<std::string, std::string> infoColumns;
const auto& infoColumnsObj = obj.getDefault("infoColumns", nullptr);
if (infoColumnsObj != nullptr) {
for (const auto& [key, value] : infoColumnsObj.items()) {
infoColumns[key.asString()] = value.asString();
}
}

uint64_t fileSize = static_cast<uint64_t>(obj["fileSize"].asInt());

std::unique_ptr<HiveConnectorSplitCacheLimit> hiveConnectorSplitCacheLimit =
Expand Down Expand Up @@ -159,7 +167,8 @@ std::shared_ptr<HiveConnectorSplit> HiveConnectorSplit::create(
extraFileInfo,
serdeParameters,
fileSize,
rowIdProperties);
rowIdProperties,
infoColumns);
}

// static
Expand Down
6 changes: 6 additions & 0 deletions bolt/connectors/hive/HiveConnectorSplit.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,12 @@ struct HiveConnectorSplit : public connector::ConnectorSplit {
}
obj["serdeParameters"] = serdeParametersObj;

folly::dynamic infoColumnsObj = folly::dynamic::object;
for (const auto& [key, value] : infoColumns) {
infoColumnsObj[key] = value;
}
obj["infoColumns"] = infoColumnsObj;

if (hiveConnectorSplitCacheLimit)
obj["hiveConnectorSplitCacheLimit"] =
hiveConnectorSplitCacheLimit->serialize();
Expand Down
Loading
Loading