diff --git a/core/trino-spi/src/main/java/io/trino/spi/block/Block.java b/core/trino-spi/src/main/java/io/trino/spi/block/Block.java index 7c111523d0ee..68b4ad233fb8 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/block/Block.java +++ b/core/trino-spi/src/main/java/io/trino/spi/block/Block.java @@ -311,7 +311,9 @@ default Block getLoadedBlock() } /** - * Gets the direct child blocks of this block. + * Gets the direct child blocks of this block. This method is an internal subroutine + * of the {@link Block} API, and should never be called directly. To take apart + * {@link Block} components, see {@link ColumnarRow} */ default List getChildren() { diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveUpdatablePageSource.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveUpdatablePageSource.java index 5a019f2e1c6a..3318f7eaf869 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveUpdatablePageSource.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveUpdatablePageSource.java @@ -22,8 +22,8 @@ import io.trino.spi.Page; import io.trino.spi.TrinoException; import io.trino.spi.block.Block; +import io.trino.spi.block.ColumnarRow; import io.trino.spi.block.LongArrayBlock; -import io.trino.spi.block.RowBlock; import io.trino.spi.block.RunLengthEncodedBlock; import io.trino.spi.connector.ConnectorPageSource; import io.trino.spi.connector.ConnectorSession; @@ -43,6 +43,7 @@ import static io.trino.plugin.hive.HiveErrorCode.HIVE_WRITER_CLOSE_ERROR; import static io.trino.plugin.hive.PartitionAndStatementId.CODEC; import static io.trino.spi.StandardErrorCode.GENERIC_INSUFFICIENT_RESOURCES; +import static io.trino.spi.block.ColumnarRow.toColumnarRow; import static io.trino.spi.predicate.Utils.nativeValueToBlock; import static io.trino.spi.type.BigintType.BIGINT; import static java.lang.String.format; @@ -116,28 +117,32 @@ public HiveUpdatablePageSource( @Override public void deleteRows(Block rowIds) { - List blocks = rowIds.getChildren(); - checkArgument(blocks.size() == 3, "The rowId block for DELETE should have 3 children, but has %s", blocks.size()); - deleteRowsInternal(rowIds); + ColumnarRow acidBlock = toColumnarRow(rowIds); + int fieldCount = acidBlock.getFieldCount(); + checkArgument(fieldCount == 3, "The rowId block for DELETE should have 3 children, but has %s", fieldCount); + deleteRowsInternal(acidBlock); } - private void deleteRowsInternal(Block rowIds) + private void deleteRowsInternal(ColumnarRow columnarRow) { - int positionCount = rowIds.getPositionCount(); - List blocks = rowIds.getChildren(); + int positionCount = columnarRow.getPositionCount(); + for (int position = 0; position < positionCount; position++) { + checkArgument(!columnarRow.isNull(position), "In the delete rowIds, found null row at position %s", position); + } + + Block originalTransactionChannel = columnarRow.getField(ORIGINAL_TRANSACTION_CHANNEL); Block[] blockArray = { new RunLengthEncodedBlock(DELETE_OPERATION_BLOCK, positionCount), - blocks.get(ORIGINAL_TRANSACTION_CHANNEL), - blocks.get(BUCKET_CHANNEL), - blocks.get(ROW_ID_CHANNEL), + originalTransactionChannel, + columnarRow.getField(BUCKET_CHANNEL), + columnarRow.getField(ROW_ID_CHANNEL), RunLengthEncodedBlock.create(BIGINT, writeId, positionCount), new RunLengthEncodedBlock(hiveRowTypeNullsBlock, positionCount), }; Page deletePage = new Page(blockArray); - Block block = blocks.get(ORIGINAL_TRANSACTION_CHANNEL); for (int index = 0; index < positionCount; index++) { - maxWriteId = Math.max(maxWriteId, block.getLong(index, 0)); + maxWriteId = Math.max(maxWriteId, originalTransactionChannel.getLong(index, 0)); } lazyInitializeDeleteFileWriter(); @@ -152,11 +157,11 @@ public void updateRows(Page page, List columnValueAndRowIdChannels) verify(positionCount > 0, "Unexpected empty page"); // should be filtered out by engine HiveUpdateProcessor updateProcessor = transaction.getUpdateProcessor().orElseThrow(() -> new IllegalArgumentException("updateProcessor not present")); - RowBlock acidRowBlock = updateProcessor.getAcidRowBlock(page, columnValueAndRowIdChannels); + ColumnarRow acidBlock = updateProcessor.getAcidBlock(page, columnValueAndRowIdChannels); - List blocks = acidRowBlock.getChildren(); - checkArgument(blocks.size() == 3 || blocks.size() == 4, "The rowId block for UPDATE should have 3 or 4 children, but has %s", blocks.size()); - deleteRowsInternal(acidRowBlock); + int fieldCount = acidBlock.getFieldCount(); + checkArgument(fieldCount == 3 || fieldCount == 4, "The rowId block for UPDATE should have 3 or 4 children, but has %s", fieldCount); + deleteRowsInternal(acidBlock); Block mergedColumnsBlock = updateProcessor.createMergedColumnsBlock(page, columnValueAndRowIdChannels); @@ -164,7 +169,7 @@ public void updateRows(Page page, List columnValueAndRowIdChannels) Block[] blockArray = { new RunLengthEncodedBlock(INSERT_OPERATION_BLOCK, positionCount), currentTransactionBlock, - blocks.get(BUCKET_CHANNEL), + acidBlock.getField(BUCKET_CHANNEL), createRowIdBlock(positionCount), currentTransactionBlock, mergedColumnsBlock, diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveUpdateProcessor.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveUpdateProcessor.java index 666ee6fa885a..a132d5007252 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveUpdateProcessor.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveUpdateProcessor.java @@ -21,6 +21,7 @@ import io.trino.plugin.hive.orc.OrcDeletedRows.MaskDeletedRowsFunction; import io.trino.spi.Page; import io.trino.spi.block.Block; +import io.trino.spi.block.ColumnarRow; import io.trino.spi.block.RowBlock; import io.trino.spi.type.RowType; @@ -44,6 +45,7 @@ import static io.trino.plugin.hive.HiveUpdatablePageSource.ROW_ID_CHANNEL; import static io.trino.plugin.hive.acid.AcidSchema.ACID_COLUMN_ROW_STRUCT; import static io.trino.plugin.hive.acid.AcidSchema.ACID_READ_FIELDS; +import static io.trino.spi.block.ColumnarRow.toColumnarRow; import static io.trino.spi.block.RowBlock.fromFieldBlocks; import static io.trino.spi.type.RowType.Field; import static io.trino.spi.type.RowType.field; @@ -178,11 +180,10 @@ public static HiveColumnHandle getUpdateRowIdColumnHandle(List return createBaseColumn(UPDATE_ROW_ID_COLUMN_NAME, UPDATE_ROW_ID_COLUMN_INDEX, toHiveType(acidRowType), acidRowType, SYNTHESIZED, Optional.empty()); } - public RowBlock getAcidRowBlock(Page page, List columnValueAndRowIdChannels) + public ColumnarRow getAcidBlock(Page page, List columnValueAndRowIdChannels) { Block acidBlock = page.getBlock(columnValueAndRowIdChannels.get(columnValueAndRowIdChannels.size() - 1)); - checkArgument(acidBlock instanceof RowBlock, "The acid block in the page must be a RowBlock, but instead was %s", acidBlock); - return (RowBlock) acidBlock; + return toColumnarRow(acidBlock); } /** @@ -195,18 +196,23 @@ public RowBlock getAcidRowBlock(Page page, List columnValueAndRowIdChan public Block createMergedColumnsBlock(Page page, List columnValueAndRowIdChannels) { requireNonNull(page, "page is null"); - RowBlock acidBlock = getAcidRowBlock(page, columnValueAndRowIdChannels); - List acidBlocks = acidBlock.getChildren(); + ColumnarRow acidBlock = getAcidBlock(page, columnValueAndRowIdChannels); + int fieldCount = acidBlock.getFieldCount(); List nonUpdatedColumnRowBlocks; if (nonUpdatedColumns.isEmpty()) { - checkArgument(acidBlocks.size() == 3, "The ACID RowBlock must contain 3 children, but instead had %s children", acidBlocks.size()); + checkArgument(fieldCount == 3, "The ACID block must contain 3 children, but instead had %s children", fieldCount); nonUpdatedColumnRowBlocks = ImmutableList.of(); } else { - checkArgument(acidBlocks.size() == 4, "The first RowBlock must contain 4 children, but instead had %s children", acidBlocks.size()); - Block lastAcidBlock = acidBlocks.get(3); + checkArgument(fieldCount == 4, "The first RowBlock must contain 4 children, but instead had %s children", fieldCount); + Block lastAcidBlock = acidBlock.getField(3); checkArgument(lastAcidBlock instanceof RowBlock, "The last block in the acidBlock must be a RowBlock, but instead was %s", lastAcidBlock); - nonUpdatedColumnRowBlocks = lastAcidBlock.getChildren(); + ColumnarRow nonUpdatedColumnRow = toColumnarRow(lastAcidBlock); + ImmutableList.Builder builder = ImmutableList.builder(); + for (int field = 0; field < nonUpdatedColumnRow.getFieldCount(); field++) { + builder.add(nonUpdatedColumnRow.getField(field)); + } + nonUpdatedColumnRowBlocks = builder.build(); } // Merge the non-updated and updated column blocks diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveTransactionalTable.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveTransactionalTable.java index 70cd0b2c2394..962c907c0ea2 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveTransactionalTable.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveTransactionalTable.java @@ -832,6 +832,43 @@ public void testDeleteAllRowsInPartition() }); } + @Test(groups = HIVE_TRANSACTIONAL, timeOut = TEST_TIMEOUT) + public void testDeleteAfterDelete() + { + withTemporaryTable("delete_after_delete", true, false, NONE, tableName -> { + onTrino().executeQuery(format("CREATE TABLE %s (id INT) WITH (transactional = true)", tableName)); + + onTrino().executeQuery(format("INSERT INTO %s VALUES (1), (2), (3)", tableName)); + + onTrino().executeQuery(format("DELETE FROM %s WHERE id = 2", tableName)); + + verifySelectForTrinoAndHive("SELECT * FROM " + tableName, "true", row(1), row(3)); + + onTrino().executeQuery("DELETE FROM " + tableName); + + assertThat(onTrino().executeQuery("SELECT count(*) FROM " + tableName)).containsOnly(row(0)); + }); + } + + @Test(groups = HIVE_TRANSACTIONAL, timeOut = TEST_TIMEOUT) + public void testDeleteAfterDeleteWithPredicate() + { + withTemporaryTable("delete_after_delete_predicate", true, false, NONE, tableName -> { + onTrino().executeQuery(format("CREATE TABLE %s (id INT) WITH (transactional = true)", tableName)); + + onTrino().executeQuery(format("INSERT INTO %s VALUES (1), (2), (3)", tableName)); + + onTrino().executeQuery(format("DELETE FROM %s WHERE id = 2", tableName)); + + verifySelectForTrinoAndHive("SELECT * FROM " + tableName, "true", row(1), row(3)); + + // A predicate sufficient to fool statistics-based optimization + onTrino().executeQuery(format("DELETE FROM %s WHERE id != 2", tableName)); + + assertThat(onTrino().executeQuery("SELECT count(*) FROM " + tableName)).containsOnly(row(0)); + }); + } + @Test(groups = HIVE_TRANSACTIONAL, dataProvider = "inserterAndDeleterProvider", timeOut = TEST_TIMEOUT) public void testBucketedUnpartitionedDelete(Engine inserter, Engine deleter) {