diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java index 03305cc9fe7e..600f48a19626 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java @@ -3109,7 +3109,15 @@ public PhysicalOperation visitRefreshMaterializedView(RefreshMaterializedViewNod public PhysicalOperation visitTableWriter(TableWriterNode node, LocalExecutionPlanContext context) { // Set table writer count - context.setDriverInstanceCount(getTaskWriterCount(session)); + // being a single node means there is one node and one writer so + // this setting should take precedence over task_writer_count property + if (node.getPartitioningScheme().isPresent() && + node.getPartitioningScheme().get().getPartitioning().getHandle().isSingleNode()) { + context.setDriverInstanceCount(1); + } + else { + context.setDriverInstanceCount(getTaskWriterCount(session)); + } PhysicalOperation source = node.getSource().accept(this, context); diff --git a/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/TestHive.java b/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/TestHive.java index 3dab59bbf93b..b13fd9099f8f 100644 --- a/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/TestHive.java +++ b/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/TestHive.java @@ -103,4 +103,18 @@ public void testHiveViewTranslationError() // TODO: combine this with tests for successful translation (currently in TestHiveViews product test) } } + + @Test + public void testInsertBucketedTransactionalTableLayout() + throws Exception + { + insertBucketedTableLayout(true); + } + + @Test + public void testInsertPartitionedBucketedTransactionalTableLayout() + throws Exception + { + insertPartitionedBucketedTableLayout(true); + } } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSink.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSink.java index 678ebcd085e8..d8834493e627 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSink.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSink.java @@ -68,6 +68,7 @@ public class HivePageSink private final HiveWriterFactory writerFactory; + private final boolean isTransactional; private final int[] dataColumnInputIndex; // ordinal of columns (not counting sample weight column) private final int[] partitionColumnsInputIndex; // ordinal of columns (not counting sample weight column) @@ -98,6 +99,7 @@ public class HivePageSink public HivePageSink( HiveWriterFactory writerFactory, List inputColumns, + boolean isTransactional, Optional bucketProperty, PageIndexerFactory pageIndexerFactory, HdfsEnvironment hdfsEnvironment, @@ -112,6 +114,7 @@ public HivePageSink( requireNonNull(pageIndexerFactory, "pageIndexerFactory is null"); + this.isTransactional = isTransactional; this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.maxOpenWriters = maxOpenWriters; this.writeVerificationExecutor = requireNonNull(writeVerificationExecutor, "writeVerificationExecutor is null"); @@ -361,7 +364,9 @@ private int[] getWriterIndexes(Page page) HiveWriter writer = writers.get(writerIndex); if (writer != null) { // if current file not too big continue with the current writer - if (bucketFunction != null || writer.getWrittenBytes() <= targetMaxFileSize.orElse(Long.MAX_VALUE)) { + // for transactional tables we don't want to split output files because there is an implicit bucketing with + // 1 bucket so there should be only one output file bucket_00000 + if (bucketFunction != null || isTransactional || writer.getWrittenBytes() <= targetMaxFileSize.orElse(Long.MAX_VALUE)) { continue; } // close current writer diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSinkProvider.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSinkProvider.java index d632c2a13c3a..29b9736b6251 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSinkProvider.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSinkProvider.java @@ -173,6 +173,7 @@ private ConnectorPageSink createPageSink(HiveWritableTableHandle handle, boolean return new HivePageSink( writerFactory, handle.getInputColumns(), + handle.isTransactional(), handle.getBucketProperty(), pageIndexerFactory, hdfsEnvironment, diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePartitioningHandle.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePartitioningHandle.java index 51195f2c5189..a58a0ab1d5e0 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePartitioningHandle.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePartitioningHandle.java @@ -80,6 +80,13 @@ public boolean isUsePartitionedBucketing() return usePartitionedBucketing; } + @Override + public boolean isSingleNode() + { + // empty hiveTypes means there is no bucketing + return hiveTypes.isEmpty() && !usePartitionedBucketing; + } + @Override public String toString() { diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java index 4c56ac146faf..f95b599a89ea 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java @@ -227,6 +227,7 @@ import static io.trino.plugin.hive.HiveTableProperties.PARTITIONED_BY_PROPERTY; import static io.trino.plugin.hive.HiveTableProperties.SORTED_BY_PROPERTY; import static io.trino.plugin.hive.HiveTableProperties.STORAGE_FORMAT_PROPERTY; +import static io.trino.plugin.hive.HiveTableProperties.TRANSACTIONAL; import static io.trino.plugin.hive.HiveTableRedirectionsProvider.NO_REDIRECTIONS; import static io.trino.plugin.hive.HiveTestUtils.PAGE_SORTER; import static io.trino.plugin.hive.HiveTestUtils.SESSION; @@ -5073,10 +5074,27 @@ protected static List filterNonHiddenColumnMetadata(Collection columns, List partitionColumns) throws Exception { - createEmptyTable(schemaTableName, hiveStorageFormat, columns, partitionColumns, Optional.empty()); + createEmptyTable(schemaTableName, hiveStorageFormat, columns, partitionColumns, Optional.empty(), "false"); } - private void createEmptyTable(SchemaTableName schemaTableName, HiveStorageFormat hiveStorageFormat, List columns, List partitionColumns, Optional bucketProperty) + private void createEmptyTable( + SchemaTableName schemaTableName, + HiveStorageFormat hiveStorageFormat, + List columns, + List partitionColumns, + Optional bucketProperty) + throws Exception + { + createEmptyTable(schemaTableName, hiveStorageFormat, columns, partitionColumns, bucketProperty, "false"); + } + + protected void createEmptyTable( + SchemaTableName schemaTableName, + HiveStorageFormat hiveStorageFormat, + List columns, + List partitionColumns, + Optional bucketProperty, + String isTransactional) throws Exception { Path targetPath; @@ -5099,7 +5117,8 @@ private void createEmptyTable(SchemaTableName schemaTableName, HiveStorageFormat .setTableType(TableType.MANAGED_TABLE.name()) .setParameters(ImmutableMap.of( PRESTO_VERSION_NAME, TEST_SERVER_VERSION, - PRESTO_QUERY_ID_NAME, session.getQueryId())) + PRESTO_QUERY_ID_NAME, session.getQueryId(), + TRANSACTIONAL, isTransactional)) .setDataColumns(columns) .setPartitionColumns(partitionColumns); @@ -5220,6 +5239,12 @@ public void testPreferredInsertLayout() @Test public void testInsertBucketedTableLayout() throws Exception + { + insertBucketedTableLayout(false); + } + + protected void insertBucketedTableLayout(boolean transactional) + throws Exception { SchemaTableName tableName = temporaryTable("empty_bucketed_table"); try { @@ -5227,7 +5252,7 @@ public void testInsertBucketedTableLayout() new Column("column1", HIVE_STRING, Optional.empty()), new Column("column2", HIVE_LONG, Optional.empty())); HiveBucketProperty bucketProperty = new HiveBucketProperty(ImmutableList.of("column1"), BUCKETING_V1, 4, ImmutableList.of()); - createEmptyTable(tableName, ORC, columns, ImmutableList.of(), Optional.of(bucketProperty)); + createEmptyTable(tableName, ORC, columns, ImmutableList.of(), Optional.of(bucketProperty), String.valueOf(transactional)); try (Transaction transaction = newTransaction()) { ConnectorMetadata metadata = transaction.getMetadata(); @@ -5256,6 +5281,12 @@ public void testInsertBucketedTableLayout() @Test public void testInsertPartitionedBucketedTableLayout() throws Exception + { + insertPartitionedBucketedTableLayout(false); + } + + protected void insertPartitionedBucketedTableLayout(boolean transactional) + throws Exception { SchemaTableName tableName = temporaryTable("empty_partitioned_table"); try { @@ -5264,7 +5295,7 @@ public void testInsertPartitionedBucketedTableLayout() new Column("column1", HIVE_STRING, Optional.empty()), partitioningColumn); HiveBucketProperty bucketProperty = new HiveBucketProperty(ImmutableList.of("column1"), BUCKETING_V1, 4, ImmutableList.of()); - createEmptyTable(tableName, ORC, columns, ImmutableList.of(partitioningColumn), Optional.of(bucketProperty)); + createEmptyTable(tableName, ORC, columns, ImmutableList.of(partitioningColumn), Optional.of(bucketProperty), String.valueOf(transactional)); try (Transaction transaction = newTransaction()) { ConnectorMetadata metadata = transaction.getMetadata(); diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveTransactionalTable.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveTransactionalTable.java index 967a9a914bde..cb29cc27623f 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveTransactionalTable.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveTransactionalTable.java @@ -73,6 +73,8 @@ import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.stream.Collectors.joining; import static java.util.stream.Collectors.toUnmodifiableList; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; public class TestHiveTransactionalTable extends HiveProductTest @@ -546,7 +548,7 @@ public void testSimpleUnpartitionedTransactionalInsert() // ensure that we treat ACID tables as implicitly bucketed on INSERT String explainOutput = (String) onTrino().executeQuery("EXPLAIN " + insertQuery).row(0).get(0); - Assertions.assertThat(explainOutput).contains("Output partitioning: hive:HivePartitioningHandle{buckets=1"); + Assertions.assertThat(explainOutput).containsPattern("(.*)Fragment 2 \\[SINGLE\\]\n(.*)Output layout: \\[field, column2\\]\n(.*)Output partitioning: SINGLE \\[\\].*"); onTrino().executeQuery(insertQuery); @@ -1903,6 +1905,58 @@ public void testDeleteAfterMajorCompaction() }); } + @Test + public void testUnbucketedPartitionedTransactionalTableWithTaskWriterCountGreaterThanOne() + { + unbucketedTransactionalTableWithTaskWriterCountGreaterThanOne(true); + } + + @Test + public void testUnbucketedTransactionalTableWithTaskWriterCountGreaterThanOne() + { + unbucketedTransactionalTableWithTaskWriterCountGreaterThanOne(false); + } + + private void unbucketedTransactionalTableWithTaskWriterCountGreaterThanOne(boolean isPartitioned) + { + withTemporaryTable(format("test_unbucketed%s_transactional_table_with_task_writer_count_greater_than_one", isPartitioned ? "_partitioned" : ""), true, isPartitioned, NONE, tableName -> { + onTrino().executeQuery(format( + "CREATE TABLE %s " + + "WITH (" + + "format='ORC', " + + "transactional=true " + + "%s" + + ") AS SELECT orderkey, orderstatus, totalprice, orderdate, clerk, shippriority, \"comment\", custkey, orderpriority " + + "FROM tpch.sf1000.orders LIMIT 0", tableName, isPartitioned ? ", partitioned_by = ARRAY['orderpriority']" : "")); + onTrino().executeQuery("SET SESSION scale_writers = true"); + onTrino().executeQuery("SET SESSION writer_min_size = '4kB'"); + onTrino().executeQuery("SET SESSION task_writer_count = 4"); + onTrino().executeQuery("SET SESSION hive.target_max_file_size = '1MB'"); + + onTrino().executeQuery( + format( + "INSERT INTO %s SELECT orderkey, orderstatus, totalprice, orderdate, clerk, shippriority, \"comment\", custkey, orderpriority " + + "FROM tpch.sf1000.orders LIMIT 100000", tableName)); + assertThat(onTrino().executeQuery(format("SELECT count(*) FROM %s", tableName))).containsOnly(row(100000)); + int numberOfCreatedFiles = onTrino().executeQuery(format("SELECT DISTINCT \"$path\" FROM %s", tableName)).getRowsCount(); + int expectedNumberOfPartitions = isPartitioned ? 5 : 1; + assertEquals(numberOfCreatedFiles, expectedNumberOfPartitions, format("There should be only %s files created", expectedNumberOfPartitions)); + + int sizeBeforeDeletion = onTrino().executeQuery(format("SELECT orderkey FROM %s", tableName)).rows().size(); + + onTrino().executeQuery(format("DELETE FROM %s WHERE (orderkey %% 2) = 0", tableName)); + assertThat(onTrino().executeQuery(format("SELECT COUNT (orderkey) FROM %s WHERE orderkey %% 2 = 0", tableName))).containsOnly(row(0)); + + int sizeOnTrinoWithWhere = onTrino().executeQuery(format("SELECT orderkey FROM %s WHERE orderkey %% 2 = 1", tableName)).rows().size(); + int sizeOnHiveWithWhere = onHive().executeQuery(format("SELECT orderkey FROM %s WHERE orderkey %% 2 = 1", tableName)).rows().size(); + int sizeOnTrinoWithoutWhere = onTrino().executeQuery(format("SELECT orderkey FROM %s", tableName)).rows().size(); + + assertEquals(sizeOnHiveWithWhere, sizeOnTrinoWithWhere); + assertEquals(sizeOnTrinoWithWhere, sizeOnTrinoWithoutWhere); + assertTrue(sizeBeforeDeletion > sizeOnTrinoWithoutWhere); + }); + } + private void hdfsDeleteAll(String directory) { if (!hdfsClient.exist(directory)) {