@@ -691,8 +691,6 @@ private StreamProperties(

         checkArgument(distribution != SINGLE || this.partitioningColumns.equals(Optional.of(ImmutableList.of())),
                 "Single stream must be partitioned on empty set");
-        checkArgument(distribution == SINGLE || !this.partitioningColumns.equals(Optional.of(ImmutableList.of())),
-                "Multiple streams must not be partitioned on empty set");

         this.ordered = ordered;
         checkArgument(!ordered || distribution == SINGLE, "Ordered must be a single stream");
@@ -352,4 +352,18 @@ public void testNewDirectoryPermissions()
     {
         // Alluxio metastore does not support create operations
     }
+
+    @Override
+    public void testInsertBucketedTransactionalTableLayout()
+            throws Exception
+    {
+        // Alluxio metastore does not support insert/update/delete operations
+    }
+
+    @Override
+    public void testInsertPartitionedBucketedTransactionalTableLayout()
+            throws Exception
+    {
+        // Alluxio metastore does not support insert/update/delete operations
+    }
 }
@@ -68,6 +68,7 @@ public class HivePageSink

     private final HiveWriterFactory writerFactory;

+    private final boolean isTransactional;
     private final int[] dataColumnInputIndex; // ordinal of columns (not counting sample weight column)
     private final int[] partitionColumnsInputIndex; // ordinal of columns (not counting sample weight column)

@@ -98,6 +99,7 @@ public class HivePageSink
     public HivePageSink(
             HiveWriterFactory writerFactory,
             List<HiveColumnHandle> inputColumns,
+            boolean isTransactional,
             Optional<HiveBucketProperty> bucketProperty,
             PageIndexerFactory pageIndexerFactory,
             HdfsEnvironment hdfsEnvironment,
@@ -112,6 +114,7 @@ public HivePageSink(

         requireNonNull(pageIndexerFactory, "pageIndexerFactory is null");

+        this.isTransactional = isTransactional;
         this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
         this.maxOpenWriters = maxOpenWriters;
         this.writeVerificationExecutor = requireNonNull(writeVerificationExecutor, "writeVerificationExecutor is null");
@@ -361,7 +364,9 @@ private int[] getWriterIndexes(Page page)
             HiveWriter writer = writers.get(writerIndex);
             if (writer != null) {
                 // if current file not too big continue with the current writer
-                if (bucketFunction != null || writer.getWrittenBytes() <= targetMaxFileSize.orElse(Long.MAX_VALUE)) {
+                // for transactional tables we don't want to split output files because there is an explicit or implicit bucketing
+                // and file names have no random component (e.g. bucket_00000)
+                if (bucketFunction != null || isTransactional || writer.getWrittenBytes() <= targetMaxFileSize.orElse(Long.MAX_VALUE)) {
Review comment (Member):
Isn't bucket function always present when isTransactional=true?

Reply (Member, author):
It is not present in the scenario I am trying to fix

                     continue;
                 }
                 // close current writer
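The scenario from the review exchange above is a transactional table declared without any bucketing property: `bucketFunction` is then null in HivePageSink, yet Hive ACID writers still emit files with fixed names such as bucket_00000, so splitting a writer's output by size would collide on the file name. A minimal sketch of such a table in Trino SQL (table and column names are hypothetical, not from the PR):

-- Transactional but not bucketed: no bucketed_by property, so the page sink
-- has no bucket function while the fixed ACID file naming still applies.
CREATE TABLE unbucketed_acid_example (orderkey bigint, orderpriority varchar)
WITH (format = 'ORC', transactional = true);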
@@ -173,6 +173,7 @@ private ConnectorPageSink createPageSink(HiveWritableTableHandle handle, boolean
         return new HivePageSink(
                 writerFactory,
                 handle.getInputColumns(),
+                handle.isTransactional(),
                 handle.getBucketProperty(),
                 pageIndexerFactory,
                 hdfsEnvironment,
@@ -171,6 +171,7 @@
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.base.Verify.verify;
 import static com.google.common.collect.ImmutableList.toImmutableList;
+import static com.google.common.collect.ImmutableMap.builder;
 import static com.google.common.collect.ImmutableMap.toImmutableMap;
 import static com.google.common.collect.ImmutableSet.toImmutableSet;
 import static com.google.common.collect.Iterables.concat;
@@ -227,6 +228,7 @@
 import static io.trino.plugin.hive.HiveTableProperties.PARTITIONED_BY_PROPERTY;
 import static io.trino.plugin.hive.HiveTableProperties.SORTED_BY_PROPERTY;
 import static io.trino.plugin.hive.HiveTableProperties.STORAGE_FORMAT_PROPERTY;
+import static io.trino.plugin.hive.HiveTableProperties.TRANSACTIONAL;
 import static io.trino.plugin.hive.HiveTableRedirectionsProvider.NO_REDIRECTIONS;
 import static io.trino.plugin.hive.HiveTestUtils.PAGE_SORTER;
 import static io.trino.plugin.hive.HiveTestUtils.SESSION;
@@ -5075,10 +5077,27 @@ protected static List<ColumnMetadata> filterNonHiddenColumnMetadata(Collection<C
     private void createEmptyTable(SchemaTableName schemaTableName, HiveStorageFormat hiveStorageFormat, List<Column> columns, List<Column> partitionColumns)
             throws Exception
     {
-        createEmptyTable(schemaTableName, hiveStorageFormat, columns, partitionColumns, Optional.empty());
+        createEmptyTable(schemaTableName, hiveStorageFormat, columns, partitionColumns, Optional.empty(), false);
     }

-    private void createEmptyTable(SchemaTableName schemaTableName, HiveStorageFormat hiveStorageFormat, List<Column> columns, List<Column> partitionColumns, Optional<HiveBucketProperty> bucketProperty)
+    private void createEmptyTable(
+            SchemaTableName schemaTableName,
+            HiveStorageFormat hiveStorageFormat,
+            List<Column> columns,
+            List<Column> partitionColumns,
+            Optional<HiveBucketProperty> bucketProperty)
             throws Exception
     {
+        createEmptyTable(schemaTableName, hiveStorageFormat, columns, partitionColumns, bucketProperty, false);
+    }
+
+    protected void createEmptyTable(
+            SchemaTableName schemaTableName,
+            HiveStorageFormat hiveStorageFormat,
+            List<Column> columns,
+            List<Column> partitionColumns,
+            Optional<HiveBucketProperty> bucketProperty,
+            boolean isTransactional)
+            throws Exception
+    {
         Path targetPath;
@@ -5094,14 +5113,18 @@ private void createEmptyTable(SchemaTableName schemaTableName, HiveStorageFormat
             LocationHandle locationHandle = locationService.forNewTable(transaction.getMetastore(), session, schemaName, tableName, Optional.empty());
             targetPath = locationService.getQueryWriteInfo(locationHandle).getTargetPath();

+            ImmutableMap.Builder<String, String> tableParamBuilder = ImmutableMap.<String, String>builder()
+                    .put(PRESTO_VERSION_NAME, TEST_SERVER_VERSION)
+                    .put(PRESTO_QUERY_ID_NAME, session.getQueryId());
+            if (isTransactional) {
+                tableParamBuilder.put(TRANSACTIONAL, "true");
+            }
             Table.Builder tableBuilder = Table.builder()
                     .setDatabaseName(schemaName)
                     .setTableName(tableName)
                     .setOwner(Optional.of(tableOwner))
                     .setTableType(TableType.MANAGED_TABLE.name())
-                    .setParameters(ImmutableMap.of(
-                            PRESTO_VERSION_NAME, TEST_SERVER_VERSION,
-                            PRESTO_QUERY_ID_NAME, session.getQueryId()))
+                    .setParameters(tableParamBuilder.build())
                     .setDataColumns(columns)
                     .setPartitionColumns(partitionColumns);

@@ -5222,14 +5245,27 @@ public void testPreferredInsertLayout()
     @Test
     public void testInsertBucketedTableLayout()
             throws Exception
     {
+        insertBucketedTableLayout(false);
+    }
+
+    @Test
+    public void testInsertBucketedTransactionalTableLayout()
+            throws Exception
+    {
+        insertBucketedTableLayout(true);
+    }
+
+    protected void insertBucketedTableLayout(boolean transactional)
+            throws Exception
+    {
         SchemaTableName tableName = temporaryTable("empty_bucketed_table");
         try {
             List<Column> columns = ImmutableList.of(
                     new Column("column1", HIVE_STRING, Optional.empty()),
                     new Column("column2", HIVE_LONG, Optional.empty()));
             HiveBucketProperty bucketProperty = new HiveBucketProperty(ImmutableList.of("column1"), BUCKETING_V1, 4, ImmutableList.of());
-            createEmptyTable(tableName, ORC, columns, ImmutableList.of(), Optional.of(bucketProperty));
+            createEmptyTable(tableName, ORC, columns, ImmutableList.of(), Optional.of(bucketProperty), transactional);

             try (Transaction transaction = newTransaction()) {
                 ConnectorMetadata metadata = transaction.getMetadata();
@@ -5258,6 +5294,19 @@ public void testInsertBucketedTableLayout()
     @Test
     public void testInsertPartitionedBucketedTableLayout()
             throws Exception
     {
+        insertPartitionedBucketedTableLayout(false);
+    }
+
+    @Test
+    public void testInsertPartitionedBucketedTransactionalTableLayout()
+            throws Exception
+    {
+        insertPartitionedBucketedTableLayout(true);
+    }
+
+    protected void insertPartitionedBucketedTableLayout(boolean transactional)
+            throws Exception
+    {
         SchemaTableName tableName = temporaryTable("empty_partitioned_table");
         try {
@@ -5266,7 +5315,7 @@ public void testInsertPartitionedBucketedTableLayout()
new Column("column1", HIVE_STRING, Optional.empty()),
partitioningColumn);
HiveBucketProperty bucketProperty = new HiveBucketProperty(ImmutableList.of("column1"), BUCKETING_V1, 4, ImmutableList.of());
createEmptyTable(tableName, ORC, columns, ImmutableList.of(partitioningColumn), Optional.of(bucketProperty));
createEmptyTable(tableName, ORC, columns, ImmutableList.of(partitioningColumn), Optional.of(bucketProperty), transactional);

try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
@@ -72,6 +72,8 @@
 import static java.util.concurrent.TimeUnit.MINUTES;
 import static java.util.stream.Collectors.joining;
 import static java.util.stream.Collectors.toUnmodifiableList;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;

 public class TestHiveTransactionalTable
         extends HiveProductTest
@@ -1902,6 +1904,58 @@ public void testDeleteAfterMajorCompaction()
         });
     }

+    @Test
+    public void testUnbucketedPartitionedTransactionalTableWithTaskWriterCountGreaterThanOne()
+    {
+        unbucketedTransactionalTableWithTaskWriterCountGreaterThanOne(true);
+    }
+
+    @Test
+    public void testUnbucketedTransactionalTableWithTaskWriterCountGreaterThanOne()
+    {
+        unbucketedTransactionalTableWithTaskWriterCountGreaterThanOne(false);
+    }
+
+    private void unbucketedTransactionalTableWithTaskWriterCountGreaterThanOne(boolean isPartitioned)
+    {
+        withTemporaryTable(format("test_unbucketed%s_transactional_table_with_task_writer_count_greater_than_one", isPartitioned ? "_partitioned" : ""), true, isPartitioned, NONE, tableName -> {
+            onTrino().executeQuery(format(
+                    "CREATE TABLE %s " +
+                            "WITH (" +
+                            "format='ORC', " +
+                            "transactional=true " +
+                            "%s" +
+                            ") AS SELECT orderkey, orderstatus, totalprice, orderdate, clerk, shippriority, \"comment\", custkey, orderpriority " +
+                            "FROM tpch.sf1000.orders LIMIT 0", tableName, isPartitioned ? ", partitioned_by = ARRAY['orderpriority']" : ""));
+            onTrino().executeQuery("SET SESSION scale_writers = true");
+            onTrino().executeQuery("SET SESSION writer_min_size = '4kB'");
+            onTrino().executeQuery("SET SESSION task_writer_count = 4");
+            onTrino().executeQuery("SET SESSION hive.target_max_file_size = '1MB'");
+
+            onTrino().executeQuery(
+                    format(
+                            "INSERT INTO %s SELECT orderkey, orderstatus, totalprice, orderdate, clerk, shippriority, \"comment\", custkey, orderpriority " +
+                                    "FROM tpch.sf1000.orders LIMIT 100000", tableName));
+            assertThat(onTrino().executeQuery(format("SELECT count(*) FROM %s", tableName))).containsOnly(row(100000));
+            int numberOfCreatedFiles = onTrino().executeQuery(format("SELECT DISTINCT \"$path\" FROM %s", tableName)).getRowsCount();
+            int expectedNumberOfPartitions = isPartitioned ? 5 : 1;
+            assertEquals(numberOfCreatedFiles, expectedNumberOfPartitions, format("There should be only %s files created", expectedNumberOfPartitions));
+
+            int sizeBeforeDeletion = onTrino().executeQuery(format("SELECT orderkey FROM %s", tableName)).rows().size();
+
+            onTrino().executeQuery(format("DELETE FROM %s WHERE (orderkey %% 2) = 0", tableName));
+            assertThat(onTrino().executeQuery(format("SELECT COUNT (orderkey) FROM %s WHERE orderkey %% 2 = 0", tableName))).containsOnly(row(0));
+
+            int sizeOnTrinoWithWhere = onTrino().executeQuery(format("SELECT orderkey FROM %s WHERE orderkey %% 2 = 1", tableName)).rows().size();
+            int sizeOnHiveWithWhere = onHive().executeQuery(format("SELECT orderkey FROM %s WHERE orderkey %% 2 = 1", tableName)).rows().size();
+            int sizeOnTrinoWithoutWhere = onTrino().executeQuery(format("SELECT orderkey FROM %s", tableName)).rows().size();
+
+            assertEquals(sizeOnHiveWithWhere, sizeOnTrinoWithWhere);
+            assertEquals(sizeOnTrinoWithWhere, sizeOnTrinoWithoutWhere);
+            assertTrue(sizeBeforeDeletion > sizeOnTrinoWithoutWhere);
+        });
+    }
+
     private void hdfsDeleteAll(String directory)
     {
         if (!hdfsClient.exist(directory)) {
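The file-count assertion in the product test above relies on each partition of the unbucketed transactional table ending up with exactly one ACID file per insert once size-based splitting is disabled. A hedged illustration of the "$path" values the test counts, assuming the usual Hive ACID delta directory naming; the paths and write ids are illustrative, not captured from a run, and the temporary table name is whatever withTemporaryTable generates:

SELECT DISTINCT "$path"
FROM test_unbucketed_partitioned_transactional_table_with_task_writer_count_greater_than_one;
-- e.g. hdfs://.../orderpriority=1-URGENT/delta_0000001_0000001_0000/bucket_00000
--      hdfs://.../orderpriority=2-HIGH/delta_0000001_0000001_0000/bucket_00000
-- one bucket_00000 per partition (5 rows when partitioned, 1 otherwise),
-- even with task_writer_count = 4 and hive.target_max_file_size = '1MB'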
Expand Down