-
Notifications
You must be signed in to change notification settings - Fork 3.6k
Fix bug when DELETE ACID block is a DictionaryBlock #9354
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,8 +22,8 @@ | |
| import io.trino.spi.Page; | ||
| import io.trino.spi.TrinoException; | ||
| import io.trino.spi.block.Block; | ||
| import io.trino.spi.block.ColumnarRow; | ||
| import io.trino.spi.block.LongArrayBlock; | ||
| import io.trino.spi.block.RowBlock; | ||
| import io.trino.spi.block.RunLengthEncodedBlock; | ||
| import io.trino.spi.connector.ConnectorPageSource; | ||
| import io.trino.spi.connector.ConnectorSession; | ||
|
|
@@ -43,6 +43,7 @@ | |
| import static io.trino.plugin.hive.HiveErrorCode.HIVE_WRITER_CLOSE_ERROR; | ||
| import static io.trino.plugin.hive.PartitionAndStatementId.CODEC; | ||
| import static io.trino.spi.StandardErrorCode.GENERIC_INSUFFICIENT_RESOURCES; | ||
| import static io.trino.spi.block.ColumnarRow.toColumnarRow; | ||
| import static io.trino.spi.predicate.Utils.nativeValueToBlock; | ||
| import static io.trino.spi.type.BigintType.BIGINT; | ||
| import static java.lang.String.format; | ||
|
|
@@ -116,28 +117,32 @@ public HiveUpdatablePageSource( | |
| @Override | ||
| public void deleteRows(Block rowIds) | ||
| { | ||
| List<Block> blocks = rowIds.getChildren(); | ||
| checkArgument(blocks.size() == 3, "The rowId block for DELETE should have 3 children, but has %s", blocks.size()); | ||
|
||
| deleteRowsInternal(rowIds); | ||
| ColumnarRow acidBlock = toColumnarRow(rowIds); | ||
| int fieldCount = acidBlock.getFieldCount(); | ||
| checkArgument(fieldCount == 3, "The rowId block for DELETE should have 3 children, but has %s", fieldCount); | ||
| deleteRowsInternal(acidBlock); | ||
| } | ||
|
|
||
| private void deleteRowsInternal(Block rowIds) | ||
| private void deleteRowsInternal(ColumnarRow columnarRow) | ||
| { | ||
| int positionCount = rowIds.getPositionCount(); | ||
| List<Block> blocks = rowIds.getChildren(); | ||
| int positionCount = columnarRow.getPositionCount(); | ||
| for (int position = 0; position < positionCount; position++) { | ||
| checkArgument(!columnarRow.isNull(position), "In the delete rowIds, found null row at position %s", position); | ||
| } | ||
|
|
||
| Block originalTransactionChannel = columnarRow.getField(ORIGINAL_TRANSACTION_CHANNEL); | ||
| Block[] blockArray = { | ||
| new RunLengthEncodedBlock(DELETE_OPERATION_BLOCK, positionCount), | ||
| blocks.get(ORIGINAL_TRANSACTION_CHANNEL), | ||
| blocks.get(BUCKET_CHANNEL), | ||
| blocks.get(ROW_ID_CHANNEL), | ||
| originalTransactionChannel, | ||
| columnarRow.getField(BUCKET_CHANNEL), | ||
| columnarRow.getField(ROW_ID_CHANNEL), | ||
| RunLengthEncodedBlock.create(BIGINT, writeId, positionCount), | ||
| new RunLengthEncodedBlock(hiveRowTypeNullsBlock, positionCount), | ||
| }; | ||
| Page deletePage = new Page(blockArray); | ||
|
|
||
| Block block = blocks.get(ORIGINAL_TRANSACTION_CHANNEL); | ||
| for (int index = 0; index < positionCount; index++) { | ||
| maxWriteId = Math.max(maxWriteId, block.getLong(index, 0)); | ||
| maxWriteId = Math.max(maxWriteId, originalTransactionChannel.getLong(index, 0)); | ||
| } | ||
|
|
||
| lazyInitializeDeleteFileWriter(); | ||
|
|
@@ -152,19 +157,19 @@ public void updateRows(Page page, List<Integer> columnValueAndRowIdChannels) | |
| verify(positionCount > 0, "Unexpected empty page"); // should be filtered out by engine | ||
|
|
||
| HiveUpdateProcessor updateProcessor = transaction.getUpdateProcessor().orElseThrow(() -> new IllegalArgumentException("updateProcessor not present")); | ||
| RowBlock acidRowBlock = updateProcessor.getAcidRowBlock(page, columnValueAndRowIdChannels); | ||
| ColumnarRow acidBlock = updateProcessor.getAcidBlock(page, columnValueAndRowIdChannels); | ||
|
|
||
| List<Block> blocks = acidRowBlock.getChildren(); | ||
| checkArgument(blocks.size() == 3 || blocks.size() == 4, "The rowId block for UPDATE should have 3 or 4 children, but has %s", blocks.size()); | ||
| deleteRowsInternal(acidRowBlock); | ||
| int fieldCount = acidBlock.getFieldCount(); | ||
| checkArgument(fieldCount == 3 || fieldCount == 4, "The rowId block for UPDATE should have 3 or 4 children, but has %s", fieldCount); | ||
| deleteRowsInternal(acidBlock); | ||
|
|
||
| Block mergedColumnsBlock = updateProcessor.createMergedColumnsBlock(page, columnValueAndRowIdChannels); | ||
|
|
||
| Block currentTransactionBlock = RunLengthEncodedBlock.create(BIGINT, writeId, positionCount); | ||
| Block[] blockArray = { | ||
| new RunLengthEncodedBlock(INSERT_OPERATION_BLOCK, positionCount), | ||
| currentTransactionBlock, | ||
| blocks.get(BUCKET_CHANNEL), | ||
| acidBlock.getField(BUCKET_CHANNEL), | ||
| createRowIdBlock(positionCount), | ||
| currentTransactionBlock, | ||
| mergedColumnsBlock, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -832,6 +832,43 @@ public void testDeleteAllRowsInPartition() | |
| }); | ||
| } | ||
|
|
||
| @Test(groups = HIVE_TRANSACTIONAL, timeOut = TEST_TIMEOUT) | ||
| public void testDeleteAfterDelete() | ||
| { | ||
| withTemporaryTable("delete_after_delete", true, false, NONE, tableName -> { | ||
| onTrino().executeQuery(format("CREATE TABLE %s (id INT) WITH (transactional = true)", tableName)); | ||
|
|
||
| onTrino().executeQuery(format("INSERT INTO %s VALUES (1), (2), (3)", tableName)); | ||
|
|
||
| onTrino().executeQuery(format("DELETE FROM %s WHERE id = 2", tableName)); | ||
|
|
||
| verifySelectForTrinoAndHive("SELECT * FROM " + tableName, "true", row(1), row(3)); | ||
|
|
||
| onTrino().executeQuery("DELETE FROM " + tableName); | ||
|
||
|
|
||
| assertThat(onTrino().executeQuery("SELECT count(*) FROM " + tableName)).containsOnly(row(0)); | ||
| }); | ||
| } | ||
|
|
||
| @Test(groups = HIVE_TRANSACTIONAL, timeOut = TEST_TIMEOUT) | ||
| public void testDeleteAfterDeleteWithPredicate() | ||
| { | ||
| withTemporaryTable("delete_after_delete_predicate", true, false, NONE, tableName -> { | ||
| onTrino().executeQuery(format("CREATE TABLE %s (id INT) WITH (transactional = true)", tableName)); | ||
|
|
||
| onTrino().executeQuery(format("INSERT INTO %s VALUES (1), (2), (3)", tableName)); | ||
|
|
||
| onTrino().executeQuery(format("DELETE FROM %s WHERE id = 2", tableName)); | ||
|
|
||
| verifySelectForTrinoAndHive("SELECT * FROM " + tableName, "true", row(1), row(3)); | ||
|
|
||
| // The predicate matches every remaining row (1 and 3), yet its presence is sufficient to fool statistics-based optimization | ||
| onTrino().executeQuery(format("DELETE FROM %s WHERE id != 2", tableName)); | ||
|
|
||
| assertThat(onTrino().executeQuery("SELECT count(*) FROM " + tableName)).containsOnly(row(0)); | ||
| }); | ||
| } | ||
|
|
||
| @Test(groups = HIVE_TRANSACTIONAL, dataProvider = "inserterAndDeleterProvider", timeOut = TEST_TIMEOUT) | ||
| public void testBucketedUnpartitionedDelete(Engine inserter, Engine deleter) | ||
| { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.