Closed
@@ -26,6 +26,7 @@
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.Arrays;

@@ -184,7 +185,12 @@ public static Object getValue(JsonNode partitionValue, Type type)
throw new UncheckedIOException("Failed during JSON conversion of " + partitionValue, e);
}
case DECIMAL:
return partitionValue.decimalValue().setScale(((DecimalType) type).scale());
if (partitionValue.isLong()) {
Contributor

Does this change need to be in the PR for Iceberg insertion? I feel we can make it an independent change. Can you open a new PR for it?

Contributor Author

Yes, this is part of the insertion. This code handles the partition value that will be written to the Iceberg manifest file.

return BigDecimal.valueOf(partitionValue.asLong(), ((DecimalType) type).scale());
}
else {
return partitionValue.decimalValue().setScale(((DecimalType) type).scale());
}
}
throw new UnsupportedOperationException("Type not supported as partition column: " + type);
}
6 changes: 6 additions & 0 deletions presto-native-execution/CMakeLists.txt
@@ -78,6 +78,8 @@ option(PRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR "Enable Arrow Flight connector" OFF)

option(PRESTO_ENABLE_SPATIAL "Enable spatial support" ON)

option(PRESTO_ENABLE_ICEBERG_NATIVE_INSERTION "Enable iceberg native connector insertion" OFF)

# Set all Velox options below and make sure that if we include folly headers or
# other dependency headers that include folly headers we turn off the coroutines
# and turn on int128.
@@ -124,6 +126,10 @@ set(VELOX_ENABLE_GEO
${PRESTO_ENABLE_SPATIAL}
CACHE BOOL "Enable Velox Geometry (aka spatial) support")

if(PRESTO_ENABLE_ICEBERG_NATIVE_INSERTION)
add_compile_definitions(PRESTO_ENABLE_ICEBERG_NATIVE_INSERTION)
endif()

set(VELOX_BUILD_TESTING
OFF
CACHE BOOL "Enable Velox tests")
7 changes: 6 additions & 1 deletion presto-native-execution/Makefile
@@ -101,7 +101,12 @@ velox-submodule: #: Check out code for velox submodule
submodules: velox-submodule

cmake: submodules #: Use CMake to create a Makefile build system
cmake -B "$(BUILD_BASE_DIR)/$(BUILD_DIR)" $(FORCE_COLOR) $(CMAKE_FLAGS) $(EXTRA_CMAKE_FLAGS)
@ICEBERG_FLAGS=""; \
if [ -f "velox/velox/connectors/hive/iceberg/IcebergDataSink.h" ]; then \
echo "Found velox/velox/connectors/hive/iceberg/IcebergDataSink.h - enabling Iceberg native insertion"; \
ICEBERG_FLAGS="-DPRESTO_ENABLE_ICEBERG_NATIVE_INSERTION=ON"; \
fi; \
cmake -B "$(BUILD_BASE_DIR)/$(BUILD_DIR)" $(FORCE_COLOR) $(CMAKE_FLAGS) $(EXTRA_CMAKE_FLAGS) $$ICEBERG_FLAGS

build: #: Build the software based in BUILD_DIR and BUILD_TYPE variables
cmake --build $(BUILD_BASE_DIR)/$(BUILD_DIR) -j $(NUM_THREADS)
@@ -1493,6 +1493,114 @@ IcebergPrestoToVeloxConnector::createConnectorProtocol() const {
return std::make_unique<protocol::iceberg::IcebergConnectorProtocol>();
}

#ifdef PRESTO_ENABLE_ICEBERG_NATIVE_INSERTION

std::unique_ptr<velox::connector::ConnectorInsertTableHandle>
Contributor

Is it possible to move IcebergPrestoToVeloxConnector to its own file?
We could do the same for the remaining PrestoToVeloxConnectors as a follow-up cleanup.

Contributor Author

@aditi-pandit Thanks for the comment.
Yes, we made the same observation, and I opened PR #25631 a few days ago.
I will follow up on it once the Iceberg insertion code is merged in Velox, and split the Iceberg code from the Hive code.

Contributor

@PingLiuPing: You can already move IcebergPrestoToVeloxConnector in its current state to its own file as a first step (before adding the write parts). That PR should get approved in OSS.

Contributor Author

Thanks for the comment. Sure, I can open a separate PR and refactor this first.

Contributor Author

Opened PR #26237

Contributor

This file has a using namespace directive for the velox namespace, so the "velox" qualifier can be dropped from the names here.

Contributor Author

Thanks. Let me fix this.
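
To illustrate the reviewer's point, here is a minimal sketch of why the qualifier is redundant under a using-directive. The namespace layout and the exact directive (`using namespace facebook::velox;`) are assumptions for illustration; the struct below is a hypothetical stand-in, not the real Velox header.

```cpp
#include <type_traits>

// Hypothetical miniature of the namespace layout (assumption); the real
// Velox and Presto headers are not included here.
namespace facebook::velox::connector {
struct ConnectorInsertTableHandle {};
} // namespace facebook::velox::connector

namespace facebook::presto {
// Assumed form of the using-directive the reviewer refers to.
using namespace facebook::velox;

// Both spellings name the same type, so the shorter one is sufficient.
using Qualified = velox::connector::ConnectorInsertTableHandle;
using Short = connector::ConnectorInsertTableHandle;

static_assert(std::is_same_v<Qualified, Short>);
} // namespace facebook::presto
```

With such a directive in scope, `connector::hive::LocationHandle` and `velox::connector::hive::LocationHandle` would resolve identically, which is why the diff can use the shorter spelling consistently.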

IcebergPrestoToVeloxConnector::toVeloxInsertTableHandle(
const protocol::CreateHandle* createHandle,
const TypeParser& typeParser) const {
auto icebergOutputTableHandle =
std::dynamic_pointer_cast<protocol::iceberg::IcebergOutputTableHandle>(
createHandle->handle.connectorHandle);

VELOX_CHECK_NOT_NULL(
icebergOutputTableHandle,
"Unexpected output table handle type {}",
createHandle->handle.connectorHandle->_type);

bool isPartitioned{false};
const auto inputColumns = toHiveColumns(
icebergOutputTableHandle->inputColumns, typeParser, isPartitioned);

return std::make_unique<
velox::connector::hive::iceberg::IcebergInsertTableHandle>(
inputColumns,
std::make_shared<connector::hive::LocationHandle>(
fmt::format("{}/data", icebergOutputTableHandle->outputPath),
fmt::format("{}/data", icebergOutputTableHandle->outputPath),
connector::hive::LocationHandle::TableType::kNew),
toVeloxIcebergPartitionSpec(
icebergOutputTableHandle->partitionSpec, typeParser),
toVeloxFileFormat(icebergOutputTableHandle->fileFormat),
nullptr,
std::optional(
toFileCompressionKind(icebergOutputTableHandle->compressionCodec)));
}

std::unique_ptr<velox::connector::ConnectorInsertTableHandle>
IcebergPrestoToVeloxConnector::toVeloxInsertTableHandle(
const protocol::InsertHandle* insertHandle,
const TypeParser& typeParser) const {
auto icebergInsertTableHandle =
std::dynamic_pointer_cast<protocol::iceberg::IcebergInsertTableHandle>(
insertHandle->handle.connectorHandle);

VELOX_CHECK_NOT_NULL(
icebergInsertTableHandle,
"Unexpected insert table handle type {}",
insertHandle->handle.connectorHandle->_type);

bool isPartitioned{false};
const auto inputColumns = toHiveColumns(
icebergInsertTableHandle->inputColumns, typeParser, isPartitioned);

return std::make_unique<connector::hive::iceberg::IcebergInsertTableHandle>(
inputColumns,
std::make_shared<connector::hive::LocationHandle>(
fmt::format("{}/data", icebergInsertTableHandle->outputPath),
fmt::format("{}/data", icebergInsertTableHandle->outputPath),
connector::hive::LocationHandle::TableType::kExisting),
toVeloxIcebergPartitionSpec(
icebergInsertTableHandle->partitionSpec, typeParser),
toVeloxFileFormat(icebergInsertTableHandle->fileFormat),
nullptr,
std::optional(
toFileCompressionKind(icebergInsertTableHandle->compressionCodec)));
}

std::vector<std::shared_ptr<const connector::hive::HiveColumnHandle>>
IcebergPrestoToVeloxConnector::toHiveColumns(
const protocol::List<protocol::iceberg::IcebergColumnHandle>& inputColumns,
const TypeParser& typeParser,
bool& hasPartitionColumn) const {
hasPartitionColumn = false;
std::vector<std::shared_ptr<const connector::hive::HiveColumnHandle>>
hiveColumns;
hiveColumns.reserve(inputColumns.size());
for (const auto& columnHandle : inputColumns) {
hasPartitionColumn |=
columnHandle.columnType == protocol::hive::ColumnType::PARTITION_KEY;
hiveColumns.emplace_back(
std::dynamic_pointer_cast<connector::hive::HiveColumnHandle>(
std::shared_ptr(toVeloxColumnHandle(&columnHandle, typeParser))));
}
return hiveColumns;
}

connector::hive::iceberg::IcebergPartitionSpec::Field
IcebergPrestoToVeloxConnector::toVeloxIcebergPartitionField(
const protocol::iceberg::IcebergPartitionField& field) const {
return connector::hive::iceberg::IcebergPartitionSpec::Field(
field.name,
static_cast<connector::hive::iceberg::TransformType>(field.transform),
field.parameter ? *field.parameter : std::optional<int32_t>());
}

std::unique_ptr<velox::connector::hive::iceberg::IcebergPartitionSpec>
IcebergPrestoToVeloxConnector::toVeloxIcebergPartitionSpec(
const protocol::iceberg::PrestoIcebergPartitionSpec& spec,
const facebook::presto::TypeParser& typeParser) const {
std::vector<connector::hive::iceberg::IcebergPartitionSpec::Field> fields;
fields.reserve(spec.fields.size());
for (const auto& field : spec.fields) {
fields.emplace_back(toVeloxIcebergPartitionField(field));
}
return std::make_unique<connector::hive::iceberg::IcebergPartitionSpec>(
spec.specId, fields);
}

#endif

std::unique_ptr<velox::connector::ConnectorSplit>
TpchPrestoToVeloxConnector::toVeloxSplit(
const protocol::ConnectorId& catalogId,
@@ -13,11 +13,15 @@
*/
#pragma once

#include "presto_cpp/main/types/PrestoToVeloxExpr.h"
#include "presto_cpp/presto_protocol/connector/hive/presto_protocol_hive.h"
#include "presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.h"
#include "presto_cpp/presto_protocol/core/ConnectorProtocol.h"
#include "presto_cpp/main/types/PrestoToVeloxExpr.h"
#include "velox/connectors/Connector.h"
#include "velox/connectors/hive/TableHandle.h"
#ifdef PRESTO_ENABLE_ICEBERG_NATIVE_INSERTION
#include "velox/connectors/hive/iceberg/IcebergDataSink.h"
#endif
#include "velox/core/PlanNode.h"
#include "velox/vector/ComplexVector.h"

@@ -59,8 +63,7 @@ class PrestoToVeloxConnector {
const protocol::TableHandle& tableHandle,
const VeloxExprConverter& exprConverter,
const TypeParser& typeParser,
velox::connector::ColumnHandleMap& assignments)
const = 0;
velox::connector::ColumnHandleMap& assignments) const = 0;

[[nodiscard]] virtual std::unique_ptr<
velox::connector::ConnectorInsertTableHandle>
@@ -134,8 +137,7 @@ class HivePrestoToVeloxConnector final : public PrestoToVeloxConnector {
const protocol::TableHandle& tableHandle,
const VeloxExprConverter& exprConverter,
const TypeParser& typeParser,
velox::connector::ColumnHandleMap& assignments)
const final;
velox::connector::ColumnHandleMap& assignments) const final;

std::unique_ptr<velox::connector::ConnectorInsertTableHandle>
toVeloxInsertTableHandle(
@@ -184,11 +186,41 @@ class IcebergPrestoToVeloxConnector final : public PrestoToVeloxConnector {
const protocol::TableHandle& tableHandle,
const VeloxExprConverter& exprConverter,
const TypeParser& typeParser,
velox::connector::ColumnHandleMap& assignments)
const final;
velox::connector::ColumnHandleMap& assignments) const final;

std::unique_ptr<protocol::ConnectorProtocol> createConnectorProtocol()
const final;
std::unique_ptr<protocol::ConnectorProtocol> createConnectorProtocol()
const final;

#ifdef PRESTO_ENABLE_ICEBERG_NATIVE_INSERTION
Contributor

Can we move this class to a separate include file?

Contributor Author

Thanks for the comment.
In #25631, I have separated this class from the other classes.
Can I do it in that follow-up PR? If you insist, I can move it out in this PR.
Once they are separated, this macro is no longer needed, because we can conditionally compile the file in CMake.


std::unique_ptr<velox::connector::ConnectorInsertTableHandle>
toVeloxInsertTableHandle(
const protocol::CreateHandle* createHandle,
const TypeParser& typeParser) const final;

std::unique_ptr<velox::connector::ConnectorInsertTableHandle>
toVeloxInsertTableHandle(
const protocol::InsertHandle* insertHandle,
const TypeParser& typeParser) const final;

private:
std::vector<std::shared_ptr<const velox::connector::hive::HiveColumnHandle>>
toHiveColumns(
const protocol::List<protocol::iceberg::IcebergColumnHandle>&
inputColumns,
const TypeParser& typeParser,
bool& hasPartitionColumn) const;

velox::connector::hive::iceberg::IcebergPartitionSpec::Field
toVeloxIcebergPartitionField(
const protocol::iceberg::IcebergPartitionField& field) const;

std::unique_ptr<velox::connector::hive::iceberg::IcebergPartitionSpec>
toVeloxIcebergPartitionSpec(
const protocol::iceberg::PrestoIcebergPartitionSpec& spec,
const TypeParser& typeParser) const;

#endif
};

class TpchPrestoToVeloxConnector final : public PrestoToVeloxConnector {
@@ -209,8 +241,7 @@ class TpchPrestoToVeloxConnector final : public PrestoToVeloxConnector {
const protocol::TableHandle& tableHandle,
const VeloxExprConverter& exprConverter,
const TypeParser& typeParser,
velox::connector::ColumnHandleMap& assignments)
const final;
velox::connector::ColumnHandleMap& assignments) const final;

std::unique_ptr<protocol::ConnectorProtocol> createConnectorProtocol()
const final;
Contributor

Contributor Author

Thanks for the comment.
I didn't regenerate presto_protocol because I didn't change any .yml file in this PR.

FYI, I did run make presto_protocol on top of this PR, and I do see some files changed. After a careful check, I think they are only formatting changes and most likely have nothing to do with the code changes in this PR.

Contributor

Are there no changes to the files in the presto_protocol/connector/iceberg folder?

Contributor Author

Thanks for the comment.
When I run make presto_protocol, files in the presto_protocol/connector/iceberg folder do change. But I consider all of those to be code formatting changes only, so I chose not to commit them.

@@ -22,8 +22,15 @@ using IcebergConnectorProtocol = ConnectorProtocolTemplate<
IcebergTableHandle,
IcebergTableLayoutHandle,
IcebergColumnHandle,

#ifdef PRESTO_ENABLE_ICEBERG_NATIVE_INSERTION
Contributor

I don't think it's worth adding these ifdefs in the protocol layer here. It should be fine to include IcebergInsertTableHandle and IcebergOutputTableHandle in the protocol files regardless.

Contributor Author

Thanks for the comment.
Let me test it and see whether the error messages are as expected.

Contributor Author

@PingLiuPing PingLiuPing Oct 2, 2025

When Velox does not support Iceberg insertion and this macro is removed, I get the following error message:
using IcebergConnectorProtocol = ConnectorProtocolTemplate<
...
IcebergInsertTableHandle,
IcebergOutputTableHandle,
...

presto:iceberg> insert into year_t1 values (TIMESTAMP '2025-02-28 14:00:02', 11, DATE '2025-02-28', 125);

Query 20251002_094514_00000_7gkvm, FAILED, 2 nodes
Splits: 1 total, 0 done (0.00%)
[Latency: client-side: 441ms, server-side: 427ms] [0 rows, 0B] [0 rows/s, 0B/s]

Query 20251002_094514_00000_7gkvm failed:  Unsupported table writer handle: {"@type":"InsertHandle","handle":{"connectorHandle":{"@type":"hive-iceberg","compressionCodec":"GZIP","fileFormat":"PARQUET","inputColumns":[{"@type":"hive-iceberg","columnIdentity":{"children":[],"id":1,"name":"c_timestamp","typeCategory":"PRIMITIVE"},"columnType":"REGULAR","requiredSubfields":[],"type":"timestamp"},{"@type":"hive-iceberg","columnIdentity":{"children":[],"id":2,"name":"c_int","typeCategory":"PRIMITIVE"},"columnType":"PARTITION_KEY","requiredSubfields":[],"type":"integer"},{"@type":"hive-iceberg","columnIdentity":{"children":[],"id":3,"name":"c_date","typeCategory":"PRIMITIVE"},"columnType":"REGULAR","requiredSubfields":[],"type":"date"},{"@type":"hive-iceberg","columnIdentity":{"children":[],"id":4,"name":"c_bigint","typeCategory":"PRIMITIVE"},"columnType":"REGULAR","requiredSubfields":[],"type":"bigint"}],"outputPath":"file:/Users/pingliu/work/opensource/presto/presto-native-execution/data/iceberg_data/HIVE/iceberg/year_t1","partitionSpec":{"fields":[{"fieldId":1000,"name":"c_int","sourceId":2,"transform":"IDENTITY"}],"schema":{"aliases":{},"columnNameToIdMapping":{"c_bigint":4,"c_date":3,"c_int":2,"c_timestamp":1},"columns":[{"id":1,"name":"c_timestamp","optional":true,"prestoType":"timestamp"},{"id":2,"name":"c_int","optional":true,"prestoType":"integer"},{"id":3,"name":"c_date","optional":true,"prestoType":"date"},{"id":4,"name":"c_bigint","optional":true,"prestoType":"bigint"}],"identifierFieldIds":[],"schemaId":0},"specId":0},"schema":{"aliases":{},"columnNameToIdMapping":{"c_bigint":4,"c_date":3,"c_int":2,"c_timestamp":1},"columns":[{"id":1,"name":"c_timestamp","optional":true,"prestoType":"timestamp"},{"id":2,"name":"c_int","optional":true,"prestoType":"integer"},{"id":3,"name":"c_date","optional":true,"prestoType":"date"},{"id":4,"name":"c_bigint","optional":true,"prestoType":"bigint"}],"identifierFieldIds":[],"schemaId":0},"schemaName":"iceberg","sortOrder":[],"storageProperties":{"commit.retry.num-retries":"4","read.split.target-size":"134217728","write.delete.mode":"merge-on-read","write.format.default":"PARQUET","write.metadata.delete-after-commit.enabled":"false","write.metadata.metrics.max-inferred-column-defaults":"100","write.metadata.previous-versions-max":"100","write.parquet.compression-codec":"GZIP","write.update.mode":"merge-on-read"},"tableName":{"snapshotId":5359319739731907098,"tableName":"year_t1","tableType":"DATA"}},"connectorId":"iceberg","transactionHandle":{"@type":"hive","uuid":"451ae372-1bbe-4410-9f93-40a24b60c27c"}},"schemaTableName":{"schema":"iceberg","table":"year_t1"}}

When using NotImplemented

using IcebergConnectorProtocol = ConnectorProtocolTemplate<
    ...
    NotImplemented,
    NotImplemented,
    ...
>;

I get the following error message:

presto:iceberg> insert into year_t1 values (TIMESTAMP '2025-02-28 14:00:02', 11, DATE '2025-02-28', 125);
Query 20251002_095454_00000_mpr4s failed:  Not implemented: N8facebook6presto8protocol26ConnectorInsertTableHandleE

It seems we should use the latter, as it is more precise and shorter. What's your opinion?
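
For readers puzzled by the second message: the string after "Not implemented:" is an Itanium C++ ABI mangled type name, which decodes to facebook::presto::protocol::ConnectorInsertTableHandle. The sketch below shows one way to decode such a name, assuming a GCC or Clang toolchain that provides `abi::__cxa_demangle`. The helper `demangleTypeName` is hypothetical and not part of Presto, and whether the message is actually produced from `typeid(...).name()` is an assumption the thread does not confirm.

```cpp
#include <cxxabi.h> // abi::__cxa_demangle (GCC/Clang, Itanium C++ ABI only)
#include <cstdlib>
#include <memory>
#include <string>

// Hypothetical helper: returns the readable form of an Itanium-ABI mangled
// type name, or the input unchanged if demangling fails.
std::string demangleTypeName(const char* mangled) {
  int status = 0;
  // __cxa_demangle allocates the result with malloc, so free it with std::free.
  std::unique_ptr<char, void (*)(void*)> demangled(
      abi::__cxa_demangle(mangled, nullptr, nullptr, &status), std::free);
  return (status == 0 && demangled) ? std::string(demangled.get())
                                    : std::string(mangled);
}
```

A typical call would be `demangleTypeName(typeid(T).name())`. On toolchains without `<cxxabi.h>` (for example MSVC) this sketch does not apply.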

Contributor

I see where you are coming from, but I feel that's not how protocol conversion should be used. Protocol conversion from Java -> C++ artifacts should happen just as choosing the language backend. If we have some unsupported logic in the Presto -> Velox layer, then it's reasonable to error in the PrestoToVelox code. The error message "Unsupported table writer handle" with the handle value is reasonable behavior imo.

Adding "IcebergInsertTableHandle, IcebergOutputTableHandle" and updating the protocol layer can be submitted independently imo.
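
The layering described above can be sketched roughly as follows: the protocol layer always produces a handle object, and the conversion layer is where an unsupported connector is rejected. All names here (`InsertHandleSketch`, `VeloxInsertHandleSketch`, `toVeloxInsertHandleSketch`, the `icebergInsertSupported` flag) are hypothetical stand-ins for illustration, not the actual Presto or Velox types.

```cpp
#include <memory>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for a deserialized protocol-layer handle.
struct InsertHandleSketch {
  std::string connectorType; // e.g. "hive-iceberg"
  std::string json;          // serialized handle, echoed in the error text
};

// Hypothetical stand-in for the converted Velox-side handle.
struct VeloxInsertHandleSketch {
  std::string connectorType;
};

// Conversion layer: the protocol object always exists; an unsupported
// connector is rejected here, with the handle value in the message.
std::unique_ptr<VeloxInsertHandleSketch> toVeloxInsertHandleSketch(
    const InsertHandleSketch& handle,
    bool icebergInsertSupported) {
  if (handle.connectorType == "hive-iceberg" && !icebergInsertSupported) {
    throw std::runtime_error("Unsupported table writer handle: " + handle.json);
  }
  return std::make_unique<VeloxInsertHandleSketch>(
      VeloxInsertHandleSketch{handle.connectorType});
}
```

Under this arrangement the protocol template needs no conditional compilation; only the conversion step depends on whether the backend supports the write path.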

Contributor

Agreed with @aditi-pandit. We can document this limitation so it's easier for folks to understand.

Contributor Author

@tdcmeehan @aditi-pandit Thanks for the comments. I will fix this.

Contributor Author

I added this change to #26237.

IcebergInsertTableHandle,
IcebergOutputTableHandle,
#else
NotImplemented,
NotImplemented,
#endif

IcebergSplit,
NotImplemented,
hive::HiveTransactionHandle,