Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1271,9 +1271,9 @@ The following arguments are available:
===================== ========== =============== =======================================================================
Argument Name required type Description
===================== ========== =============== =======================================================================
``schema`` ✔️ string Schema of the table to update.
``schema`` Yes string Schema of the table to update.

``table_name`` ✔️ string Name of the table to update.
``table_name`` Yes string Name of the table to update.

``filter`` string Predicate as a string used for filtering the files. Currently
only rewrite of whole partitions is supported. Filter on partition
Expand Down
45 changes: 25 additions & 20 deletions presto-docs/src/main/sphinx/develop/procedures.rst
Original file line number Diff line number Diff line change
Expand Up @@ -205,25 +205,26 @@ Additionally, the following three abstract methods defined by the base class ``D
.. code-block:: java

/**
* This method creates a connector-specific, or even a distributed procedure subtype-specific, context object.
* In connectors that support distributed procedures, it is invoked at the start of a distributed procedure's execution.
* The generated procedure context is then bound to the current ConnectorMetadata to maintain all contextual information
* involved throughout the execution, which may be utilized when the procedure finishes.
* Creates a connector-specific, or even a distributed procedure subtype-specific context object.
* For connectors that support distributed procedures, this method is invoked at the start of a distributed procedure's execution.
* The generated procedure context is then bound to the current ConnectorMetadata, maintaining all contextual information
* throughout the execution. This context would be accessed during calls to the procedure's {@link #begin} and {@link #finish} methods.
*/
public ConnectorProcedureContext createContext()
public ConnectorProcedureContext createContext(Object... arguments);

/**
* Performs the preparatory work required when starting the execution of this distributed procedure
* Performs the preparatory work required when starting the execution of this distributed procedure.
* */
public abstract ConnectorDistributedProcedureHandle begin(ConnectorSession session,
ConnectorProcedureContext procedureContext,
ConnectorTableLayoutHandle tableLayoutHandle,
Object[] arguments);

/**
* Performs the work required for the final centralized commit, after all distributed execution tasks have completed
* Performs the work required for the final centralized commit, after all distributed execution tasks have completed.
* */
public abstract void finish(ConnectorProcedureContext procedureContext,
public abstract void finish(ConnectorSession session,
ConnectorProcedureContext procedureContext,
ConnectorDistributedProcedureHandle procedureHandle,
Collection<Slice> fragments);

Expand All @@ -242,18 +243,18 @@ As an illustration, the ``TableDataRewriteDistributedProcedure`` subtype, which
{
private final BeginCallDistributedProcedure beginCallDistributedProcedure;
private final FinishCallDistributedProcedure finishCallDistributedProcedure;
private Supplier<ConnectorProcedureContext> contextSupplier;
private final Function<Object[], ConnectorProcedureContext> contextProvider;

public TableDataRewriteDistributedProcedure(String schema, String name,
List<Argument> arguments,
BeginCallDistributedProcedure beginCallDistributedProcedure,
FinishCallDistributedProcedure finishCallDistributedProcedure,
Supplier<ConnectorProcedureContext> contextSupplier)
Function<Object[], ConnectorProcedureContext> contextProvider)
{
super(TABLE_DATA_REWRITE, schema, name, arguments);
this.beginCallDistributedProcedure = requireNonNull(beginCallDistributedProcedure, "beginCallDistributedProcedure is null");
this.finishCallDistributedProcedure = requireNonNull(finishCallDistributedProcedure, "finishCallDistributedProcedure is null");
this.contextSupplier = requireNonNull(contextSupplier, "contextSupplier is null");
this.contextProvider = requireNonNull(contextProvider, "contextProvider is null");

// Performs subtype-specific validation and processing logic on the parameters
......
Expand All @@ -266,15 +267,15 @@ As an illustration, the ``TableDataRewriteDistributedProcedure`` subtype, which
}

@Override
public void finish(ConnectorProcedureContext procedureContext, ConnectorDistributedProcedureHandle procedureHandle, Collection<Slice> fragments)
public void finish(ConnectorSession session, ConnectorProcedureContext procedureContext, ConnectorDistributedProcedureHandle procedureHandle, Collection<Slice> fragments)
{
this.finishCallDistributedProcedure.finish(procedureContext, procedureHandle, fragments);
this.finishCallDistributedProcedure.finish(session, procedureContext, procedureHandle, fragments);
}

@Override
public ConnectorProcedureContext createContext()
public ConnectorProcedureContext createContext(Object... arguments)
{
return contextSupplier.get();
return contextProvider.apply(arguments);
}

@FunctionalInterface
Expand All @@ -286,7 +287,7 @@ As an illustration, the ``TableDataRewriteDistributedProcedure`` subtype, which
@FunctionalInterface
public interface FinishCallDistributedProcedure
{
void finish(ConnectorProcedureContext procedureContext, ConnectorDistributedProcedureHandle procedureHandle, Collection<Slice> fragments);
void finish(ConnectorSession session, ConnectorProcedureContext procedureContext, ConnectorDistributedProcedureHandle procedureHandle, Collection<Slice> fragments);
}
}

Expand Down Expand Up @@ -388,7 +389,7 @@ The parameters required to create a ``DistributedProcedure`` subclass differ, bu
described in the normal ``Procedure`` above.

* ``String schema`` - The schema namespace to which this procedure belongs (typically ``system`` in PrestoDB)
* ``String name`` - The name of this procedure, for example, ``expire_snapshots``
* ``String name`` - The name of this procedure, for example, ``rewrite_data_files``
* ``List<Argument> arguments`` - The parameter declarations list for this procedure

The following code demonstrates how to implement ``rewrite_data_files`` for the Iceberg connector, based on the ``TableDataRewriteDistributedProcedure`` class:
Expand Down Expand Up @@ -422,8 +423,12 @@ The following code demonstrates how to implement ``rewrite_data_files`` for the
new Argument("filter", VARCHAR, false, "TRUE"),
new Argument("options", "map(varchar, varchar)", false, null)),
(session, procedureContext, tableLayoutHandle, arguments) -> beginCallDistributedProcedure(session, (IcebergProcedureContext) procedureContext, (IcebergTableLayoutHandle) tableLayoutHandle, arguments),
((procedureContext, tableHandle, fragments) -> finishCallDistributedProcedure((IcebergProcedureContext) procedureContext, tableHandle, fragments)),
IcebergProcedureContext::new);
((session, procedureContext, tableHandle, fragments) -> finishCallDistributedProcedure(session, (IcebergProcedureContext) procedureContext, tableHandle, fragments)),
arguments -> {
checkArgument(arguments.length == 2, format("invalid number of arguments: %s (should have %s)", arguments.length, 2));
checkArgument(arguments[0] instanceof Table && arguments[1] instanceof Transaction, "Invalid arguments, required: [Table, Transaction]");
return new IcebergProcedureContext((Table) arguments[0], (Transaction) arguments[1]);
});
}

private ConnectorDistributedProcedureHandle beginCallDistributedProcedure(ConnectorSession session, IcebergProcedureContext procedureContext, IcebergTableLayoutHandle layoutHandle, Object[] arguments)
Expand All @@ -449,7 +454,7 @@ The following code demonstrates how to implement ``rewrite_data_files`` for the
}
}

private void finishCallDistributedProcedure(IcebergProcedureContext procedureContext, ConnectorDistributedProcedureHandle procedureHandle, Collection<Slice> fragments)
private void finishCallDistributedProcedure(ConnectorSession session, IcebergProcedureContext procedureContext, ConnectorDistributedProcedureHandle procedureHandle, Collection<Slice> fragments)
{
if (fragments.isEmpty() &&
procedureContext.getScannedDataFiles().isEmpty() &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import static com.facebook.presto.spi.procedure.TableDataRewriteDistributedProcedure.SCHEMA;
import static com.facebook.presto.spi.procedure.TableDataRewriteDistributedProcedure.TABLE_NAME;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

public class RewriteDataFilesProcedure
Expand Down Expand Up @@ -102,7 +103,7 @@ public DistributedProcedure get()
(session, procedureContext, tableLayoutHandle, arguments) -> beginCallDistributedProcedure(session, (IcebergProcedureContext) procedureContext, (IcebergTableLayoutHandle) tableLayoutHandle, arguments),
((session, procedureContext, tableHandle, fragments) -> finishCallDistributedProcedure(session, (IcebergProcedureContext) procedureContext, tableHandle, fragments)),
arguments -> {
checkArgument(arguments.length == 2, "invalid arguments count: " + arguments.length);
checkArgument(arguments.length == 2, format("invalid number of arguments: %s (should have %s)", arguments.length, 2));
checkArgument(arguments[0] instanceof Table && arguments[1] instanceof Transaction, "Invalid arguments, required: [Table, Transaction]");
return new IcebergProcedureContext((Table) arguments[0], (Transaction) arguments[1]);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,22 @@ public DistributedProcedureType getType()
return type;
}

/**
* Performs the preparatory work required when starting the execution of this distributed procedure.
* */
public abstract ConnectorDistributedProcedureHandle begin(ConnectorSession session, ConnectorProcedureContext procedureContext, ConnectorTableLayoutHandle tableLayoutHandle, Object[] arguments);

/**
* Performs the work required for the final centralized commit, after all distributed execution tasks have completed.
* */
public abstract void finish(ConnectorSession session, ConnectorProcedureContext procedureContext, ConnectorDistributedProcedureHandle procedureHandle, Collection<Slice> fragments);

/**
* Creates a connector-specific, or even a distributed procedure subtype-specific context object.
* For connectors that support distributed procedures, this method is invoked at the start of a distributed procedure's execution.
* The generated procedure context is then bound to the current ConnectorMetadata, maintaining all contextual information
* throughout the execution. This context would be accessed during calls to the procedure's {@link #begin} and {@link #finish} methods.
*/
public ConnectorProcedureContext createContext(Object... arguments)
{
throw new PrestoException(StandardErrorCode.NOT_SUPPORTED, "createContext not supported");
Expand Down
Loading