diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/PartitionData.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/PartitionData.java
index 3a2727ffde858..a235ff29bdcf9 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/PartitionData.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/PartitionData.java
@@ -26,6 +26,7 @@
 import java.io.IOException;
 import java.io.StringWriter;
 import java.io.UncheckedIOException;
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 
@@ -184,7 +185,13 @@ public static Object getValue(JsonNode partitionValue, Type type)
                     throw new UncheckedIOException("Failed during JSON conversion of " + partitionValue, e);
                 }
             case DECIMAL:
-                return partitionValue.decimalValue().setScale(((DecimalType) type).scale());
+                // A long node is interpreted as the unscaled value of the decimal.
+                if (partitionValue.isLong()) {
+                    return BigDecimal.valueOf(partitionValue.asLong(), ((DecimalType) type).scale());
+                }
+                else {
+                    return partitionValue.decimalValue().setScale(((DecimalType) type).scale());
+                }
         }
         throw new UnsupportedOperationException("Type not supported as partition column: " + type);
     }
diff --git a/presto-native-execution/CMakeLists.txt b/presto-native-execution/CMakeLists.txt
index 1e7fce6c7717b..2d532c9bd1900 100644
--- a/presto-native-execution/CMakeLists.txt
+++ b/presto-native-execution/CMakeLists.txt
@@ -78,6 +78,8 @@ option(PRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR "Enable Arrow Flight connector" OFF)
 
 option(PRESTO_ENABLE_SPATIAL "Enable spatial support" ON)
 
+option(PRESTO_ENABLE_ICEBERG_NATIVE_INSERTION "Enable iceberg native connector insertion" OFF)
+
 # Set all Velox options below and make sure that if we include folly headers or
 # other dependency headers that include folly headers we turn off the coroutines
 # and turn on int128.
@@ -124,6 +126,10 @@ set(VELOX_ENABLE_GEO
     ${PRESTO_ENABLE_SPATIAL}
     CACHE BOOL "Enable Velox Geometry (aka spatial) support")
 
+if(PRESTO_ENABLE_ICEBERG_NATIVE_INSERTION)
+  add_compile_definitions(PRESTO_ENABLE_ICEBERG_NATIVE_INSERTION)
+endif()
+
 set(VELOX_BUILD_TESTING
     OFF
     CACHE BOOL "Enable Velox tests")
diff --git a/presto-native-execution/Makefile b/presto-native-execution/Makefile
index 27a811c11d462..bb53d816847c0 100644
--- a/presto-native-execution/Makefile
+++ b/presto-native-execution/Makefile
@@ -101,7 +101,12 @@ velox-submodule: #: Check out code for velox submodule
 submodules: velox-submodule
 
 cmake: submodules #: Use CMake to create a Makefile build system
-	cmake -B "$(BUILD_BASE_DIR)/$(BUILD_DIR)" $(FORCE_COLOR) $(CMAKE_FLAGS) $(EXTRA_CMAKE_FLAGS)
+	@ICEBERG_FLAGS=""; \
+	if [ -f "velox/velox/connectors/hive/iceberg/IcebergDataSink.h" ]; then \
+		echo "Found velox/velox/connectors/hive/iceberg/IcebergDataSink.h - enabling Iceberg native insertion"; \
+		ICEBERG_FLAGS="-DPRESTO_ENABLE_ICEBERG_NATIVE_INSERTION=ON"; \
+	fi; \
+	cmake -B "$(BUILD_BASE_DIR)/$(BUILD_DIR)" $(FORCE_COLOR) $(CMAKE_FLAGS) $(EXTRA_CMAKE_FLAGS) $$ICEBERG_FLAGS
 
 build: #: Build the software based in BUILD_DIR and BUILD_TYPE variables
 	cmake --build $(BUILD_BASE_DIR)/$(BUILD_DIR) -j $(NUM_THREADS)
diff --git a/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnector.cpp b/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnector.cpp
index 9f5f4acf0824f..ccde762c2a16e 100644
--- a/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnector.cpp
+++ b/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnector.cpp
@@ -1493,6 +1493,127 @@ IcebergPrestoToVeloxConnector::createConnectorProtocol() const {
   return std::make_unique();
 }
 
+#ifdef PRESTO_ENABLE_ICEBERG_NATIVE_INSERTION
+
+// Converts the connector handle of 'createHandle' into a Velox
+// IcebergInsertTableHandle. Both location paths are outputPath + "/data" and
+// the table type is kNew.
+std::unique_ptr
+IcebergPrestoToVeloxConnector::toVeloxInsertTableHandle(
+    const protocol::CreateHandle* createHandle,
+    const TypeParser& typeParser) const {
+  auto icebergOutputTableHandle =
+      std::dynamic_pointer_cast(
+          createHandle->handle.connectorHandle);
+
+  VELOX_CHECK_NOT_NULL(
+      icebergOutputTableHandle,
+      "Unexpected output table handle type {}",
+      createHandle->handle.connectorHandle->_type);
+
+  bool isPartitioned{false};
+  const auto inputColumns = toHiveColumns(
+      icebergOutputTableHandle->inputColumns, typeParser, isPartitioned);
+
+  return std::make_unique<
+      velox::connector::hive::iceberg::IcebergInsertTableHandle>(
+      inputColumns,
+      std::make_shared(
+          fmt::format("{}/data", icebergOutputTableHandle->outputPath),
+          fmt::format("{}/data", icebergOutputTableHandle->outputPath),
+          connector::hive::LocationHandle::TableType::kNew),
+      toVeloxIcebergPartitionSpec(
+          icebergOutputTableHandle->partitionSpec, typeParser),
+      toVeloxFileFormat(icebergOutputTableHandle->fileFormat),
+      nullptr,
+      std::optional(
+          toFileCompressionKind(icebergOutputTableHandle->compressionCodec)));
+}
+
+// Converts the connector handle of 'insertHandle' into a Velox
+// IcebergInsertTableHandle. Both location paths are outputPath + "/data" and
+// the table type is kExisting.
+std::unique_ptr
+IcebergPrestoToVeloxConnector::toVeloxInsertTableHandle(
+    const protocol::InsertHandle* insertHandle,
+    const TypeParser& typeParser) const {
+  auto icebergInsertTableHandle =
+      std::dynamic_pointer_cast(
+          insertHandle->handle.connectorHandle);
+
+  VELOX_CHECK_NOT_NULL(
+      icebergInsertTableHandle,
+      "Unexpected insert table handle type {}",
+      insertHandle->handle.connectorHandle->_type);
+
+  bool isPartitioned{false};
+  const auto inputColumns = toHiveColumns(
+      icebergInsertTableHandle->inputColumns, typeParser, isPartitioned);
+
+  return std::make_unique(
+      inputColumns,
+      std::make_shared(
+          fmt::format("{}/data", icebergInsertTableHandle->outputPath),
+          fmt::format("{}/data", icebergInsertTableHandle->outputPath),
+          connector::hive::LocationHandle::TableType::kExisting),
+      toVeloxIcebergPartitionSpec(
+          icebergInsertTableHandle->partitionSpec, typeParser),
+      toVeloxFileFormat(icebergInsertTableHandle->fileFormat),
+      nullptr,
+      std::optional(
+          toFileCompressionKind(icebergInsertTableHandle->compressionCodec)));
+}
+
+// Converts 'inputColumns' via toVeloxColumnHandle(). 'hasPartitionColumn' is
+// reset to false, then set if any column is a PARTITION_KEY.
+std::vector>
+IcebergPrestoToVeloxConnector::toHiveColumns(
+    const protocol::List& inputColumns,
+    const TypeParser& typeParser,
+    bool& hasPartitionColumn) const {
+  hasPartitionColumn = false;
+  std::vector>
+      hiveColumns;
+  hiveColumns.reserve(inputColumns.size());
+  for (const auto& columnHandle : inputColumns) {
+    hasPartitionColumn |=
+        columnHandle.columnType == protocol::hive::ColumnType::PARTITION_KEY;
+    hiveColumns.emplace_back(
+        std::dynamic_pointer_cast(
+            std::shared_ptr(toVeloxColumnHandle(&columnHandle, typeParser))));
+  }
+  return hiveColumns;
+}
+
+// Maps a protocol partition field to a Velox IcebergPartitionSpec::Field from
+// its name, transform and parameter (empty optional if the parameter is unset).
+connector::hive::iceberg::IcebergPartitionSpec::Field
+IcebergPrestoToVeloxConnector::toVeloxIcebergPartitionField(
+    const protocol::iceberg::IcebergPartitionField& field) const {
+  return connector::hive::iceberg::IcebergPartitionSpec::Field(
+      field.name,
+      static_cast(field.transform),
+      field.parameter ? *field.parameter : std::optional());
+}
+
+// Converts every field of 'spec' and pairs the result with 'spec.specId'.
+// 'typeParser' is not used here.
+std::unique_ptr
+IcebergPrestoToVeloxConnector::toVeloxIcebergPartitionSpec(
+    const protocol::iceberg::PrestoIcebergPartitionSpec& spec,
+    const facebook::presto::TypeParser& typeParser) const {
+  std::vector fields;
+  fields.reserve(spec.fields.size());
+  // Bind by const reference so each protocol field is not copied.
+  for (const auto& field : spec.fields) {
+    fields.emplace_back(toVeloxIcebergPartitionField(field));
+  }
+  return std::make_unique(
+      spec.specId, fields);
+}
+
+#endif
+
 std::unique_ptr
 TpchPrestoToVeloxConnector::toVeloxSplit(
     const protocol::ConnectorId& catalogId,
diff --git a/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnector.h b/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnector.h
index 18183ec86c388..692aea545920b 100644
--- a/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnector.h
+++ b/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnector.h
@@ -13,11 +13,15 @@
  */
 #pragma once
 
+#include "presto_cpp/main/types/PrestoToVeloxExpr.h"
 #include "presto_cpp/presto_protocol/connector/hive/presto_protocol_hive.h"
+#include "presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.h"
 #include "presto_cpp/presto_protocol/core/ConnectorProtocol.h"
-#include "presto_cpp/main/types/PrestoToVeloxExpr.h"
 #include "velox/connectors/Connector.h"
 #include "velox/connectors/hive/TableHandle.h"
+#ifdef PRESTO_ENABLE_ICEBERG_NATIVE_INSERTION
+#include "velox/connectors/hive/iceberg/IcebergDataSink.h"
+#endif
 #include "velox/core/PlanNode.h"
 #include "velox/vector/ComplexVector.h"
 
@@ -59,8 +63,7 @@ class PrestoToVeloxConnector {
       const protocol::TableHandle& tableHandle,
       const VeloxExprConverter& exprConverter,
       const TypeParser& typeParser,
-      velox::connector::ColumnHandleMap& assignments)
-      const = 0;
+      velox::connector::ColumnHandleMap& assignments) const = 0;
 
   [[nodiscard]] virtual std::unique_ptr<
       velox::connector::ConnectorInsertTableHandle>
@@ -134,8 +137,7 @@ class HivePrestoToVeloxConnector final : public PrestoToVeloxConnector {
       const protocol::TableHandle& tableHandle,
       const VeloxExprConverter& exprConverter,
       const TypeParser& typeParser,
-      velox::connector::ColumnHandleMap& assignments)
-      const final;
+      velox::connector::ColumnHandleMap& assignments) const final;
 
   std::unique_ptr
   toVeloxInsertTableHandle(
@@ -184,11 +186,41 @@ class IcebergPrestoToVeloxConnector final : public PrestoToVeloxConnector {
       const protocol::TableHandle& tableHandle,
       const VeloxExprConverter& exprConverter,
       const TypeParser& typeParser,
-      velox::connector::ColumnHandleMap& assignments)
-      const final;
+      velox::connector::ColumnHandleMap& assignments) const final;
 
-  std::unique_ptr createConnectorProtocol()
-      const final;
+  std::unique_ptr createConnectorProtocol()
+      const final;
+
+#ifdef PRESTO_ENABLE_ICEBERG_NATIVE_INSERTION
+
+  std::unique_ptr
+  toVeloxInsertTableHandle(
+      const protocol::CreateHandle* createHandle,
+      const TypeParser& typeParser) const final;
+
+  std::unique_ptr
+  toVeloxInsertTableHandle(
+      const protocol::InsertHandle* insertHandle,
+      const TypeParser& typeParser) const final;
+
+ private:
+  std::vector>
+  toHiveColumns(
+      const protocol::List&
+          inputColumns,
+      const TypeParser& typeParser,
+      bool& hasPartitionColumn) const;
+
+  velox::connector::hive::iceberg::IcebergPartitionSpec::Field
+  toVeloxIcebergPartitionField(
+      const protocol::iceberg::IcebergPartitionField& field) const;
+
+  std::unique_ptr
+  toVeloxIcebergPartitionSpec(
+      const protocol::iceberg::PrestoIcebergPartitionSpec& spec,
+      const TypeParser& typeParser) const;
+
+#endif
 };
 
 class TpchPrestoToVeloxConnector final : public PrestoToVeloxConnector {
@@ -209,8 +241,7 @@ class TpchPrestoToVeloxConnector final : public PrestoToVeloxConnector {
       const protocol::TableHandle& tableHandle,
       const VeloxExprConverter& exprConverter,
       const TypeParser& typeParser,
-      velox::connector::ColumnHandleMap& assignments)
-      const final;
+      velox::connector::ColumnHandleMap& assignments) const final;
 
   std::unique_ptr createConnectorProtocol()
       const final;
diff --git a/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/IcebergConnectorProtocol.h b/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/IcebergConnectorProtocol.h
index 8fbfa5c378110..198d29032dc1d 100644
--- a/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/IcebergConnectorProtocol.h
+++ b/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/IcebergConnectorProtocol.h
@@ -22,8 +22,15 @@ using IcebergConnectorProtocol = ConnectorProtocolTemplate<
     IcebergTableHandle,
     IcebergTableLayoutHandle,
     IcebergColumnHandle,
+
+#ifdef PRESTO_ENABLE_ICEBERG_NATIVE_INSERTION
+    IcebergInsertTableHandle,
+    IcebergOutputTableHandle,
+#else
     NotImplemented,
     NotImplemented,
+#endif
+
     IcebergSplit,
     NotImplemented,
     hive::HiveTransactionHandle,