Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3109,7 +3109,15 @@ public PhysicalOperation visitRefreshMaterializedView(RefreshMaterializedViewNod
public PhysicalOperation visitTableWriter(TableWriterNode node, LocalExecutionPlanContext context)
{
// Set table writer count
context.setDriverInstanceCount(getTaskWriterCount(session));
// being a single node means there is one node and one writer so
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

being a single node means there is one node and one writer so

Single node doesn't mean single writer (there can be multiple writers per node).

Currently, single node partitioning is used only by system partitioning handle and it's not for insert path.
This code here only deals with local distribution, but there is also io.trino.sql.planner.optimizations.AddLocalExchanges.Rewriter#visitTableWriter and possibly more, see changes in b8e4e3f
I would rather not change this code.

Could we just handle your case using dedicated constant partitioning function which would direct all rows to single writer?

// this setting should take precedence over task_writer_count property
if (node.getPartitioningScheme().isPresent() &&
node.getPartitioningScheme().get().getPartitioning().getHandle().isSingleNode()) {
context.setDriverInstanceCount(1);
}
else {
context.setDriverInstanceCount(getTaskWriterCount(session));
}

PhysicalOperation source = node.getSource().accept(this, context);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,18 @@ public void testHiveViewTranslationError()
// TODO: combine this with tests for successful translation (currently in TestHiveViews product test)
}
}

@Test
public void testInsertBucketedTransactionalTableLayout()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not in AbstractTestHive?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because AbstractTestHive is also extended by other classes like TestHiveAlluxioMetastore

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because AbstractTestHive is also extended by other classes like TestHiveAlluxioMetastore

Yet io.trino.plugin.hive.AbstractTestHive#testInsertBucketedTableLayout and io.trino.plugin.hive.AbstractTestHive#testInsertPartitionedBucketedTableLayout are in AbstractTestHive

throws Exception
{
insertBucketedTableLayout(true);
}

@Test
public void testInsertPartitionedBucketedTransactionalTableLayout()
throws Exception
{
// Runs the shared partitioned+bucketed insert-layout check with transactional = true;
// the non-transactional variant lives in AbstractTestHive#testInsertPartitionedBucketedTableLayout.
insertPartitionedBucketedTableLayout(true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class HivePageSink

private final HiveWriterFactory writerFactory;

private final boolean isTransactional;
private final int[] dataColumnInputIndex; // ordinal of columns (not counting sample weight column)
private final int[] partitionColumnsInputIndex; // ordinal of columns (not counting sample weight column)

Expand Down Expand Up @@ -98,6 +99,7 @@ public class HivePageSink
public HivePageSink(
HiveWriterFactory writerFactory,
List<HiveColumnHandle> inputColumns,
boolean isTransactional,
Optional<HiveBucketProperty> bucketProperty,
PageIndexerFactory pageIndexerFactory,
HdfsEnvironment hdfsEnvironment,
Expand All @@ -112,6 +114,7 @@ public HivePageSink(

requireNonNull(pageIndexerFactory, "pageIndexerFactory is null");

this.isTransactional = isTransactional;
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.maxOpenWriters = maxOpenWriters;
this.writeVerificationExecutor = requireNonNull(writeVerificationExecutor, "writeVerificationExecutor is null");
Expand Down Expand Up @@ -361,7 +364,9 @@ private int[] getWriterIndexes(Page page)
HiveWriter writer = writers.get(writerIndex);
if (writer != null) {
// if current file not too big continue with the current writer
if (bucketFunction != null || writer.getWrittenBytes() <= targetMaxFileSize.orElse(Long.MAX_VALUE)) {
// for transactional tables we don't want to split output files because there is an implicit bucketing with
// 1 bucket so there should be only one output file bucket_00000
if (bucketFunction != null || isTransactional || writer.getWrittenBytes() <= targetMaxFileSize.orElse(Long.MAX_VALUE)) {
continue;
}
// close current writer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ private ConnectorPageSink createPageSink(HiveWritableTableHandle handle, boolean
return new HivePageSink(
writerFactory,
handle.getInputColumns(),
handle.isTransactional(),
handle.getBucketProperty(),
pageIndexerFactory,
hdfsEnvironment,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,13 @@ public boolean isUsePartitionedBucketing()
return usePartitionedBucketing;
}

@Override
public boolean isSingleNode()
{
// empty hiveTypes means there is no bucketing
return hiveTypes.isEmpty() && !usePartitionedBucketing;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why no bucketing means no insert distribution? Because you want single file?

}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@
import static io.trino.plugin.hive.HiveTableProperties.PARTITIONED_BY_PROPERTY;
import static io.trino.plugin.hive.HiveTableProperties.SORTED_BY_PROPERTY;
import static io.trino.plugin.hive.HiveTableProperties.STORAGE_FORMAT_PROPERTY;
import static io.trino.plugin.hive.HiveTableProperties.TRANSACTIONAL;
import static io.trino.plugin.hive.HiveTableRedirectionsProvider.NO_REDIRECTIONS;
import static io.trino.plugin.hive.HiveTestUtils.PAGE_SORTER;
import static io.trino.plugin.hive.HiveTestUtils.SESSION;
Expand Down Expand Up @@ -5073,10 +5074,27 @@ protected static List<ColumnMetadata> filterNonHiddenColumnMetadata(Collection<C
private void createEmptyTable(SchemaTableName schemaTableName, HiveStorageFormat hiveStorageFormat, List<Column> columns, List<Column> partitionColumns)
        throws Exception
{
    // Convenience overload: no bucketing and a non-transactional table
    // ("false" is the value stored under the TRANSACTIONAL table parameter).
    createEmptyTable(schemaTableName, hiveStorageFormat, columns, partitionColumns, Optional.empty(), "false");
}

private void createEmptyTable(SchemaTableName schemaTableName, HiveStorageFormat hiveStorageFormat, List<Column> columns, List<Column> partitionColumns, Optional<HiveBucketProperty> bucketProperty)
// Convenience overload: creates the table as non-transactional
// ("false" is the value stored under the TRANSACTIONAL table parameter).
private void createEmptyTable(
SchemaTableName schemaTableName,
HiveStorageFormat hiveStorageFormat,
List<Column> columns,
List<Column> partitionColumns,
Optional<HiveBucketProperty> bucketProperty)
throws Exception
{
createEmptyTable(schemaTableName, hiveStorageFormat, columns, partitionColumns, bucketProperty, "false");
}

protected void createEmptyTable(
SchemaTableName schemaTableName,
HiveStorageFormat hiveStorageFormat,
List<Column> columns,
List<Column> partitionColumns,
Optional<HiveBucketProperty> bucketProperty,
String isTransactional)
throws Exception
{
Path targetPath;
Expand All @@ -5099,7 +5117,8 @@ private void createEmptyTable(SchemaTableName schemaTableName, HiveStorageFormat
.setTableType(TableType.MANAGED_TABLE.name())
.setParameters(ImmutableMap.of(
PRESTO_VERSION_NAME, TEST_SERVER_VERSION,
PRESTO_QUERY_ID_NAME, session.getQueryId()))
PRESTO_QUERY_ID_NAME, session.getQueryId(),
TRANSACTIONAL, isTransactional))
.setDataColumns(columns)
.setPartitionColumns(partitionColumns);

Expand Down Expand Up @@ -5220,14 +5239,20 @@ public void testPreferredInsertLayout()
@Test
public void testInsertBucketedTableLayout()
throws Exception
{
insertBucketedTableLayout(false);
}

protected void insertBucketedTableLayout(boolean transactional)
throws Exception
{
SchemaTableName tableName = temporaryTable("empty_bucketed_table");
try {
List<Column> columns = ImmutableList.of(
new Column("column1", HIVE_STRING, Optional.empty()),
new Column("column2", HIVE_LONG, Optional.empty()));
HiveBucketProperty bucketProperty = new HiveBucketProperty(ImmutableList.of("column1"), BUCKETING_V1, 4, ImmutableList.of());
createEmptyTable(tableName, ORC, columns, ImmutableList.of(), Optional.of(bucketProperty));
createEmptyTable(tableName, ORC, columns, ImmutableList.of(), Optional.of(bucketProperty), String.valueOf(transactional));

try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
Expand Down Expand Up @@ -5256,6 +5281,12 @@ public void testInsertBucketedTableLayout()
@Test
public void testInsertPartitionedBucketedTableLayout()
throws Exception
{
insertPartitionedBucketedTableLayout(false);
}

protected void insertPartitionedBucketedTableLayout(boolean transactional)
throws Exception
{
SchemaTableName tableName = temporaryTable("empty_partitioned_table");
try {
Expand All @@ -5264,7 +5295,7 @@ public void testInsertPartitionedBucketedTableLayout()
new Column("column1", HIVE_STRING, Optional.empty()),
partitioningColumn);
HiveBucketProperty bucketProperty = new HiveBucketProperty(ImmutableList.of("column1"), BUCKETING_V1, 4, ImmutableList.of());
createEmptyTable(tableName, ORC, columns, ImmutableList.of(partitioningColumn), Optional.of(bucketProperty));
createEmptyTable(tableName, ORC, columns, ImmutableList.of(partitioningColumn), Optional.of(bucketProperty), String.valueOf(transactional));

try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toUnmodifiableList;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

public class TestHiveTransactionalTable
extends HiveProductTest
Expand Down Expand Up @@ -546,7 +548,7 @@ public void testSimpleUnpartitionedTransactionalInsert()

// ensure that we treat ACID tables as implicitly bucketed on INSERT
String explainOutput = (String) onTrino().executeQuery("EXPLAIN " + insertQuery).row(0).get(0);
Assertions.assertThat(explainOutput).contains("Output partitioning: hive:HivePartitioningHandle{buckets=1");
Assertions.assertThat(explainOutput).containsPattern("(.*)Fragment 2 \\[SINGLE\\]\n(.*)Output layout: \\[field, column2\\]\n(.*)Output partitioning: SINGLE \\[\\].*");

onTrino().executeQuery(insertQuery);

Expand Down Expand Up @@ -1903,6 +1905,58 @@ public void testDeleteAfterMajorCompaction()
});
}

@Test
public void testUnbucketedPartitionedTransactionalTableWithTaskWriterCountGreaterThanOne()
{
// Partitioned variant: delegates with isPartitioned = true.
unbucketedTransactionalTableWithTaskWriterCountGreaterThanOne(true);
}

@Test
public void testUnbucketedTransactionalTableWithTaskWriterCountGreaterThanOne()
{
// Unpartitioned variant: delegates with isPartitioned = false.
unbucketedTransactionalTableWithTaskWriterCountGreaterThanOne(false);
}

private void unbucketedTransactionalTableWithTaskWriterCountGreaterThanOne(boolean isPartitioned)
{
withTemporaryTable(format("test_unbucketed%s_transactional_table_with_task_writer_count_greater_than_one", isPartitioned ? "_partitioned" : ""), true, isPartitioned, NONE, tableName -> {
onTrino().executeQuery(format(
"CREATE TABLE %s " +
"WITH (" +
"format='ORC', " +
"transactional=true " +
"%s" +
") AS SELECT orderkey, orderstatus, totalprice, orderdate, clerk, shippriority, \"comment\", custkey, orderpriority " +
"FROM tpch.sf1000.orders LIMIT 0", tableName, isPartitioned ? ", partitioned_by = ARRAY['orderpriority']" : ""));
onTrino().executeQuery("SET SESSION scale_writers = true");
onTrino().executeQuery("SET SESSION writer_min_size = '4kB'");
onTrino().executeQuery("SET SESSION task_writer_count = 4");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have a test with task_writer_count>1 for unpartitioned (here) and also partitioned tables?
i think we should.

onTrino().executeQuery("SET SESSION hive.target_max_file_size = '1MB'");

onTrino().executeQuery(
format(
"INSERT INTO %s SELECT orderkey, orderstatus, totalprice, orderdate, clerk, shippriority, \"comment\", custkey, orderpriority " +
"FROM tpch.sf1000.orders LIMIT 100000", tableName));
assertThat(onTrino().executeQuery(format("SELECT count(*) FROM %s", tableName))).containsOnly(row(100000));
int numberOfCreatedFiles = onTrino().executeQuery(format("SELECT DISTINCT \"$path\" FROM %s", tableName)).getRowsCount();
int expectedNumberOfPartitions = isPartitioned ? 5 : 1;
assertEquals(numberOfCreatedFiles, expectedNumberOfPartitions, format("There should be only %s files created", expectedNumberOfPartitions));

int sizeBeforeDeletion = onTrino().executeQuery(format("SELECT orderkey FROM %s", tableName)).rows().size();

onTrino().executeQuery(format("DELETE FROM %s WHERE (orderkey %% 2) = 0", tableName));
assertThat(onTrino().executeQuery(format("SELECT COUNT (orderkey) FROM %s WHERE orderkey %% 2 = 0", tableName))).containsOnly(row(0));

int sizeOnTrinoWithWhere = onTrino().executeQuery(format("SELECT orderkey FROM %s WHERE orderkey %% 2 = 1", tableName)).rows().size();
int sizeOnHiveWithWhere = onHive().executeQuery(format("SELECT orderkey FROM %s WHERE orderkey %% 2 = 1", tableName)).rows().size();
int sizeOnTrinoWithoutWhere = onTrino().executeQuery(format("SELECT orderkey FROM %s", tableName)).rows().size();

assertEquals(sizeOnHiveWithWhere, sizeOnTrinoWithWhere);
assertEquals(sizeOnTrinoWithWhere, sizeOnTrinoWithoutWhere);
assertTrue(sizeBeforeDeletion > sizeOnTrinoWithoutWhere);
});
}

private void hdfsDeleteAll(String directory)
{
if (!hdfsClient.exist(directory)) {
Expand Down