feat(plugin-iceberg): Add rewrite_data_files procedure#26374

Merged
hantangwangd merged 10 commits into prestodb:master from hantangwangd:support_call_distributed_procedure_part2 on Dec 8, 2025

Conversation

@hantangwangd
Member

@hantangwangd hantangwangd commented Oct 21, 2025

Description

This PR is the second in a series of PRs adding distributed procedure support to Presto. It is split out from the original complete PR, which is located here: #22659.

This PR includes the following parts:

  1. Refactor the Iceberg connector to support calling distributed procedures. Introduce Iceberg's procedure context and extend IcebergSplitManager to support a split source planned by IcebergAbstractMetadata.beginCallDistributedProcedure(...). This split source is set on the procedure context, which is also used to hold all the files to be rewritten.

  2. Support the Iceberg rewrite_data_files procedure. It builds a customized split source and sets it on the procedure context so it can be used by IcebergSplitManager, and it registers a file scan task consumer to collect and hold all scanned files in the procedure context. Finally, in the commit stage, it gathers all the data files and delete files that have been rewritten, along with all newly generated files, and changes and commits their metadata through the Iceberg table's RewriteFiles transaction.
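The consumer-based file tracking described in point 2 can be sketched in isolation. The following is a simplified, self-contained illustration; the class and method names are invented for this sketch and are not the PR's actual classes. A split source notifies a registered consumer for each file scan task it hands out, so the procedure context can accumulate the set of files being rewritten for the commit stage.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Illustrative sketch only: a split source that records every scan task it
// emits via a consumer, mirroring the pattern described in the PR text.
// A String stands in for Iceberg's FileScanTask.
class RecordingSplitSource
{
    private final Iterator<String> fileScanTasks;
    private final Consumer<String> scannedFileConsumer;

    RecordingSplitSource(List<String> tasks, Consumer<String> consumer)
    {
        this.fileScanTasks = tasks.iterator();
        this.scannedFileConsumer = consumer;
    }

    List<String> getNextBatch(int maxSize)
    {
        List<String> batch = new ArrayList<>();
        while (batch.size() < maxSize && fileScanTasks.hasNext()) {
            String task = fileScanTasks.next();
            scannedFileConsumer.accept(task); // record the file for the commit stage
            batch.add(task);
        }
        return batch;
    }

    boolean isFinished()
    {
        return !fileScanTasks.hasNext();
    }
}
```

In the real connector the consumer would add DataFile and DeleteFile instances to the procedure context; here it simply collects strings to show the flow.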

Motivation and Context

prestodb/rfcs#12

Impact

N/A

Test Plan

  • Add test cases validating the result and plan tree shape of the Iceberg-specific distributed procedure rewrite_data_files

Contributor checklist

  • Please make sure your submission complies with our contributing guide, in particular code style and commit standards.
  • PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced.
  • Documented new properties (with their default values), SQL syntax, functions, or other functionality.
  • If release notes are required, they follow the release notes guidelines.
  • Adequate tests were added if applicable.
  • CI passed.
  • If adding new dependencies, verified they have an OpenSSF Scorecard score of 5.0 or higher (or obtained explicit TSC approval for lower scores).

Release Notes

Iceberg Connector Changes
 * Add support for calling distributed procedure in Iceberg connector.
 * Add ``rewrite_data_files`` procedure in Iceberg connector.

Contributor

@sourcery-ai sourcery-ai bot left a comment


Sorry @hantangwangd, your pull request is larger than the review limit of 150000 diff characters

@hantangwangd hantangwangd force-pushed the support_call_distributed_procedure_part2 branch 2 times, most recently from e3f4e26 to a6a6101 on October 21, 2025 09:37
@hantangwangd hantangwangd force-pushed the support_call_distributed_procedure_part2 branch 3 times, most recently from 7b97da0 to 7ebf2a0 on November 4, 2025 05:59
@hantangwangd hantangwangd force-pushed the support_call_distributed_procedure_part2 branch 2 times, most recently from d258f2f to 30f74af on November 15, 2025 05:35
@hantangwangd hantangwangd marked this pull request as ready for review November 15, 2025 09:06
@sourcery-ai
Contributor

sourcery-ai bot commented Nov 15, 2025

Reviewer's Guide

This PR refactors the Iceberg connector to enable distributed procedure execution by introducing a procedure context framework, adding a new serialized handle type, extending metadata to manage procedure lifecycles, implementing the rewrite_data_files procedure and its split source, wiring ProcedureRegistry into the connector, and providing extensive tests.

Sequence diagram for distributed procedure lifecycle in Iceberg connector

sequenceDiagram
    participant "Coordinator (Presto Engine)"
    participant "IcebergAbstractMetadata"
    participant "ProcedureRegistry"
    participant "DistributedProcedure (RewriteDataFilesProcedure)"
    participant "IcebergProcedureContext"
    participant "IcebergSplitManager"
    participant "CallDistributedProcedureSplitSource"
    participant "IcebergPageSinkProvider"
    participant "IcebergPageSink"
    participant "IcebergTable"

    "Coordinator (Presto Engine)"->>"IcebergAbstractMetadata": beginCallDistributedProcedure(...)
    "IcebergAbstractMetadata"->>"ProcedureRegistry": resolve(procedureName)
    "ProcedureRegistry"-->>"IcebergAbstractMetadata": DistributedProcedure instance
    "IcebergAbstractMetadata"->>"DistributedProcedure": createContext()
    "DistributedProcedure"-->>"IcebergAbstractMetadata": IcebergProcedureContext
    "IcebergAbstractMetadata"->>"IcebergProcedureContext": setTable(Table)
    "IcebergAbstractMetadata"->>"IcebergProcedureContext": setTransaction(Transaction)
    "IcebergAbstractMetadata"->>"DistributedProcedure": begin(...)
    "DistributedProcedure"->>"IcebergProcedureContext": setConnectorSplitSource(CallDistributedProcedureSplitSource)
    "IcebergAbstractMetadata"-->>"Coordinator (Presto Engine)": IcebergDistributedProcedureHandle

    "Coordinator (Presto Engine)"->>"IcebergSplitManager": getSplits(...)
    "IcebergSplitManager"->>"IcebergAbstractMetadata": getSplitSourceInCurrentCallProcedureTransaction()
    "IcebergAbstractMetadata"-->>"IcebergSplitManager": CallDistributedProcedureSplitSource
    "IcebergSplitManager"-->>"Coordinator (Presto Engine)": splits

    "Coordinator (Presto Engine)"->>"IcebergPageSinkProvider": createPageSink(..., IcebergDistributedProcedureHandle)
    "IcebergPageSinkProvider"->>"IcebergPageSink": createPageSink(...)

    "Coordinator (Presto Engine)"->>"IcebergAbstractMetadata": finishCallDistributedProcedure(...)
    "IcebergAbstractMetadata"->>"ProcedureRegistry": resolve(procedureName)
    "ProcedureRegistry"-->>"IcebergAbstractMetadata": DistributedProcedure instance
    "IcebergAbstractMetadata"->>"DistributedProcedure": finish(...)
    "DistributedProcedure"->>"IcebergProcedureContext": collect scanned files, commit new files
    "IcebergAbstractMetadata"->>"IcebergTable": commitTransaction()
    "IcebergAbstractMetadata"->>"IcebergProcedureContext": destroy()
    "IcebergAbstractMetadata"-->>"Coordinator (Presto Engine)": procedure finished

ER diagram for new IcebergDistributedProcedureHandle data type

erDiagram
    ICEBERG_DISTRIBUTED_PROCEDURE_HANDLE {
        String schemaName
        IcebergTableName tableName
        PrestoIcebergSchema schema
        PrestoIcebergPartitionSpec partitionSpec
        IcebergColumnHandle inputColumns
        String outputPath
        FileFormat fileFormat
        HiveCompressionCodec compressionCodec
        Map storageProperties
    }
    ICEBERG_DISTRIBUTED_PROCEDURE_HANDLE ||--o| ICEBERG_TABLE_NAME : "tableName"
    ICEBERG_DISTRIBUTED_PROCEDURE_HANDLE ||--o| PRESTO_ICEBERG_SCHEMA : "schema"
    ICEBERG_DISTRIBUTED_PROCEDURE_HANDLE ||--o| PRESTO_ICEBERG_PARTITION_SPEC : "partitionSpec"
    ICEBERG_DISTRIBUTED_PROCEDURE_HANDLE ||--|{ ICEBERG_COLUMN_HANDLE : "inputColumns"
    ICEBERG_DISTRIBUTED_PROCEDURE_HANDLE ||--o| FILE_FORMAT : "fileFormat"
    ICEBERG_DISTRIBUTED_PROCEDURE_HANDLE ||--o| HIVE_COMPRESSION_CODEC : "compressionCodec"

Class diagram for new and updated Iceberg distributed procedure types

classDiagram
    class IcebergProcedureContext {
        +Set<DataFile> scannedDataFiles
        +Set<DeleteFile> fullyAppliedDeleteFiles
        +Map<String, Object> relevantData
        +Optional<Table> table
        +Transaction transaction
        +Optional<ConnectorSplitSource> connectorSplitSource
        +setTable(Table table)
        +setTransaction(Transaction transaction)
        +getTable()
        +getTransaction()
        +setConnectorSplitSource(ConnectorSplitSource splitSource)
        +getConnectorSplitSource()
        +getScannedDataFiles()
        +getFullyAppliedDeleteFiles()
        +getRelevantData()
        +destroy()
    }
    class IcebergDistributedProcedureHandle {
        +String schemaName
        +IcebergTableName tableName
        +PrestoIcebergSchema schema
        +PrestoIcebergPartitionSpec partitionSpec
        +List<IcebergColumnHandle> inputColumns
        +String outputPath
        +FileFormat fileFormat
        +HiveCompressionCodec compressionCodec
        +Map<String, String> storageProperties
        +IcebergDistributedProcedureHandle(...)
    }
    class IcebergWritableTableHandle {
    }
    IcebergDistributedProcedureHandle --|> IcebergWritableTableHandle
    IcebergDistributedProcedureHandle ..|> ConnectorDistributedProcedureHandle
    class CallDistributedProcedureSplitSource {
        -CloseableIterator<FileScanTask> fileScanTaskIterator
        -Optional<Consumer<FileScanTask>> fileScanTaskConsumer
        -TableScan tableScan
        -Closer closer
        -double minimumAssignedSplitWeight
        -ConnectorSession session
        +getNextBatch(...)
        +isFinished()
        +close()
        -toIcebergSplit(FileScanTask task)
    }
    class RewriteDataFilesProcedure {
        +TypeManager typeManager
        +JsonCodec<CommitTaskData> commitTaskCodec
        +RewriteDataFilesProcedure(...)
        +get()
        -beginCallDistributedProcedure(...)
        -finishCallDistributedProcedure(...)
    }
    IcebergProcedureContext ..|> ConnectorProcedureContext
    CallDistributedProcedureSplitSource ..|> ConnectorSplitSource
    RewriteDataFilesProcedure ..|> Provider
    RewriteDataFilesProcedure ..|> DistributedProcedure

File-Level Changes

Change Details Files
Add support for IcebergDistributedProcedureHandle serialization
  • Define IcebergDistributedProcedureHandle in C++
  • Implement to_json/from_json in C++
  • Register handle key in presto_protocol_iceberg.yml
  • Introduce Java IcebergDistributedProcedureHandle with JSON annotations
presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.cpp
presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.h
presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.yml
presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/special/IcebergDistributedProcedureHandle.hpp.inc
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergDistributedProcedureHandle.java
Extend IcebergAbstractMetadata to manage distributed procedures
  • Inject ProcedureRegistry and hold an optional procedureContext
  • Override beginCallDistributedProcedure and finishCallDistributedProcedure
  • Expose split source from the current procedure transaction
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java
Introduce IcebergProcedureContext to track procedure state
  • Implement context storing split source, scanned data files, delete files, table and transaction
  • Provide lifecycle methods for setup and cleanup
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergProcedureContext.java
Implement the rewrite_data_files distributed procedure
  • Provide RewriteDataFilesProcedure with TableDataRewriteDistributedProcedure API
  • Register the procedure in IcebergCommonModule
  • Implement begin/finish logic to gather files and commit via RewriteFiles
presto-iceberg/src/main/java/com/facebook/presto/iceberg/RewriteDataFilesProcedure.java
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java
Create CallDistributedProcedureSplitSource for split streaming
  • Convert Iceberg FileScanTask to ConnectorSplits
  • Invoke consumer to record scanned and delete files
  • Implement getNextBatch, isFinished and close
presto-iceberg/src/main/java/com/facebook/presto/iceberg/CallDistributedProcedureSplitSource.java
Adapt IcebergSplitManager to use context-provided splits
  • Check procedureContext for a ConnectorSplitSource
  • Return context split source when present
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java
Wire ProcedureRegistry through connector factories
  • Bind ProcedureRegistry in InternalIcebergConnectorFactory
  • Pass registry into native and Hive metadata factories
presto-iceberg/src/main/java/com/facebook/presto/iceberg/InternalIcebergConnectorFactory.java
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadataFactory.java
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadataFactory.java
Add and update tests for distributed procedure and rewrite logic
  • Extend distributed smoke tests and functional tests for metadata delete
  • Introduce TestRewriteDataFilesProcedure suite
  • Update logical planner tests and plan match patterns
presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java
presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java
presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergLogicalPlanner.java
presto-main-base/src/test/java/com/facebook/presto/sql/planner/assertions/PlanMatchPattern.java
presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestRewriteDataFilesProcedure.java


Contributor

@sourcery-ai sourcery-ai bot left a comment


Hey there - I've reviewed your changes - here's some feedback:

  • This PR is very large and touches many layers (protocol, metadata, split manager, planner, C++ bindings, and extensive tests); consider splitting into smaller, focused PRs to simplify review and isolate potential regressions.
  • There’s a lot of duplicated test setup/assertion code in TestRewriteDataFilesProcedure (and related tests); factor out common helpers for table creation, file‐count assertions, and cleanup to reduce maintenance overhead.
  • In IcebergProcedureContext.destroy, you clear splits and file sets but don’t reset the 'table' or 'transaction' fields—consider clearing those as well to fully release resources after procedure completion.

## Individual Comments

### Comment 1
<location> `presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergProcedureContext.java:92-93` </location>
<code_context>
+        this.relevantData.clear();
+        this.scannedDataFiles.clear();
+        this.fullyAppliedDeleteFiles.clear();
+        this.connectorSplitSource.ifPresent(ConnectorSplitSource::close);
+        this.connectorSplitSource = null;
+    }
+}
</code_context>

<issue_to_address>
**issue (bug_risk):** Setting connectorSplitSource to null may lead to NullPointerExceptions.

Assigning null to connectorSplitSource, which is an Optional, breaks expected usage and may cause runtime errors. Use Optional.empty() instead to prevent NullPointerExceptions.
</issue_to_address>
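The fix suggested here can be illustrated with a minimal, self-contained sketch; the class and field names below are illustrative and do not match the PR's actual code. Resetting the field to Optional.empty() keeps subsequent Optional calls safe, whereas assigning null would make any later isPresent() or ifPresent() call throw a NullPointerException.

```java
import java.util.Optional;

// Illustrative only: shows why destroy() should reset an Optional field to
// Optional.empty(), never to null.
class ProcedureState
{
    private Optional<AutoCloseable> splitSource = Optional.empty();

    void setSplitSource(AutoCloseable source)
    {
        splitSource = Optional.of(source);
    }

    void destroy()
    {
        splitSource.ifPresent(source -> {
            try {
                source.close();
            }
            catch (Exception ignored) {
            }
        });
        splitSource = Optional.empty(); // not null: later calls stay NPE-free
    }

    boolean hasSplitSource()
    {
        return splitSource.isPresent(); // would throw NPE if the field were null
    }
}
```

With Optional.empty(), destroy() is also safely idempotent: calling it twice closes nothing the second time instead of dereferencing null.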

### Comment 2
<location> `presto-iceberg/src/main/java/com/facebook/presto/iceberg/RewriteDataFilesProcedure.java:167-170` </location>
<code_context>
+                    .map(slice -> commitTaskCodec.fromJson(slice.getBytes()))
+                    .collect(toImmutableList());
+
+            org.apache.iceberg.types.Type[] partitionColumnTypes = icebergTable.spec().fields().stream()
+                    .map(field -> field.transform().getResultType(
+                            icebergTable.schema().findType(field.sourceId())))
+                    .toArray(Type[]::new);
+
+            Set<DataFile> newFiles = new HashSet<>();
</code_context>

<issue_to_address>
**issue (bug_risk):** Potential mismatch between partition spec fields and schema types.

If findType returns null for a missing sourceId, this could lead to runtime errors. Please add validation or error handling for cases where the type is not found.
</issue_to_address>
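A minimal sketch of the validation being asked for, using a plain map in place of Iceberg's schema lookup (names and types here are illustrative, not the PR's actual code): fail fast with a descriptive error when a partition field's source id has no matching schema type, instead of letting a null propagate into the transform.

```java
import java.util.Map;

// Illustrative stand-in for schema.findType(field.sourceId()): a lookup that
// fails fast instead of returning null.
final class PartitionTypeLookup
{
    private PartitionTypeLookup() {}

    static String findTypeOrThrow(Map<Integer, String> schemaTypes, int sourceId)
    {
        String type = schemaTypes.get(sourceId);
        if (type == null) {
            throw new IllegalStateException(
                    "Partition spec field references unknown source id: " + sourceId);
        }
        return type;
    }
}
```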

### Comment 3
<location> `presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java:1074-1083` </location>
<code_context>
+            throw new PrestoException(NOT_SUPPORTED, "This connector do not allow table execute at specified snapshot");
+        }
+
+        transaction = icebergTable.newTransaction();
+        BaseProcedure<?> procedure = procedureRegistry.resolve(
+                new ConnectorId(procedureName.getCatalogName()),
+                new SchemaTableName(
+                        procedureName.getSchemaName(),
+                        procedureName.getObjectName()));
+        verify(procedure instanceof DistributedProcedure, "procedure must be DistributedProcedure");
+        procedureContext = Optional.of((IcebergProcedureContext) ((DistributedProcedure) procedure).createContext());
+        procedureContext.get().setTable(icebergTable);
+        procedureContext.get().setTransaction(transaction);
+        return ((DistributedProcedure) procedure).begin(session, procedureContext.get(), tableLayoutHandle, arguments);
+    }
</code_context>

<issue_to_address>
**issue (bug_risk):** Transaction is assigned to a field but not cleared after procedure completion.

Since the transaction field remains set after finishCallDistributedProcedure, running multiple procedures may result in stale or incorrect state. Please ensure the transaction field is cleared or properly scoped after each procedure completes.
</issue_to_address>

### Comment 4
<location> `presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java:1099` </location>
<code_context>
+        verify(procedureContext.isPresent(), "procedure context must be present");
+        ((DistributedProcedure) procedure).finish(procedureContext.get(), procedureHandle, fragments);
+        transaction.commitTransaction();
+        procedureContext.get().destroy();
+    }
+
</code_context>

<issue_to_address>
**issue (bug_risk):** Destroying procedureContext does not reset the Optional field.

Reset procedureContext to Optional.empty() after destroy to prevent unexpected behavior if accessed post-completion.
</issue_to_address>
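Comments 3 and 4 point at the same lifecycle issue: per-procedure state should not outlive the procedure. One common pattern, sketched here with illustrative names rather than the PR's actual code, is to reset both fields in a finally block so that even a failed commit cannot leave stale references behind.

```java
import java.util.Optional;

// Illustrative lifecycle sketch: the transaction and context fields are
// cleared in a finally block after finish. An Object stands in for Iceberg's
// Transaction, and a Runnable stands in for the context's destroy() method.
class ProcedureLifecycle
{
    private Object transaction;
    private Optional<Runnable> procedureContext = Optional.empty();

    void begin()
    {
        transaction = new Object();
        procedureContext = Optional.of(() -> {});
    }

    void finish()
    {
        try {
            // ... finish the procedure and commit the transaction ...
        }
        finally {
            procedureContext.ifPresent(Runnable::run); // destroy()
            procedureContext = Optional.empty();
            transaction = null;
        }
    }

    boolean hasStaleState()
    {
        return transaction != null || procedureContext.isPresent();
    }
}
```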

### Comment 5
<location> `presto-docs/src/main/sphinx/connector/iceberg.rst:1239-1242` </location>
<code_context>
+Rewrite Data Files
+^^^^^^^^^^^^^^^^^^
+
+Iceberg tracks all data files under different partition specs in a table. More data files requires
+more metadata to be stored in manifest files, and small data files can cause unnecessary amount metadata and
+less efficient queries from file open costs. Also, data files under different partition specs can
+prevent metadata level deletion or thorough predicate push down for Presto.
</code_context>

<issue_to_address>
**issue (typo):** Correct verb agreement and missing word in sentence.

The correct sentence is: 'More data files require more metadata to be stored in manifest files, and small data files can cause an unnecessary amount of metadata and less efficient queries due to file open costs.'

```suggestion
Iceberg tracks all data files under different partition specs in a table. More data files require
more metadata to be stored in manifest files, and small data files can cause an unnecessary amount of metadata and
less efficient queries due to file open costs. Also, data files under different partition specs can
prevent metadata level deletion or thorough predicate push down for Presto.
```
</issue_to_address>

### Comment 6
<location> `presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.cpp:745` </location>
<code_context>
 } // namespace facebook::presto::protocol::iceberg
 namespace facebook::presto::protocol::iceberg {
+IcebergDistributedProcedureHandle::
+    IcebergDistributedProcedureHandle() noexcept {
+  _type = "hive-iceberg";
+}
</code_context>

<issue_to_address>
**issue (review_instructions):** Member variable '_type' uses leading underscore, but should use camelCase_ for private/protected members.

The member variable '_type' does not follow the required camelCase_ convention for private/protected members. Please rename it to 'type_' to comply with the coding standard.

<details>
<summary>Review instructions:</summary>

**Path patterns:** `presto-native-execution/**/*.hpp,presto-native-execution/**/*.cpp`

**Instructions:**
Use camelCase_ for private and protected members variables.

</details>
</issue_to_address>

### Comment 7
<location> `presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.cpp:746` </location>
<code_context>
 namespace facebook::presto::protocol::iceberg {
+IcebergDistributedProcedureHandle::
+    IcebergDistributedProcedureHandle() noexcept {
+  _type = "hive-iceberg";
+}
+
</code_context>

<issue_to_address>
**issue (review_instructions):** Member variable '_type' should use camelCase_ (e.g., 'type_') for private/protected members.

Please update '_type' to 'type_' to match the required naming convention for private/protected member variables.

<details>
<summary>Review instructions:</summary>

**Path patterns:** `presto-native-execution/**/*.hpp,presto-native-execution/**/*.cpp`

**Instructions:**
Use camelCase_ for private and protected members variables.

</details>
</issue_to_address>

### Comment 8
<location> `presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.cpp:818` </location>
<code_context>
+}
+
+void from_json(const json& j, IcebergDistributedProcedureHandle& p) {
+  p._type = j["@type"];
+  from_json_key(
+      j,
</code_context>

<issue_to_address>
**issue (review_instructions):** Member variable '_type' should use camelCase_ (e.g., 'type_') for private/protected members.

Please update '_type' to 'type_' to match the required naming convention for private/protected member variables.

<details>
<summary>Review instructions:</summary>

**Path patterns:** `presto-native-execution/**/*.hpp,presto-native-execution/**/*.cpp`

**Instructions:**
Use camelCase_ for private and protected members variables.

</details>
</issue_to_address>



Contributor

@steveburnett steveburnett left a comment


Thanks for the doc! Just a nit of formatting.

@hantangwangd hantangwangd force-pushed the support_call_distributed_procedure_part2 branch from 30f74af to 9959e0c on November 18, 2025 00:56
@hantangwangd
Member Author

@steveburnett thanks for the review, fixed! Please take a look when you have a minute.

steveburnett
steveburnett previously approved these changes Nov 18, 2025
Contributor

@steveburnett steveburnett left a comment


LGTM! (docs)

Pull updated branch, new local doc build. Looks good, thanks!

@hantangwangd hantangwangd force-pushed the support_call_distributed_procedure_part2 branch from 9959e0c to 5b0e05a on November 26, 2025 09:01
@tdcmeehan tdcmeehan self-assigned this Nov 26, 2025
private final double minimumAssignedSplitWeight;
private final ConnectorSession session;

public CallDistributedProcedureSplitSource(
Contributor


I think the most straightforward thing to do is to simply have a new method in ConnectorSplitSource that explicitly takes in ConnectorProcedureContext and ConnectorDistributedProcedureHandle. That way we can unify the implementation here and just use the existing IcebergSplitSource, and I also think it would be a lot more straightforward to understand compared to overloading the Iceberg Metadata object to also include this custom split source.
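The suggestion above could look roughly like the following interface sketch. The method name, parameter types, and default behavior are all assumed for illustration and are not Presto's actual SPI: the idea is that the split-source creation path accepts the procedure context and handle explicitly, so the existing split source implementation can be reused instead of stashing a custom one on the metadata object.

```java
// Hypothetical SPI sketch, not Presto's real interfaces. A String stands in
// for a ConnectorSplit.
interface ProcedureContext {}

interface DistributedProcedureHandle {}

interface SplitSourceFactory
{
    // Existing path: plain split enumeration.
    Iterable<String> createSplits();

    // Proposed overload: same factory, but aware of the running procedure,
    // so it can record scanned files on the context while producing splits.
    default Iterable<String> createSplits(ProcedureContext context, DistributedProcedureHandle handle)
    {
        return createSplits();
    }
}
```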

Member Author


That's a great idea and sounds very reasonable. I've made the change following your suggestion, and removed CallDistributedProcedureSplitSource completely. Please take a look when you get a chance, thanks a lot!

@hantangwangd hantangwangd force-pushed the support_call_distributed_procedure_part2 branch 2 times, most recently from 8ec72a0 to 0d136f9 on December 1, 2025 06:57
@hantangwangd
Member Author

@tdcmeehan, it's a really great point for discussion. Thanks for bringing it up.

As far as I can see, there is an implicit constraint in row level DELETE and UPDATE behaviors: they do not alter the existing data file structure. Following this, in our current DELETE implementation framework, a page sink writes delete files only for a specific, targeted data file (and UPDATE works similarly). This differs fundamentally from operations like rewrite_data_files, which involve the concepts of compaction or optimization and can reorganize data across multiple data files.

Given that difference, reusing the existing DELETE/UPDATE framework to track which data files and delete files have been rewritten is somewhat challenging and would likely require fairly large changes. In fact, as I understand it, rewrite_data_files is closer to INSERT by SUBQUERY in how it reads and writes. And in INSERT by SUBQUERY, the page sink returns an empty referenceDataFile in the CommitTaskData.

Regarding your concern about the exposure of the procedure context, I agree that your concern is very reasonable. In the current design, I provided the IcebergAbstractMetadata.getProcedureContext() interface to allow external components (e.g. IcebergSplitManager) to access and manipulate it — primarily intended to reuse the file-planning behavior of the split source to avoid redundant file planning. However, from a strict visibility and isolation standpoint, this does introduce potential risks. As an alternative, we could avoid exposing the procedure context from IcebergAbstractMetadata entirely and instead perform a dedicated file-planning step within procedure rewrite_data_files itself to achieve the same outcome.

Does this approach sound reasonable to you? Any thoughts or alternatives would be greatly appreciated!

@tdcmeehan
Contributor

@hantangwangd my point regarding DELETE is not that the operation itself is similar. Of course, I agree, logically it's more like a write. What I meant was that both DELETE and rewrite_data_files must track what files were deleted, as this procedure will in fact delete data files. I meant that it would be nice to follow the same convention to collect what files were written and deleted in order to avoid this context passing paradigm. For instance, if each worker's fragment included the source file path it processed, we could aggregate that information in finish() without needing coordinator-side state during execution.

Can you elaborate on how the dedicated file planning step would work? How would it know what files were rewritten?

@hantangwangd hantangwangd force-pushed the support_call_distributed_procedure_part2 branch 2 times, most recently from 90638b1 to b35da01 on December 4, 2025 14:27
@hantangwangd
Member Author

Can you elaborate on how the dedicated file planning step would work? How would it know what files were rewritten?

Hi @tdcmeehan, I've just appended a new commit to the PR. In the latest change, I avoided exposing the Iceberg procedure context to external components and moved the logic of identifying which data/delete files to rewrite into the beginCallDistributedProcedure method of rewrite_data_files. By this stage, the optimization phase is done: the optimized plan tree and all pushed-down filters are finalized, so we have all the information needed to determine the target files exactly.

Please take a look when you get a chance and let me know what you think. Very glad to discuss and open to any suggestions. Thanks a lot!
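As a rough illustration of that dedicated planning step (a self-contained simplification, not the actual connector code, which replays the layout's filter through an Iceberg TableScan): once the pushed-down predicate is finalized, the set of files to rewrite can be recomputed on demand by applying that same predicate to the table's file metadata, so no state needs to be collected during split enumeration.

```java
import java.util.List;
import java.util.function.Predicate;
import static java.util.stream.Collectors.toList;

public class FilePlanningSketch
{
    // Stand-in for a data file with a min/max statistic on one column.
    record DataFile(String path, long minValue, long maxValue) {}

    // Replays the finalized pushed-down predicate over table metadata to
    // decide which files the rewrite must read, mirroring what a fresh
    // scan with the layout's filter would plan.
    static List<DataFile> planFilesToRewrite(List<DataFile> allFiles, Predicate<DataFile> pushedDownFilter)
    {
        return allFiles.stream().filter(pushedDownFilter).collect(toList());
    }

    public static void main(String[] args)
    {
        List<DataFile> files = List.of(
                new DataFile("data-1.parquet", 0, 10),
                new DataFile("data-2.parquet", 50, 90));
        // A filter like "value < 20" can only match the first file's range.
        List<DataFile> target = planFilesToRewrite(files, f -> f.minValue() < 20);
        System.out.println(target.size()); // 1
    }
}
```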

private ConnectorDistributedProcedureHandle beginCallDistributedProcedure(ConnectorSession session, IcebergProcedureContext procedureContext, IcebergTableLayoutHandle layoutHandle, Object[] arguments)
{
    try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
        Table icebergTable = procedureContext.getTable().orElseThrow(() -> new VerifyException("Target table does not exist"));
Contributor

Can't this be done during finishCallDistributedProcedure, since the Table stored here itself is versioned by a snapshot ID?

Member Author

I think this is a great idea. Moving the file planning after the successful write operation avoids unnecessary planning work in cases where the actual write fails. We can simply pass the information needed for file planning (e.g., the Iceberg table layout) to finishCallDistributedProcedure via the procedure context. Another benefit is that this approach eliminates the need to maintain scannedDataFiles and fullyAppliedDeleteFiles within the IcebergProcedureContext, which simplifies it significantly. Done!

procedureName.getSchemaName(),
procedureName.getObjectName()));
verify(procedure instanceof DistributedProcedure, "procedure must be DistributedProcedure");
procedureContext = Optional.of((IcebergProcedureContext) ((DistributedProcedure) procedure).createContext());
Contributor

Could we now make all of these parameters immutable and store them in the constructor?

Member Author

Sure, I've refactored it.

@tdcmeehan
Contributor

Can you elaborate on how the dedicated file planning step would work? How would it know what files were rewritten?

Hi @tdcmeehan, I've just appended a new commit to the PR. In the latest change, I avoided exposing the Iceberg procedure context to external components and moved the logic of identifying which data/delete files to rewrite into the beginCallDistributedProcedure method of rewrite_data_files. By this stage, the optimization phase is done—the optimized plan tree and all pushed-down filters are finalized—so we have all the information needed to exactly determine the target files.

Please take a look when you get a chance and let me know what you think. Very glad to discuss and open to any suggestions. Thanks a lot!

Thanks for looking into that. I agree that this re-scanning approach seems reasonable.

@hantangwangd
Member Author

@tdcmeehan thank you so much for the review and feedback. I've addressed your comments. Please take a look when you get a chance.

Contributor

@tdcmeehan left a comment


Thanks @hantangwangd. This looks really great. I just have one more question about the context object.

@hantangwangd hantangwangd force-pushed the support_call_distributed_procedure_part2 branch from aa201e9 to c216a9d on December 8, 2025 17:53
@tdcmeehan tdcmeehan changed the title feat: Distributed Procedure Support Part 2/X - iceberg part changes feat(plugin-iceberg): Add rewrite_data_files Iceberg procedure Dec 8, 2025
@tdcmeehan tdcmeehan changed the title feat(plugin-iceberg): Add rewrite_data_files Iceberg procedure feat(plugin-iceberg): Add rewrite_data_files procedure Dec 8, 2025
@hantangwangd hantangwangd merged commit fa79e3c into prestodb:master Dec 8, 2025
91 of 95 checks passed
@PingLiuPing
Contributor

@hantangwangd Sorry for the late response, and thanks for the code.
Would you help me understand how this procedure is executed in a Prestissimo worker?
Is Prestissimo just responsible for reading those small data files and delete files, writing the consolidated data into new data files, and then returning the file metadata to the coordinator, with all other parts accomplished by the coordinator?

Thank you very much.

@hantangwangd
Member Author

Hi @PingLiuPing. For data-rewrite type distributed procedures, in Prestissimo we need to plan the CallDistributedProcedureNode in the fragment as a Velox TableWriteNode, during which the executeProcedureHandle is converted into an IcebergInsertTableHandle.

After this, yes, Prestissimo exhibits the same behavior as it does for an INSERT BY SUBQUERY. As you described, it is responsible for reading the small data and delete files, consolidating the data into new files, and returning the file metadata to the coordinator. All other operations are handled by the coordinator.
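On the coordinator side, the final commit applies the rewrite as one atomic metadata change (the PR commits it through the Iceberg table's RewriteFiles transaction). The following is a minimal, self-contained model of that swap over a set of live file paths; it is an illustration of the commit's shape, not the connector's actual code or the Iceberg API itself.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RewriteCommitSketch
{
    // Applies a rewrite as one atomic metadata swap over the set of live
    // data file paths, mirroring the shape of a RewriteFiles update:
    // drop the compacted source files, register the newly written ones.
    static Set<String> commitRewrite(Set<String> liveFiles, List<String> rewritten, List<String> added)
    {
        if (!liveFiles.containsAll(rewritten)) {
            // A concurrent commit already removed a source file; a real
            // transaction would fail validation here rather than commit.
            throw new IllegalStateException("rewritten file is no longer live");
        }
        Set<String> next = new HashSet<>(liveFiles);
        rewritten.forEach(next::remove);
        next.addAll(added);
        return next;
    }

    public static void main(String[] args)
    {
        Set<String> live = new HashSet<>(List.of("small-1.parquet", "small-2.parquet", "big-0.parquet"));
        Set<String> next = commitRewrite(live,
                List.of("small-1.parquet", "small-2.parquet"),
                List.of("compacted-1.parquet"));
        System.out.println(next.size()); // 2
    }
}
```

Readers that planned against the previous snapshot keep seeing the old file set, which is why the workers only need to report the new file metadata back for this single commit.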

Development

Successfully merging this pull request may close these issues.

Distributed Procedure Support - add rewrite_data_files procedure for Iceberg connector
