diff --git a/docs/src/main/sphinx/develop/supporting-merge.rst b/docs/src/main/sphinx/develop/supporting-merge.rst index 793288f18a77..ca63dfc1d5ee 100644 --- a/docs/src/main/sphinx/develop/supporting-merge.rst +++ b/docs/src/main/sphinx/develop/supporting-merge.rst @@ -3,24 +3,24 @@ Supporting ``MERGE`` ==================== The Trino engine provides APIs to support row-level SQL ``MERGE``. -To implement ``MERGE``, a connector must provide an implementation -of ``ConnectorMergeSink``, which is typically layered on top of a -``ConnectorPageSink``, and define ``ConnectorMetadata`` -methods to get a "rowId" column handle; get the row change paradigm; -and to start and complete the ``MERGE`` operation. - -The Trino engine machinery used to implement SQL ``MERGE`` has now -been used to support SQL ``DELETE`` and ``UPDATE``, replacing the -previous implementations. This means that all a connector needs to -do is implement support for SQL ``MERGE``, and the connector gets -all the DML operations. +To implement ``MERGE``, a connector must provide the following: + +* An implementation of ``ConnectorMergeSink``, which is typically + layered on top of a ``ConnectorPageSink``. +* Methods in ``ConnectorMetadata`` to get a "rowId" column handle, get the + row change paradigm, and to start and complete the ``MERGE`` operation. + +The Trino engine machinery used to implement SQL ``MERGE`` is also used to +support SQL ``DELETE`` and ``UPDATE``. This means that all a connector needs to +do is implement support for SQL ``MERGE``, and the connector gets all the Data +Modification Language (DML) operations. Standard SQL ``MERGE`` ---------------------- Different query engines support varying definitions of SQL ``MERGE``. Trino supports the strict SQL specification ``ISO/IEC 9075``, published -in 2016. As a simple example, given tables ``target_table`` and +in 2016. 
As a simple example, given tables ``target_table`` and ``source_table`` defined as:: CREATE TABLE accounts ( @@ -50,7 +50,7 @@ Here is a possible ``MERGE`` operation, from ``monthly_accounts_update`` to INSERT (customer, purchases, address) VALUES (s.customer, s.purchases, s.address); -SQL ``MERGE`` tries to match each ``WHEN`` clause in source order. When +SQL ``MERGE`` tries to match each ``WHEN`` clause in source order. When a match is found, the corresponding ``DELETE``, ``INSERT`` or ``UPDATE`` is executed and subsequent ``WHEN`` clauses are ignored. @@ -61,14 +61,14 @@ when a row from the source table or query matches a row in the target table: * ``DELETE``, in which the target row is deleted. In the ``NOT MATCHED`` case, SQL ``MERGE`` supports only ``INSERT`` -operations. The values inserted are arbitrary but usually come from +operations. The values inserted are arbitrary but usually come from the unmatched row of the source table or query. ``RowChangeParadigm`` --------------------- Different connectors have different ways of representing row updates, -imposed by the underlying storage systems. The Trino engine classifies +imposed by the underlying storage systems. The Trino engine classifies these different paradigms as elements of the ``RowChangeParadigm`` enumeration, returned by enumeration, returned by method ``ConnectorMetadata.getRowChangeParadigm(...)``. @@ -76,33 +76,33 @@ enumeration, returned by enumeration, returned by method The ``RowChangeParadigm`` enumeration values are: * ``CHANGE_ONLY_UPDATED_COLUMNS``, intended for connectors that can update - individual columns of rows identified by a ``rowId``. The corresponding + individual columns of rows identified by a ``rowId``. The corresponding merge processor class is ``ChangeOnlyUpdatedColumnsMergeProcessor``. * ``DELETE_ROW_AND_INSERT_ROW``, intended for connectors that represent a - row change as a row deletion paired with a row insertion. 
The corresponding + row change as a row deletion paired with a row insertion. The corresponding merge processor class is ``DeleteAndInsertMergeProcessor``. Overview of ``MERGE`` processing -------------------------------- A ``MERGE`` statement is processed by creating a ``RIGHT JOIN`` between the -target table and the source, on the ``MERGE`` criteria. The source may be -a table or an arbitrary query. For each row in the source table or query, +target table and the source, on the ``MERGE`` criteria. The source may be +a table or an arbitrary query. For each row in the source table or query, ``MERGE`` produces a ``ROW`` object containing: -* the data column values from the ``UPDATE`` or ``INSERT`` cases. For the +* the data column values from the ``UPDATE`` or ``INSERT`` cases. For the ``DELETE`` cases, only the partition columns, which determine - partitioning and bucketing, are non-NULL. + partitioning and bucketing, are non-null. * a boolean column containing ``true`` for source rows that matched some target row, and ``false`` otherwise. * an integer that identifies whether the merge case operation is ``UPDATE``, - ``DELETE`` or ``INSERT``, or a source row for which no case matched. If a - source row does not match any merge case, all data column values except + ``DELETE`` or ``INSERT``, or a source row for which no case matched. If a + source row doesn't match any merge case, all data column values except those that determine distribution are null, and the operation number is -1. A ``SearchedCaseExpression`` is constructed from ``RIGHT JOIN`` result -to represent the ``WHEN`` clauses of the ``MERGE``. In the example above +to represent the ``WHEN`` clauses of the ``MERGE``. In the example preceding the ``MERGE`` is executed as if the ``SearchedCaseExpression`` were written as:: SELECT @@ -133,10 +133,10 @@ row, and ultimately creates a sequence of pages to be routed to the node that runs the ``ConnectorMergeSink.storeMergedRows(...)`` method. 
Like ``DELETE`` and ``UPDATE``, ``MERGE`` target table rows are identified by -a connector-specific ``rowId`` column handle. For ``MERGE``, the ``rowId`` +a connector-specific ``rowId`` column handle. For ``MERGE``, the ``rowId`` handle is returned by ``ConnectorMetadata.getMergeRowIdColumnHandle(...)``. -``MERGE`` Redistribution +``MERGE`` redistribution ------------------------ The Trino ``MERGE`` implementation allows ``UPDATE`` to change @@ -147,20 +147,20 @@ bucketing columns. Since the ``MERGE`` process in general requires redistribution of merged rows among Trino nodes, the order of rows in pages to be stored -are indeterminate. Connectors like Hive that depend on an ascending +are indeterminate. Connectors like Hive that depend on an ascending rowId order for deleted rows must sort the deleted rows before storing them. To ensure that all inserted rows for a given partition end up on a -single node, the redistribution hash on the partition key/bucket column(s) -is applied to the page partition key(s). As a result of the hash, all +single node, the redistribution hash on the partition key/bucket columns +is applied to the page partition keys. As a result of the hash, all rows for a specific partition/bucket hash together, whether they were ``MATCHED`` rows or ``NOT MATCHED`` rows. For connectors whose ``RowChangeParadigm`` is ``DELETE_ROW_AND_INSERT_ROW``, inserted rows are distributed using the layout supplied by -``ConnectorMetadata.getInsertLayout()``. For some connectors, the same -layout is used for updated rows. Other connectors require a special +``ConnectorMetadata.getInsertLayout()``. For some connectors, the same +layout is used for updated rows. Other connectors require a special layout for updated rows, supplied by ``ConnectorMetadata.getUpdateLayout()``. 
Connector support for ``MERGE`` @@ -173,12 +173,12 @@ To start ``MERGE`` processing, the Trino engine calls: * ``ConnectorMetadata.getRowChangeParadigm(...)`` to get the paradigm supported by the connector for changing existing table rows. * ``ConnectorMetadata.beginMerge(...)`` to get the a - ``ConnectorMergeTableHandle`` for the merge operation. That + ``ConnectorMergeTableHandle`` for the merge operation. That ``ConnectorMergeTableHandle`` object contains whatever information the connector needs to specify the ``MERGE`` operation. * ``ConnectorMetadata.getInsertLayout(...)``, from which it extracts the the list of partition or table columns that impact write redistribution. -* ``ConnectorMetadata.getUpdateLayout(...)``. If that layout is non-empty, +* ``ConnectorMetadata.getUpdateLayout(...)``. If that layout is non-empty, it is used to distribute updated rows resulting from the ``MERGE`` operation. @@ -187,29 +187,29 @@ On nodes that are targets of the hash, the Trino engine calls ``ConnectorMergeSink``. To write out each page of merged rows, the Trino engine calls -``ConnectorMergeSink.storeMergedRows(Page)``. The ``storeMergedRows(Page)`` +``ConnectorMergeSink.storeMergedRows(Page)``. The ``storeMergedRows(Page)`` method iterates over the rows in the page, performing updates and deletes in the ``MATCHED`` cases, and inserts in the ``NOT MATCHED`` cases. -For some ``RowChangeParadigm``s, ``UPDATE`` operations translated into the -corresponding ``DELETE`` and ``INSERT`` operations before -``storeMergedRows(Page)`` is called. +When using ``RowChangeParadigm.DELETE_ROW_AND_INSERT_ROW``, the engine +translates ``UPDATE`` operations into a pair of ``DELETE`` and ``INSERT`` +operations before ``storeMergedRows(Page)`` is called. To complete the ``MERGE`` operation, the Trino engine calls ``ConnectorMetadata.finishMerge(...)``, passing the table handle -and a collection of JSON objects encoded as ``Slice`` instances. 
These +and a collection of JSON objects encoded as ``Slice`` instances. These objects contain connector-specific information specifying what was changed -by the ``MERGE`` operation. Typically this JSON object contains the files +by the ``MERGE`` operation. Typically this JSON object contains the files written and table and partition statistics generated by the ``MERGE`` -operation. The connector takes appropriate actions, if any. +operation. The connector takes appropriate actions, if any. ``RowChangeProcessor`` implementation for ``MERGE`` --------------------------------------------------- In the ``MERGE`` implementation, each ``RowChangeParadigm`` corresponds to an internal Trino engine class that implements interface -``RowChangeProcessor``. ``RowChangeProcessor`` has one interesting method: -``Page transformPage(Page)``. The format of the output page depends +``RowChangeProcessor``. ``RowChangeProcessor`` has one interesting method: +``Page transformPage(Page)``. The format of the output page depends on the ``RowChangeParadigm``. The connector has no access to the ``RowChangeProcessor`` instance -- it @@ -224,7 +224,7 @@ The page supplied to ``transformPage()`` consists of: null if not matched * The merge case ``RowBlock`` * The integer case number block -* The byte is_distinct block, with value 0 if not distinct. +* The byte ``is_distinct`` block, with value 0 if not distinct. The merge case ``RowBlock`` has the following layout: @@ -245,7 +245,7 @@ The page returned from ``transformPage`` consists of: * The merge case operation block. * The rowId block. * A byte block containing 1 if the row is an insert derived from an - update operation, and 0 otherwise. This block is used to correctly + update operation, and 0 otherwise. This block is used to correctly calculate the count of rows changed for connectors that represent updates and deletes plus inserts. 
@@ -258,15 +258,15 @@ Detecting duplicate matching target rows The SQL ``MERGE`` specification requires that in each ``MERGE`` case, a single target table row must match at most one source row, after -applying the ``MERGE`` case condition expression. The first step +applying the ``MERGE`` case condition expression. The first step toward finding these error is done by labeling each row in the target table with a unique id, using an ``AssignUniqueId`` node above the -target table scan. The projected results from the ``RIGHT JOIN`` +target table scan. The projected results from the ``RIGHT JOIN`` have these unique ids for matched target table rows as well as -the ``WHEN`` clause number. A ``MarkDistinct`` node adds an -"is_distinct" column which is true if no other row has the same -unique id and ``WHEN`` clause number, and false otherwise. If -any row has "is_distinct" = false, a +the ``WHEN`` clause number. A ``MarkDistinct`` node adds an +``is_distinct`` column which is true if no other row has the same +unique id and ``WHEN`` clause number, and false otherwise. If +any row has ``is_distinct`` equal to false, a ``MERGE_TARGET_ROW_MULTIPLE_MATCHES`` exception is raised and the ``MERGE`` operation fails. @@ -280,7 +280,7 @@ originally passed to ``ConnectorMetadata.beginMerge()``. ``ConnectorPageSinkProvider`` API --------------------------------- -To support SQL ``MERGE``,, ``ConnectorPageSinkProvider`` must implement +To support SQL ``MERGE``, ``ConnectorPageSinkProvider`` must implement the method that creates the ``ConnectorMergeSink``: * ``createMergeSink``:: @@ -293,7 +293,7 @@ the method that creates the ``ConnectorMergeSink``: ``ConnectorMergeSink`` API -------------------------- -As mentioned above, to support ``MERGE``, the connector must define an +To support ``MERGE``, the connector must define an implementation of ``ConnectorMergeSink``, usually layered over the connector's ``ConnectorPageSink``. 
@@ -311,23 +311,22 @@ The only interesting methods are: ``ConnectorPageSinkProvider.createMergeSink()``, passing the page generated by the ``RowChangeProcessor.transformPage()`` method. That page consists of all table columns, in table column order, - followed by the rowId column, followed by the operation column - from the merge case ``RowBlock``. + followed by the ``TINYINT`` operation column, followed by the rowId column. The job of ``storeMergedRows()`` is iterate over the rows in the page, and process them based on the value of the operation column, ``INSERT``, - ``DELETE``, ``UPDATE``, or ignore the row. By choosing appropriate + ``DELETE``, ``UPDATE``, or ignore the row. By choosing appropriate paradigm, the connector can request that the UPDATE operation be transformed into ``DELETE`` and ``INSERT`` operations. * ``finish``:: - ``CompletableFuture<Collection<Slice>> finish()`` + CompletableFuture<Collection<Slice>> finish() The Trino engine calls ``finish()`` when all the data has been processed by - a specific ``ConnectorMergeSink`` instance. The connector returns a future + a specific ``ConnectorMergeSink`` instance. The connector returns a future containing a collection of ``Slice``, representing connector-specific - information about the rows processed. Usually this includes the row count, + information about the rows processed. Usually this includes the row count, and might include information like the files or partitions created or changed. @@ -344,9 +343,11 @@ methods. ConnectorTableHandle tableHandle) This method is called as the engine starts processing a ``MERGE`` statement. - The connector must return a ``RowChangeParadigm`` enum instance. If the - connector does not support ``MERGE`` it should throw a ``NOT_SUPPORTED`` - exception, meaning that SQL ``MERGE`` is not supported by the connector. + The connector must return a ``RowChangeParadigm`` enumeration instance. 
If + the connector doesn't support ``MERGE``, then it should throw a + ``NOT_SUPPORTED`` exception to indicate that SQL ``MERGE`` isn't supported by + the connector. Note that the default implementation already throws this + exception when the method isn't implemented. * ``getMergeRowIdColumnHandle()``:: @@ -355,7 +356,7 @@ methods. ConnectorTableHandle tableHandle) This method is called in the early stages of query planning for ``MERGE`` - statements. The ColumnHandle returned provides the ``rowId`` used by the + statements. The ColumnHandle returned provides the ``rowId`` used by the connector to identify rows to be merged, as well as any other fields of the row that the connector needs to complete the ``MERGE`` operation. @@ -365,9 +366,9 @@ methods. ConnectorSession session, ConnectorTableHandle tableHandle) - This method is called during query planning to get the table layout to be - used for rows inserted by the ``MERGE`` operation. For some connectors, - this layout will be used for rows deleted as well. + This method is called during query planning to get the table layout to be + used for rows inserted by the ``MERGE`` operation. For some connectors, + this layout is used for rows deleted as well. * ``getUpdateLayout()``:: @@ -375,33 +376,31 @@ methods. ConnectorSession session, ConnectorTableHandle tableHandle) - This method is called during query planning to get the table layout to - be used for rows deleted by the ``MERGE`` operation. If the optional - return value is present, the Trino engine will use the layout for - updated rows. Otherwise, it will use the result of - ``ConnectorMetadata.getInsertLayout`` to distribute updated rows. + This method is called during query planning to get the table layout to be + used for rows deleted by the ``MERGE`` operation. If the optional return + value is present, the Trino engine uses the layout for updated rows. + Otherwise, it uses the result of ``ConnectorMetadata.getInsertLayout`` to + distribute updated rows. 
* ``beginMerge()``:: ConnectorMergeTableHandle beginMerge( ConnectorSession session, - ConnectorTableHandle tableHandle, - MergeDetails mergeDetails) + ConnectorTableHandle tableHandle) As the last step in creating the ``MERGE`` execution plan, the connector's - ``beginMerge()`` method is called, passing the ``session``, the - ``tableHandle`` and the ``MergeDetails`` object. + ``beginMerge()`` method is called, passing the ``session``, and the + ``tableHandle``. ``beginMerge()`` performs any orchestration needed in the connector to - start processing the ``MERGE``. This orchestration varies from connector - to connector. In the case of Hive connector operating on ACID tables, + start processing the ``MERGE``. This orchestration varies from connector + to connector. In the case of Hive connector operating on transactional tables, for example, ``beginMerge()`` checks that the table is transactional and - that all updated columns are writable, and starts a Hive Metastore - transaction. + starts a Hive Metastore transaction. ``beginMerge()`` returns a ``ConnectorMergeTableHandle`` with any added information the connector needs when the handle is passed back to - ``finishMerge()`` and the split generation machinery. For most + ``finishMerge()`` and the split generation machinery. For most connectors, the returned table handle contains at least a flag identifying the table handle as a table handle for a ``MERGE`` operation. @@ -413,9 +412,9 @@ methods. Collection<Slice> fragments) During ``MERGE`` processing, the Trino engine accumulates the ``Slice`` - collections returned by ``ConnectorMergeSink.finish()``. The engine calls + collections returned by ``ConnectorMergeSink.finish()``. The engine calls ``finishMerge()``, passing the table handle and that collection of - ``Slice`` fragments. In response, the connector takes appropriate actions - to complete the ``MERGE`` operation. 
Those actions might include - committing an underlying transaction (if any) or freeing any other + ``Slice`` fragments. In response, the connector takes appropriate actions + to complete the ``MERGE`` operation. Those actions might include + committing an underlying transaction, if any, or freeing any other resources.
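
The dispatch that ``storeMergedRows()`` performs, as described in the patched text, can be illustrated with a small stand-alone sketch. This is not the Trino SPI: ``MergeSinkSketch``, ``MergedRow``, and the operation numbers ``1``, ``2``, and ``3`` are hypothetical names and values chosen for illustration, and the page is modelled as a plain list of rows rather than a Trino ``Page``. It assumes each row carries the data columns, then the operation column, then the rowId, and that deleted rowIds are sorted before being stored, as the redistribution section says connectors such as Hive require.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class MergeSinkSketch {
    // Hypothetical operation numbers for this sketch only; a real connector
    // should use whatever constants the Trino SPI defines.
    static final byte INSERT = 1;
    static final byte DELETE = 2;
    static final byte UPDATE = 3;

    // One row of the merged page: data columns, operation column, rowId.
    // rowId is null for inserted rows, which have no existing target row.
    record MergedRow(List<Object> columns, byte operation, Long rowId) {}

    private final List<List<Object>> inserts = new ArrayList<>();
    private final List<Long> deletes = new ArrayList<>();
    private final List<MergedRow> updates = new ArrayList<>();

    // Iterate over the rows and route each one by its operation value.
    public void storeMergedRows(List<MergedRow> page) {
        for (MergedRow row : page) {
            switch (row.operation()) {
                case INSERT -> inserts.add(row.columns());
                case DELETE -> deletes.add(row.rowId());
                case UPDATE -> updates.add(row);
                default -> { /* no merge case matched: ignore the row */ }
            }
        }
    }

    // Stand-in for finish(): sort deleted rowIds into ascending order and
    // return connector-specific fragments (plain strings in this sketch).
    public List<String> finish() {
        Collections.sort(deletes);
        return List.of(
                "inserted=" + inserts.size(),
                "deleted=" + deletes.size(),
                "updated=" + updates.size());
    }

    public List<Long> deletedRowIds() {
        return deletes;
    }

    public static void main(String[] args) {
        MergeSinkSketch sink = new MergeSinkSketch();
        sink.storeMergedRows(List.of(
                new MergedRow(List.of("Carol", 9, "Centreville"), INSERT, null),
                new MergedRow(List.of(), DELETE, 42L),
                new MergedRow(List.of(), DELETE, 7L),
                new MergedRow(List.of("Aaron", 11, "Arches"), UPDATE, 3L),
                new MergedRow(List.of(), (byte) -1, null)));
        System.out.println(sink.finish());
        System.out.println(sink.deletedRowIds());
    }
}
```

With ``DELETE_ROW_AND_INSERT_ROW`` the ``UPDATE`` branch would not be reached, because the engine splits each update into a delete and an insert before the page arrives at the sink.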