HiveMetadata.java
@@ -171,6 +171,7 @@
import static com.facebook.presto.hive.HiveSessionProperties.isOfflineDataDebugModeEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isOptimizedMismatchedBucketCount;
import static com.facebook.presto.hive.HiveSessionProperties.isRespectTableFormat;
import static com.facebook.presto.hive.HiveSessionProperties.isShufflePartitionedColumnsForTableWriteEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isSortedWriteToTempPathEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isSortedWritingEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isStatisticsEnabled;
@@ -278,6 +279,8 @@ public class HiveMetadata
private static final String PARTITIONS_TABLE_SUFFIX = "$partitions";
private static final String PRESTO_TEMPORARY_TABLE_NAME_PREFIX = "__presto_temporary_table_";

private static final int SHUFFLE_MAX_PARALLELISM_FOR_PARTITIONED_TABLE_WRITE = 1009;
Contributor:

@wenleix Thanks for working on this. I have a few questions.

> Note that it now writes exactly one file per partition, which might slow down the table write. We might want to introduce an extra local round-robin shuffle to increase the number of file writers if necessary.

One file per partition might be too few. Is there a way to still use up to 100 writer threads per node to write files?

What's the significance of 1009? Is this the maximum number of dynamic partitions or something else?

How many nodes will be used for writing? hash_partition_count or some other number?

Contributor:

I wonder if larger numbers of writer threads per node actually work, and whether this still works on T1. One case where you would enable this is when a query writes to a large number of partitions on T1s. In that case it can use up all the available memory for the ORC encoding buffers.

I think this is useful as is because it takes a query that doesn't run at all and makes it able to run. That is strictly better than it being unable to run on T1.

Contributor:

Should the bucket count match the actual hash partition count so we don't have to map from a constant number of buckets to a different number of nodes actually executing the stage?

Contributor (author):

Thanks @mbasmanova and @aweisberg for the comments!

> One file per partition might be too few. Is there a way to still use up to 100 writer threads per node to write files?

In the current approach it's difficult, since Presto still treats it as the "table partitioning", and will therefore add a local exchange that complies with the table partitioning.

I do agree one file per partition is very inflexible. To solve this, we might want to differentiate between "table partitioning" and "write/shuffle partitioning". For the latter, the local exchange can be a simple round robin (see the sketch below). What do you think, @arhimondr?
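For illustration, here is a minimal sketch of what such a round-robin local exchange could look like; the class name and shape are hypothetical, not Presto's actual LocalExchange API:

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch: pages are dealt to writer queues in turn,
// ignoring the partitioning, so any number of writers can share
// one partition's data. Not Presto's actual LocalExchange API.
class RoundRobinExchange<T>
{
    private final List<Queue<T>> writerQueues = new ArrayList<>();
    private int next;

    RoundRobinExchange(int writerCount)
    {
        for (int i = 0; i < writerCount; i++) {
            writerQueues.add(new ArrayDeque<>());
        }
    }

    void enqueue(T page)
    {
        // Ignore partitioning entirely; just rotate through the writers
        writerQueues.get(next).add(page);
        next = (next + 1) % writerQueues.size();
    }
}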

> What's the significance of 1009? Is this the maximum number of dynamic partitions or something else?

I chose a prime number that is large enough (>1000). The reason is that the Hive bucket function is reported to degenerate when the bucket column values follow certain patterns -- I can cc you on the internal FB post.
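For intuition, a minimal sketch of the failure mode; this is not the actual Hive bucket function, and the stride-of-10 example is illustrative:

// Illustrative only -- not the actual Hive bucket function
static int bucket(int hash, int bucketCount)
{
    return (hash & Integer.MAX_VALUE) % bucketCount;
}

// Hive's hash of an integer column is (roughly) the value itself, so
// patterned values stride through the bucket space. With a composite
// bucket count such as 1000, the values 0, 10, 20, ... occupy only
// 1000 / gcd(10, 1000) = 100 buckets; with the prime 1009, where
// gcd(10, 1009) = 1, the same values cycle through all 1009 buckets.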

> How many nodes will be used for writing? hash_partition_count or some other number?

I think it will be max_tasks_per_stage, but I can double-check.

Contributor (author) @wenleix commented on Jan 21, 2020:

@aweisberg

> I wonder if larger numbers of writer threads per node actually work, and whether this still works on T1. One case where you would enable this is when a query writes to a large number of partitions on T1s. In that case it can use up all the available memory for the ORC encoding buffers.

I agree. For T1 we probably want to configure it to 1. But we might still want some flexibility in how many files we can have per partition.

> Should the bucket count match the actual hash partition count so we don't have to map from a constant number of buckets to a different number of nodes actually executing the stage?

No, it doesn't have to. Buckets are mapped to nodes in a randomized round-robin fashion; see NodePartitioningManager#createArbitraryBucketToNode:

private static List<InternalNode> createArbitraryBucketToNode(List<InternalNode> nodes, int bucketCount)
{
    // Randomize the node order so bucket ownership varies across queries
    List<InternalNode> shuffledNodes = new ArrayList<>(nodes);
    Collections.shuffle(shuffledNodes);

    // Deal buckets to the shuffled nodes round-robin; with more buckets
    // than nodes, each node ends up owning several buckets
    ImmutableList.Builder<InternalNode> distribution = ImmutableList.builderWithExpectedSize(bucketCount);
    for (int i = 0; i < bucketCount; i++) {
        distribution.add(shuffledNodes.get(i % shuffledNodes.size()));
    }
    return distribution.build();
}
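Since the 1009 buckets usually exceed the node count, each node owns several of them; on a 100-node cluster, for example, every node would be assigned 10 or 11 buckets.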


private final boolean allowCorruptWritesForTesting;
private final SemiTransactionalHiveMetastore metastore;
private final HdfsEnvironment hdfsEnvironment;
@@ -2369,9 +2372,26 @@ public Optional<ConnectorNewTableLayout> getInsertLayout(ConnectorSession session)
.orElseThrow(() -> new TableNotFoundException(tableName));

Optional<HiveBucketHandle> hiveBucketHandle = getHiveBucketHandle(table);

if (!hiveBucketHandle.isPresent()) {
Contributor:

So this case is basically: the user hasn't already bucketed the table for us, so we are going to pretend it's bucketed when writing out the files?

Contributor (author):

Right. If the table is bucketed, we have to follow the table bucketing; there's no way around that.

Rethinking this, distinguishing between table data partitioning and table write shuffle partitioning (or some other name) might actually make the code easier to understand. Otherwise we're left asking: why is the table not actually partitioned by XX while we pretend it is? And the "if bucketed, use bucketing; otherwise, use the partition columns" logic can be a bit difficult to understand and maintain.

cc @arhimondr

    if (isShufflePartitionedColumnsForTableWriteEnabled(session) && !table.getPartitionColumns().isEmpty()) {
        // TODO: the shuffle partitioning doesn't have to be the same as Hive bucket partitioning
        HivePartitioningHandle partitioningHandle = new HivePartitioningHandle(
                SHUFFLE_MAX_PARALLELISM_FOR_PARTITIONED_TABLE_WRITE,
                table.getPartitionColumns().stream()
                        .map(Column::getType)
                        .collect(Collectors.toList()),
                OptionalInt.empty());
        List<String> partitionedBy = table.getPartitionColumns().stream()
                .map(Column::getName)
                .collect(Collectors.toList());

        return Optional.of(new ConnectorNewTableLayout(partitioningHandle, partitionedBy));
    }

    return Optional.empty();
}

HiveBucketProperty bucketProperty = table.getStorage().getBucketProperty()
.orElseThrow(() -> new NoSuchElementException("Bucket property should be set"));
if (!bucketProperty.getSortedBy().isEmpty() && !isSortedWritingEnabled(session)) {
@@ -2396,9 +2416,31 @@ public Optional<ConnectorNewTableLayout> getNewTableLayout(ConnectorSession session)
validatePartitionColumns(tableMetadata);
Contributor @aweisberg commented on Jan 17, 2020:

Is this for when they create the table at the same time as the SELECT, and we want to make sure we write it out as if it were bucketed? In other words, it won't ever come back and call getInsertLayout before inserting?

validateBucketColumns(tableMetadata);
Optional<HiveBucketProperty> bucketProperty = getBucketProperty(tableMetadata.getProperties());

if (!bucketProperty.isPresent()) {
    List<String> partitionedBy = getPartitionedBy(tableMetadata.getProperties());
    if (isShufflePartitionedColumnsForTableWriteEnabled(session) && !partitionedBy.isEmpty()) {
        List<HiveColumnHandle> columnHandles = getColumnHandles(tableMetadata, ImmutableSet.copyOf(partitionedBy), typeTranslator);
        Map<String, HiveColumnHandle> columnHandlesByName = Maps.uniqueIndex(columnHandles, HiveColumnHandle::getName);
        List<Column> partitionColumns = partitionedBy.stream()
                .map(columnHandlesByName::get)
                .map(column -> new Column(column.getName(), column.getHiveType(), column.getComment()))
                .collect(toList());

        // TODO: the shuffle partitioning doesn't have to be the same as Hive bucket partitioning
        HivePartitioningHandle partitioningHandle = new HivePartitioningHandle(
                SHUFFLE_MAX_PARALLELISM_FOR_PARTITIONED_TABLE_WRITE,
                partitionColumns.stream()
                        .map(Column::getType)
                        .collect(Collectors.toList()),
                OptionalInt.empty());

        return Optional.of(new ConnectorNewTableLayout(partitioningHandle, partitionedBy));
    }

    return Optional.empty();
}

if (!bucketProperty.get().getSortedBy().isEmpty() && !isSortedWritingEnabled(session)) {
throw new PrestoException(NOT_SUPPORTED, "Writing to bucketed sorted Hive tables is disabled");
}
HiveSessionProperties.java
@@ -82,6 +82,7 @@ public final class HiveSessionProperties
public static final String COLLECT_COLUMN_STATISTICS_ON_WRITE = "collect_column_statistics_on_write";
private static final String OPTIMIZE_MISMATCHED_BUCKET_COUNT = "optimize_mismatched_bucket_count";
private static final String S3_SELECT_PUSHDOWN_ENABLED = "s3_select_pushdown_enabled";
private static final String SHUFFLE_PARTITIONED_COLUMNS_FOR_TABLE_WRITE = "shuffle_partitioned_columns_for_table_write";
private static final String TEMPORARY_STAGING_DIRECTORY_ENABLED = "temporary_staging_directory_enabled";
private static final String TEMPORARY_STAGING_DIRECTORY_PATH = "temporary_staging_directory_path";
private static final String TEMPORARY_TABLE_SCHEMA = "temporary_table_schema";
@@ -410,7 +411,12 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterConfig
        ORC_ZSTD_JNI_DECOMPRESSION_ENABLED,
        "use JNI based zstd decompression for reading ORC files",
        hiveClientConfig.isZstdJniDecompressionEnabled(),
        true));
        true),
booleanProperty(
        SHUFFLE_PARTITIONED_COLUMNS_FOR_TABLE_WRITE,
        "Shuffle the data on partitioned columns",
        false,
        false));
}

public List<PropertyMetadata<?>> getSessionProperties()
@@ -699,6 +705,11 @@ public static boolean isOfflineDataDebugModeEnabled(ConnectorSession session)
return session.getProperty(OFFLINE_DATA_DEBUG_MODE_ENABLED, Boolean.class);
}

public static boolean isShufflePartitionedColumnsForTableWriteEnabled(ConnectorSession session)
{
    return session.getProperty(SHUFFLE_PARTITIONED_COLUMNS_FOR_TABLE_WRITE, Boolean.class);
}
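Once registered, the property can be enabled per query, e.g. SET SESSION hive.shuffle_partitioned_columns_for_table_write = true (assuming the catalog is named hive); the tests below achieve the same via setCatalogSessionProperty.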

public static PropertyMetadata<DataSize> dataSizeSessionProperty(String name, String description, DataSize defaultValue, boolean hidden)
{
return new PropertyMetadata<>(
@@ -661,6 +661,49 @@ private void testCreatePartitionedTableAs(Session session, HiveStorageFormat storageFormat)
assertFalse(getQueryRunner().tableExists(session, "test_create_partitioned_table_as"));
}

@Test
public void testCreatePartitionedTableAsShuffleOnPartitionColumns()
{
    testCreatePartitionedTableAsShuffleOnPartitionColumns(
            Session.builder(getSession())
                    .setCatalogSessionProperty(catalog, "shuffle_partitioned_columns_for_table_write", "true")
                    .build(),
            HiveStorageFormat.ORC);
}

public void testCreatePartitionedTableAsShuffleOnPartitionColumns(Session session, HiveStorageFormat storageFormat)
{
    @Language("SQL") String createTable = "" +
            "CREATE TABLE test_create_partitioned_table_as_shuffle_on_partition_columns " +
            "WITH (" +
            "format = '" + storageFormat + "', " +
            "partitioned_by = ARRAY[ 'SHIP_PRIORITY', 'ORDER_STATUS' ]" +
            ") " +
            "AS " +
            "SELECT orderkey AS order_key, shippriority AS ship_priority, orderstatus AS order_status " +
            "FROM tpch.tiny.orders";

    assertUpdate(session, createTable, "SELECT count(*) from orders");

    TableMetadata tableMetadata = getTableMetadata(catalog, TPCH_SCHEMA, "test_create_partitioned_table_as_shuffle_on_partition_columns");
    assertEquals(tableMetadata.getMetadata().getProperties().get(STORAGE_FORMAT_PROPERTY), storageFormat);
    assertEquals(tableMetadata.getMetadata().getProperties().get(PARTITIONED_BY_PROPERTY), ImmutableList.of("ship_priority", "order_status"));

    List<?> partitions = getPartitions("test_create_partitioned_table_as_shuffle_on_partition_columns");
    assertEquals(partitions.size(), 3);

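    // With the shuffle enabled, each partition goes to a single writer, so expect exactly one file per partition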
    assertQuery(
            session,
            "SELECT count(distinct \"$path\") from test_create_partitioned_table_as_shuffle_on_partition_columns",
            "SELECT 3");

    assertQuery(session, "SELECT * from test_create_partitioned_table_as_shuffle_on_partition_columns", "SELECT orderkey, shippriority, orderstatus FROM orders");

    assertUpdate(session, "DROP TABLE test_create_partitioned_table_as_shuffle_on_partition_columns");

    assertFalse(getQueryRunner().tableExists(session, "test_create_partitioned_table_as_shuffle_on_partition_columns"));
}

@Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = "Partition keys must be the last columns in the table and in the same order as the table properties.*")
public void testCreatePartitionedTableInvalidColumnOrdering()
{
@@ -1328,6 +1371,64 @@ private void testInsertPartitionedTable(Session session, HiveStorageFormat storageFormat)
assertFalse(getQueryRunner().tableExists(session, "test_insert_partitioned_table"));
}

@Test
public void testInsertPartitionedTableShuffleOnPartitionColumns()
{
    testInsertPartitionedTableShuffleOnPartitionColumns(
            Session.builder(getSession())
                    .setCatalogSessionProperty(catalog, "shuffle_partitioned_columns_for_table_write", "true")
                    .build(),
            HiveStorageFormat.ORC);
}

public void testInsertPartitionedTableShuffleOnPartitionColumns(Session session, HiveStorageFormat storageFormat)
{
    String tableName = "test_insert_partitioned_table_shuffle_on_partition_columns";

    @Language("SQL") String createTable = "" +
            "CREATE TABLE " + tableName + " " +
            "(" +
            " order_key BIGINT," +
            " comment VARCHAR," +
            " order_status VARCHAR" +
            ") " +
            "WITH (" +
            "format = '" + storageFormat + "', " +
            "partitioned_by = ARRAY[ 'order_status' ]" +
            ") ";

    assertUpdate(session, createTable);

    TableMetadata tableMetadata = getTableMetadata(catalog, TPCH_SCHEMA, tableName);
    assertEquals(tableMetadata.getMetadata().getProperties().get(STORAGE_FORMAT_PROPERTY), storageFormat);
    assertEquals(tableMetadata.getMetadata().getProperties().get(PARTITIONED_BY_PROPERTY), ImmutableList.of("order_status"));

    assertUpdate(
            session,
            "INSERT INTO " + tableName + " " +
            "SELECT orderkey, comment, orderstatus " +
            "FROM tpch.tiny.orders",
            "SELECT count(*) from orders");

    // verify the partitions
    List<?> partitions = getPartitions(tableName);
    assertEquals(partitions.size(), 3);

    assertQuery(
            session,
            "SELECT * from " + tableName,
            "SELECT orderkey, comment, orderstatus FROM orders");

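    // One file per partition: the shuffle sends all rows of a partition to the same writer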
    assertQuery(
            session,
            "SELECT count(distinct \"$path\") from " + tableName,
            "SELECT 3");

    assertUpdate(session, "DROP TABLE " + tableName);

    assertFalse(getQueryRunner().tableExists(session, tableName));
}

@Test
public void testInsertPartitionedTableExistingPartition()
{