From 09b49674b92dbd50887918da31603d508cd55df0 Mon Sep 17 00:00:00 2001 From: wangd Date: Sat, 3 Jan 2026 12:18:04 +0800 Subject: [PATCH] misc: Add method comments and update docs for distributed procedures --- .../src/main/sphinx/connector/iceberg.rst | 4 +- .../src/main/sphinx/develop/procedures.rst | 45 ++++++++++--------- .../procedure/RewriteDataFilesProcedure.java | 3 +- .../spi/procedure/DistributedProcedure.java | 12 +++++ 4 files changed, 41 insertions(+), 23 deletions(-) diff --git a/presto-docs/src/main/sphinx/connector/iceberg.rst b/presto-docs/src/main/sphinx/connector/iceberg.rst index f41c85411c579..ea5fcf52dd3c3 100644 --- a/presto-docs/src/main/sphinx/connector/iceberg.rst +++ b/presto-docs/src/main/sphinx/connector/iceberg.rst @@ -1271,9 +1271,9 @@ The following arguments are available: ===================== ========== =============== ======================================================================= Argument Name required type Description ===================== ========== =============== ======================================================================= -``schema`` ✔️ string Schema of the table to update. +``schema`` Yes string Schema of the table to update. -``table_name`` ✔️ string Name of the table to update. +``table_name`` Yes string Name of the table to update. ``filter`` string Predicate as a string used for filtering the files. Currently only rewrite of whole partitions is supported. Filter on partition diff --git a/presto-docs/src/main/sphinx/develop/procedures.rst b/presto-docs/src/main/sphinx/develop/procedures.rst index 47ba0782ef9e9..6142b3dccd9af 100644 --- a/presto-docs/src/main/sphinx/develop/procedures.rst +++ b/presto-docs/src/main/sphinx/develop/procedures.rst @@ -205,15 +205,15 @@ Additionally, the following three abstract methods defined by the base class ``D .. code-block:: java /** - * This method creates a connector-specific, or even a distributed procedure subtype-specific, context object. - * In connectors that support distributed procedures, it is invoked at the start of a distributed procedure's execution. - * The generated procedure context is then bound to the current ConnectorMetadata to maintain all contextual information - * involved throughout the execution, which may be utilized when the procedure finishes. + * Creates a connector-specific, or even a distributed procedure subtype-specific context object. + * For connectors that support distributed procedures, this method is invoked at the start of a distributed procedure's execution. + * The generated procedure context is then bound to the current ConnectorMetadata, maintaining all contextual information + * throughout the execution. This context would be accessed during calls to the procedure's {@link #begin} and {@link #finish} methods. */ - public ConnectorProcedureContext createContext() + public ConnectorProcedureContext createContext(Object... arguments); /** - * Performs the preparatory work required when starting the execution of this distributed procedure + * Performs the preparatory work required when starting the execution of this distributed procedure. * */ public abstract ConnectorDistributedProcedureHandle begin(ConnectorSession session, ConnectorProcedureContext procedureContext, @@ -221,9 +221,10 @@ Additionally, the following three abstract methods defined by the base class ``D Object[] arguments); /** - * Performs the work required for the final centralized commit, after all distributed execution tasks have completed + * Performs the work required for the final centralized commit, after all distributed execution tasks have completed. * */ - public abstract void finish(ConnectorProcedureContext procedureContext, + public abstract void finish(ConnectorSession session, + ConnectorProcedureContext procedureContext, ConnectorDistributedProcedureHandle procedureHandle, Collection fragments); @@ -242,18 +243,18 @@ As an illustration, the ``TableDataRewriteDistributedProcedure`` subtype, which { private final BeginCallDistributedProcedure beginCallDistributedProcedure; private final FinishCallDistributedProcedure finishCallDistributedProcedure; - private Supplier contextSupplier; + private final Function contextProvider; public TableDataRewriteDistributedProcedure(String schema, String name, List arguments, BeginCallDistributedProcedure beginCallDistributedProcedure, FinishCallDistributedProcedure finishCallDistributedProcedure, - Supplier contextSupplier) + Function contextProvider) { super(TABLE_DATA_REWRITE, schema, name, arguments); this.beginCallDistributedProcedure = requireNonNull(beginCallDistributedProcedure, "beginCallDistributedProcedure is null"); this.finishCallDistributedProcedure = requireNonNull(finishCallDistributedProcedure, "finishCallDistributedProcedure is null"); - this.contextSupplier = requireNonNull(contextSupplier, "contextSupplier is null"); + this.contextProvider = requireNonNull(contextProvider, "contextProvider is null"); // Performs subtype-specific validation and processing logic on the parameters ...... @@ -266,15 +267,15 @@ As an illustration, the ``TableDataRewriteDistributedProcedure`` subtype, which } @Override - public void finish(ConnectorProcedureContext procedureContext, ConnectorDistributedProcedureHandle procedureHandle, Collection fragments) + public void finish(ConnectorSession session, ConnectorProcedureContext procedureContext, ConnectorDistributedProcedureHandle procedureHandle, Collection fragments) { - this.finishCallDistributedProcedure.finish(procedureContext, procedureHandle, fragments); + this.finishCallDistributedProcedure.finish(session, procedureContext, procedureHandle, fragments); } @Override - public ConnectorProcedureContext createContext() + public ConnectorProcedureContext createContext(Object... arguments) { - return contextSupplier.get(); + return contextProvider.apply(arguments); } @FunctionalInterface @@ -286,7 +287,7 @@ As an illustration, the ``TableDataRewriteDistributedProcedure`` subtype, which @FunctionalInterface public interface FinishCallDistributedProcedure { - void finish(ConnectorProcedureContext procedureContext, ConnectorDistributedProcedureHandle procedureHandle, Collection fragments); + void finish(ConnectorSession session, ConnectorProcedureContext procedureContext, ConnectorDistributedProcedureHandle procedureHandle, Collection fragments); } } @@ -388,7 +389,7 @@ The parameters required to create a ``DistributedProcedure`` subclass differ, bu described in the normal ``Procedure`` above. * ``String schema`` - The schema namespace to which this procedure belongs (typically ``system`` in PrestoDB) -* ``String name`` - The name of this procedure, for example, ``expire_snapshots`` +* ``String name`` - The name of this procedure, for example, ``rewrite_data_files`` * ``List arguments`` - The parameter declarations list for this procedure The following code demonstrates how to implement ``rewrite_data_files`` for the Iceberg connector, based on the ``TableDataRewriteDistributedProcedure`` class: @@ -422,8 +423,12 @@ The following code demonstrates how to implement ``rewrite_data_files`` for the new Argument("filter", VARCHAR, false, "TRUE"), new Argument("options", "map(varchar, varchar)", false, null)), (session, procedureContext, tableLayoutHandle, arguments) -> beginCallDistributedProcedure(session, (IcebergProcedureContext) procedureContext, (IcebergTableLayoutHandle) tableLayoutHandle, arguments), - ((procedureContext, tableHandle, fragments) -> finishCallDistributedProcedure((IcebergProcedureContext) procedureContext, tableHandle, fragments)), - IcebergProcedureContext::new); + ((session, procedureContext, tableHandle, fragments) -> finishCallDistributedProcedure(session, (IcebergProcedureContext) procedureContext, tableHandle, fragments)), + arguments -> { + checkArgument(arguments.length == 2, format("invalid number of arguments: %s (should have %s)", arguments.length, 2)); + checkArgument(arguments[0] instanceof Table && arguments[1] instanceof Transaction, "Invalid arguments, required: [Table, Transaction]"); + return new IcebergProcedureContext((Table) arguments[0], (Transaction) arguments[1]); + }); } private ConnectorDistributedProcedureHandle beginCallDistributedProcedure(ConnectorSession session, IcebergProcedureContext procedureContext, IcebergTableLayoutHandle layoutHandle, Object[] arguments) @@ -449,7 +454,7 @@ The following code demonstrates how to implement ``rewrite_data_files`` for the } } - private void finishCallDistributedProcedure(IcebergProcedureContext procedureContext, ConnectorDistributedProcedureHandle procedureHandle, Collection fragments) + private void finishCallDistributedProcedure(ConnectorSession session, IcebergProcedureContext procedureContext, ConnectorDistributedProcedureHandle procedureHandle, Collection fragments) { if (fragments.isEmpty() && procedureContext.getScannedDataFiles().isEmpty() && diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/RewriteDataFilesProcedure.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/RewriteDataFilesProcedure.java index 50dc0d92cff6e..9873e5f89d3ed 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/RewriteDataFilesProcedure.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/RewriteDataFilesProcedure.java @@ -71,6 +71,7 @@ import static com.facebook.presto.spi.procedure.TableDataRewriteDistributedProcedure.SCHEMA; import static com.facebook.presto.spi.procedure.TableDataRewriteDistributedProcedure.TABLE_NAME; import static com.google.common.collect.ImmutableList.toImmutableList; +import static java.lang.String.format; import static java.util.Objects.requireNonNull; public class RewriteDataFilesProcedure @@ -102,7 +103,7 @@ public DistributedProcedure get() (session, procedureContext, tableLayoutHandle, arguments) -> beginCallDistributedProcedure(session, (IcebergProcedureContext) procedureContext, (IcebergTableLayoutHandle) tableLayoutHandle, arguments), ((session, procedureContext, tableHandle, fragments) -> finishCallDistributedProcedure(session, (IcebergProcedureContext) procedureContext, tableHandle, fragments)), arguments -> { - checkArgument(arguments.length == 2, "invalid arguments count: " + arguments.length); + checkArgument(arguments.length == 2, format("invalid number of arguments: %s (should have %s)", arguments.length, 2)); checkArgument(arguments[0] instanceof Table && arguments[1] instanceof Transaction, "Invalid arguments, required: [Table, Transaction]"); return new IcebergProcedureContext((Table) arguments[0], (Transaction) arguments[1]); }); diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/procedure/DistributedProcedure.java b/presto-spi/src/main/java/com/facebook/presto/spi/procedure/DistributedProcedure.java index c400bb6311b9f..5d97d9fb33bab 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/procedure/DistributedProcedure.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/procedure/DistributedProcedure.java @@ -44,10 +44,22 @@ public DistributedProcedureType getType() return type; } + /** + * Performs the preparatory work required when starting the execution of this distributed procedure. + * */ public abstract ConnectorDistributedProcedureHandle begin(ConnectorSession session, ConnectorProcedureContext procedureContext, ConnectorTableLayoutHandle tableLayoutHandle, Object[] arguments); + /** + * Performs the work required for the final centralized commit, after all distributed execution tasks have completed. + * */ public abstract void finish(ConnectorSession session, ConnectorProcedureContext procedureContext, ConnectorDistributedProcedureHandle procedureHandle, Collection fragments); + /** + * Creates a connector-specific, or even a distributed procedure subtype-specific context object. + * For connectors that support distributed procedures, this method is invoked at the start of a distributed procedure's execution. + * The generated procedure context is then bound to the current ConnectorMetadata, maintaining all contextual information + * throughout the execution. This context would be accessed during calls to the procedure's {@link #begin} and {@link #finish} methods. + */ public ConnectorProcedureContext createContext(Object... arguments) { throw new PrestoException(StandardErrorCode.NOT_SUPPORTED, "createContext not supported");