165 changes: 82 additions & 83 deletions docs/src/main/sphinx/develop/supporting-merge.rst

Supporting ``MERGE``
====================

The Trino engine provides APIs to support row-level SQL ``MERGE``.
To implement ``MERGE``, a connector must provide the following:

* An implementation of ``ConnectorMergeSink``, which is typically
layered on top of a ``ConnectorPageSink``.
* Methods in ``ConnectorMetadata`` to get a "rowId" column handle, get the
row change paradigm, and to start and complete the ``MERGE`` operation.

The Trino engine machinery used to implement SQL ``MERGE`` is also used to
support SQL ``DELETE`` and ``UPDATE``. This means that all a connector needs to
do is implement support for SQL ``MERGE``, and the connector gets all the Data
Manipulation Language (DML) operations.

Standard SQL ``MERGE``
----------------------

Different query engines support varying definitions of SQL ``MERGE``.
Trino supports the strict SQL specification ``ISO/IEC 9075``, published
in 2016. As a simple example, given tables ``accounts`` and
``monthly_accounts_update`` defined as::

CREATE TABLE accounts (
Here is a possible ``MERGE`` operation, from ``monthly_accounts_update`` to
INSERT (customer, purchases, address)
VALUES (s.customer, s.purchases, s.address);

SQL ``MERGE`` tries to match each ``WHEN`` clause in source order. When
a match is found, the corresponding ``DELETE``, ``INSERT`` or ``UPDATE``
is executed and subsequent ``WHEN`` clauses are ignored.

when a row from the source table or query matches a row in the target table:
* ``DELETE``, in which the target row is deleted.

In the ``NOT MATCHED`` case, SQL ``MERGE`` supports only ``INSERT``
operations. The values inserted are arbitrary but usually come from
the unmatched row of the source table or query.

``RowChangeParadigm``
---------------------

Different connectors have different ways of representing row updates,
imposed by the underlying storage systems. The Trino engine classifies
these different paradigms as elements of the ``RowChangeParadigm``
enumeration, returned by the method
``ConnectorMetadata.getRowChangeParadigm(...)``.

The ``RowChangeParadigm`` enumeration values are:

* ``CHANGE_ONLY_UPDATED_COLUMNS``, intended for connectors that can update
individual columns of rows identified by a ``rowId``. The corresponding
merge processor class is ``ChangeOnlyUpdatedColumnsMergeProcessor``.
* ``DELETE_ROW_AND_INSERT_ROW``, intended for connectors that represent a
row change as a row deletion paired with a row insertion. The corresponding
merge processor class is ``DeleteAndInsertMergeProcessor``.

Overview of ``MERGE`` processing
--------------------------------

A ``MERGE`` statement is processed by creating a ``RIGHT JOIN`` between the
target table and the source, on the ``MERGE`` criteria. The source may be
a table or an arbitrary query. For each row in the source table or query,
``MERGE`` produces a ``ROW`` object containing:

* the data column values from the ``UPDATE`` or ``INSERT`` cases. For the
``DELETE`` cases, only the partition columns, which determine
partitioning and bucketing, are non-null.
* a boolean column containing ``true`` for source rows that matched some
target row, and ``false`` otherwise.
* an integer that identifies whether the merge case operation is ``UPDATE``,
``DELETE`` or ``INSERT``, or a source row for which no case matched. If a
source row doesn't match any merge case, all data column values except
those that determine distribution are null, and the operation number
is -1.

A ``SearchedCaseExpression`` is constructed from the ``RIGHT JOIN`` result
to represent the ``WHEN`` clauses of the ``MERGE``. In the preceding example,
the ``MERGE`` is executed as if the ``SearchedCaseExpression`` were written as::

SELECT
row, and ultimately creates a sequence of pages to be routed to the node that
runs the ``ConnectorMergeSink.storeMergedRows(...)`` method.

Like ``DELETE`` and ``UPDATE``, ``MERGE`` target table rows are identified by
a connector-specific ``rowId`` column handle. For ``MERGE``, the ``rowId``
handle is returned by ``ConnectorMetadata.getMergeRowIdColumnHandle(...)``.

``MERGE`` redistribution
------------------------

The Trino ``MERGE`` implementation allows ``UPDATE`` to change
bucketing columns.

Since the ``MERGE`` process in general requires redistribution of
merged rows among Trino nodes, the order of rows in pages to be stored
is indeterminate. Connectors like Hive that depend on an ascending
rowId order for deleted rows must sort the deleted rows before storing
them.

To ensure that all inserted rows for a given partition end up on a
single node, the redistribution hash on the partition key/bucket columns
is applied to the page partition keys. As a result of the hash, all
rows for a specific partition/bucket hash together, whether they
were ``MATCHED`` rows or ``NOT MATCHED`` rows.

For connectors whose ``RowChangeParadigm`` is ``DELETE_ROW_AND_INSERT_ROW``,
inserted rows are distributed using the layout supplied by
``ConnectorMetadata.getInsertLayout()``. For some connectors, the same
layout is used for updated rows. Other connectors require a special
layout for updated rows, supplied by ``ConnectorMetadata.getUpdateLayout()``.

Connector support for ``MERGE``
-------------------------------

To start ``MERGE`` processing, the Trino engine calls:
* ``ConnectorMetadata.getRowChangeParadigm(...)`` to get the paradigm
supported by the connector for changing existing table rows.
* ``ConnectorMetadata.beginMerge(...)`` to get a
``ConnectorMergeTableHandle`` for the merge operation. That
``ConnectorMergeTableHandle`` object contains whatever information the
connector needs to specify the ``MERGE`` operation.
* ``ConnectorMetadata.getInsertLayout(...)``, from which it extracts
the list of partition or table columns that impact write redistribution.
* ``ConnectorMetadata.getUpdateLayout(...)``. If that layout is non-empty,
it is used to distribute updated rows resulting from the ``MERGE``
operation.

Expand All @@ -187,29 +187,29 @@ On nodes that are targets of the hash, the Trino engine calls
``ConnectorMergeSink``.

To write out each page of merged rows, the Trino engine calls
``ConnectorMergeSink.storeMergedRows(Page)``. The ``storeMergedRows(Page)``
method iterates over the rows in the page, performing updates and deletes
in the ``MATCHED`` cases, and inserts in the ``NOT MATCHED`` cases.

When using ``RowChangeParadigm.DELETE_ROW_AND_INSERT_ROW``, the engine
translates ``UPDATE`` operations into a pair of ``DELETE`` and ``INSERT``
operations before ``storeMergedRows(Page)`` is called.

To complete the ``MERGE`` operation, the Trino engine calls
``ConnectorMetadata.finishMerge(...)``, passing the table handle
and a collection of JSON objects encoded as ``Slice`` instances. These
objects contain connector-specific information specifying what was changed
by the ``MERGE`` operation. Typically this JSON object contains the files
written and table and partition statistics generated by the ``MERGE``
operation. The connector takes appropriate actions, if any.

``RowChangeProcessor`` implementation for ``MERGE``
---------------------------------------------------

In the ``MERGE`` implementation, each ``RowChangeParadigm``
corresponds to an internal Trino engine class that implements interface
``RowChangeProcessor``. ``RowChangeProcessor`` has one interesting method:
``Page transformPage(Page)``. The format of the output page depends
on the ``RowChangeParadigm``.

The connector has no access to the ``RowChangeProcessor`` instance -- it
The page supplied to ``transformPage()`` consists of:
null if not matched
* The merge case ``RowBlock``
* The integer case number block
* The byte ``is_distinct`` block, with value 0 if not distinct.

The merge case ``RowBlock`` has the following layout:

The page returned from ``transformPage`` consists of:
* The merge case operation block.
* The rowId block.
* A byte block containing 1 if the row is an insert derived from an
update operation, and 0 otherwise. This block is used to correctly
calculate the count of rows changed for connectors that represent
updates as deletes plus inserts.

Detecting duplicate matching target rows
----------------------------------------

The SQL ``MERGE`` specification requires that in each ``MERGE`` case,
a single target table row must match at most one source row, after
applying the ``MERGE`` case condition expression. The first step
toward finding these errors is done by labeling each row in the target
table with a unique id, using an ``AssignUniqueId`` node above the
target table scan. The projected results from the ``RIGHT JOIN``
have these unique ids for matched target table rows as well as
the ``WHEN`` clause number. A ``MarkDistinct`` node adds an
``is_distinct`` column which is true if no other row has the same
unique id and ``WHEN`` clause number, and false otherwise. If
any row has ``is_distinct`` equal to false, a
``MERGE_TARGET_ROW_MULTIPLE_MATCHES`` exception is raised and
the ``MERGE`` operation fails.

originally passed to ``ConnectorMetadata.beginMerge()``.
``ConnectorPageSinkProvider`` API
---------------------------------

To support SQL ``MERGE``, ``ConnectorPageSinkProvider`` must implement
the method that creates the ``ConnectorMergeSink``:

* ``createMergeSink``::
``ConnectorMergeSink`` API
--------------------------

To support ``MERGE``, the connector must define an
implementation of ``ConnectorMergeSink``, usually layered over the
connector's ``ConnectorPageSink``.

The only interesting methods are:
``ConnectorPageSinkProvider.createMergeSink()``, passing the page
generated by the ``RowChangeProcessor.transformPage()`` method.
That page consists of all table columns, in table column order,
followed by the ``TINYINT`` operation column, followed by the rowId column.

The job of ``storeMergedRows()`` is to iterate over the rows in the page,
and process them based on the value of the operation column, ``INSERT``,
``DELETE``, ``UPDATE``, or ignore the row. By choosing an appropriate
paradigm, the connector can request that ``UPDATE`` operations be
transformed into ``DELETE`` and ``INSERT`` operations.

* ``finish``::

CompletableFuture<Collection<Slice>> finish()

The Trino engine calls ``finish()`` when all the data has been processed by
a specific ``ConnectorMergeSink`` instance. The connector returns a future
containing a collection of ``Slice``, representing connector-specific
information about the rows processed. Usually this includes the row count,
and might include information like the files or partitions created or
changed.
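
The following sketch shows the general shape of a merge sink for a connector
that uses the ``DELETE_ROW_AND_INSERT_ROW`` paradigm. It is only a sketch:
the class and its helper methods are hypothetical, and the operation-number
constants are assumed to be the ones exposed by ``ConnectorMergeSink``. A real
connector replaces the helpers with its own storage operations::

    import io.airlift.slice.Slice;
    import io.trino.spi.Page;
    import io.trino.spi.block.Block;
    import io.trino.spi.connector.ConnectorMergeSink;
    import io.trino.spi.connector.ConnectorPageSink;

    import java.util.Collection;
    import java.util.concurrent.CompletableFuture;

    import static io.trino.spi.type.TinyintType.TINYINT;

    public class ExampleMergeSink
            implements ConnectorMergeSink
    {
        // Hypothetical page sink that writes inserted rows for this connector
        private final ConnectorPageSink insertSink;

        public ExampleMergeSink(ConnectorPageSink insertSink)
        {
            this.insertSink = insertSink;
        }

        @Override
        public void storeMergedRows(Page page)
        {
            // Page layout: data columns, then the TINYINT operation column, then the rowId column
            int channelCount = page.getChannelCount();
            Block operationBlock = page.getBlock(channelCount - 2);
            Block rowIdBlock = page.getBlock(channelCount - 1);

            for (int position = 0; position < page.getPositionCount(); position++) {
                long operation = TINYINT.getLong(operationBlock, position);
                if (operation == INSERT_OPERATION_NUMBER) {
                    insertRow(page, position);       // write the data column values
                }
                else if (operation == DELETE_OPERATION_NUMBER) {
                    deleteRow(rowIdBlock, position); // record the rowId to delete
                }
                // With DELETE_ROW_AND_INSERT_ROW, an UPDATE arrives as a DELETE
                // plus an INSERT, so no separate UPDATE branch is needed here.
            }
        }

        @Override
        public CompletableFuture<Collection<Slice>> finish()
        {
            // A real connector also flushes its deletions and merges those
            // fragments into the returned collection.
            return insertSink.finish();
        }

        private void insertRow(Page page, int position) { /* connector-specific */ }

        private void deleteRow(Block rowIdBlock, int position) { /* connector-specific */ }
    }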

``ConnectorMetadata`` ``MERGE`` API
-----------------------------------

To support ``MERGE``, a connector must implement the following ``ConnectorMetadata``
methods.

* ``getRowChangeParadigm()``::

RowChangeParadigm getRowChangeParadigm(
ConnectorSession session,
ConnectorTableHandle tableHandle)

This method is called as the engine starts processing a ``MERGE`` statement.
The connector must return a ``RowChangeParadigm`` enumeration instance. If
the connector doesn't support ``MERGE``, then it should throw a
``NOT_SUPPORTED`` exception to indicate that SQL ``MERGE`` isn't supported by
the connector. Note that the default implementation already throws this
exception when the method isn't implemented.
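
For illustration, a connector whose storage rewrites whole rows might choose
the paradigm as follows. ``ExampleMetadata`` is a hypothetical class used for
the sketches in this section, not part of the SPI::

    import io.trino.spi.connector.ConnectorMetadata;
    import io.trino.spi.connector.ConnectorSession;
    import io.trino.spi.connector.ConnectorTableHandle;
    import io.trino.spi.connector.RowChangeParadigm;

    public class ExampleMetadata
            implements ConnectorMetadata
    {
        @Override
        public RowChangeParadigm getRowChangeParadigm(ConnectorSession session, ConnectorTableHandle tableHandle)
        {
            // Updates are modeled as a delete paired with an insert
            return RowChangeParadigm.DELETE_ROW_AND_INSERT_ROW;
        }

        // The remaining MERGE methods of this sketch appear in the following sections.
    }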

* ``getMergeRowIdColumnHandle()``::

ColumnHandle getMergeRowIdColumnHandle(
ConnectorSession session,
ConnectorTableHandle tableHandle)

This method is called in the early stages of query planning for ``MERGE``
statements. The ``ColumnHandle`` returned provides the ``rowId`` used by the
connector to identify rows to be merged, as well as any other fields of
the row that the connector needs to complete the ``MERGE`` operation.
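
Continuing the hypothetical ``ExampleMetadata`` sketch, a connector might
expose a synthetic row ID column. The ``ExampleColumnHandle`` class, the
column name, and its type are assumptions; use whatever handle type the
connector already defines for its columns::

    @Override
    public ColumnHandle getMergeRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle)
    {
        // A synthetic column carrying whatever the connector needs to locate a row,
        // for example a file path and the row position within that file
        return new ExampleColumnHandle("$merge_row_id", BIGINT);
    }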

* ``getInsertLayout()``::

Optional<ConnectorTableLayout> getInsertLayout(
ConnectorSession session,
ConnectorTableHandle tableHandle)

This method is called during query planning to get the table layout to be
used for rows inserted by the ``MERGE`` operation. For some connectors,
this layout is used for rows deleted as well.

* ``getUpdateLayout()``::

Optional<ConnectorTableLayout> getUpdateLayout(
ConnectorSession session,
ConnectorTableHandle tableHandle)

This method is called during query planning to get the table layout to be
used for rows deleted by the ``MERGE`` operation. If the optional return
value is present, the Trino engine uses the layout for updated rows.
Otherwise, it uses the result of ``ConnectorMetadata.getInsertLayout`` to
distribute updated rows.
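
In the hypothetical ``ExampleMetadata`` sketch, a connector with partitioned
tables might return its partition columns for inserts and let updated rows
follow the same layout. ``ExampleTableHandle`` and its ``partitionColumns()``
method are assumptions::

    @Override
    public Optional<ConnectorTableLayout> getInsertLayout(ConnectorSession session, ConnectorTableHandle tableHandle)
    {
        List<String> partitionColumns = ((ExampleTableHandle) tableHandle).partitionColumns();
        if (partitionColumns.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(new ConnectorTableLayout(partitionColumns));
    }

    @Override
    public Optional<ConnectorTableLayout> getUpdateLayout(ConnectorSession session, ConnectorTableHandle tableHandle)
    {
        // Empty means the engine falls back to getInsertLayout() for updated rows
        return Optional.empty();
    }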

* ``beginMerge()``::

ConnectorMergeTableHandle beginMerge(
ConnectorSession session,
ConnectorTableHandle tableHandle)

As the last step in creating the ``MERGE`` execution plan, the connector's
``beginMerge()`` method is called, passing the ``session`` and the
``tableHandle``.

``beginMerge()`` performs any orchestration needed in the connector to
start processing the ``MERGE``. This orchestration varies from connector
to connector. In the case of the Hive connector operating on transactional tables,
for example, ``beginMerge()`` checks that the table is transactional and
starts a Hive Metastore transaction.

``beginMerge()`` returns a ``ConnectorMergeTableHandle`` with any added
information the connector needs when the handle is passed back to
``finishMerge()`` and the split generation machinery. For most
connectors, the returned table handle contains at least a flag identifying
the table handle as a table handle for a ``MERGE`` operation.
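
A minimal ``beginMerge()`` in the hypothetical ``ExampleMetadata`` sketch only
wraps the table handle. ``ExampleMergeTableHandle`` stands for a
connector-defined class implementing ``ConnectorMergeTableHandle``::

    @Override
    public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorTableHandle tableHandle)
    {
        // Start any connector-specific transaction or bookkeeping here, then wrap
        // the table handle so later calls know this handle belongs to a MERGE
        return new ExampleMergeTableHandle((ExampleTableHandle) tableHandle);
    }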

* ``finishMerge()``::

void finishMerge(
ConnectorSession session,
ConnectorMergeTableHandle tableHandle,
Collection<Slice> fragments)

During ``MERGE`` processing, the Trino engine accumulates the ``Slice``
collections returned by ``ConnectorMergeSink.finish()``. The engine calls
``finishMerge()``, passing the table handle and that collection of
``Slice`` fragments. In response, the connector takes appropriate actions
to complete the ``MERGE`` operation. Those actions might include
committing an underlying transaction, if any, or freeing any other
resources.
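
To round out the hypothetical ``ExampleMetadata`` sketch, ``finishMerge()``
decodes the fragments produced by the merge sinks and commits the result. The
signature follows the one shown above; the JSON codec, the
``ExampleWriteResult`` class, and the ``commit()`` helper are assumptions::

    @Override
    public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle tableHandle, Collection<Slice> fragments)
    {
        ExampleMergeTableHandle mergeHandle = (ExampleMergeTableHandle) tableHandle;
        List<ExampleWriteResult> results = new ArrayList<>();
        for (Slice fragment : fragments) {
            // Each fragment is connector-specific; here it is assumed to be JSON
            // describing what one ConnectorMergeSink wrote
            results.add(writeResultCodec.fromJson(fragment.getBytes()));
        }
        // Publish the written and deleted data, and commit the transaction
        // started in beginMerge()
        commit(mergeHandle, results);
    }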