HiveMetadata.java
@@ -175,6 +175,7 @@
 import static com.facebook.presto.hive.HiveSessionProperties.isOfflineDataDebugModeEnabled;
 import static com.facebook.presto.hive.HiveSessionProperties.isOptimizedMismatchedBucketCount;
 import static com.facebook.presto.hive.HiveSessionProperties.isRespectTableFormat;
+import static com.facebook.presto.hive.HiveSessionProperties.isShufflePartitionedColumnsForTableWriteEnabled;
 import static com.facebook.presto.hive.HiveSessionProperties.isSortedWriteToTempPathEnabled;
 import static com.facebook.presto.hive.HiveSessionProperties.isSortedWritingEnabled;
 import static com.facebook.presto.hive.HiveSessionProperties.isStatisticsEnabled;
@@ -289,6 +290,12 @@ public class HiveMetadata
             Optional.empty(),
             Optional.empty(),
             emptyList());
+
+    // 1009 is chosen as a prime number greater than 1000.
+    // This is because the Hive bucket function can result in a skewed distribution when the bucket count is a power of 2.
+    // TODO: Use a regular number once a better hash function is used for table write shuffle partitioning.
+    private static final int SHUFFLE_MAX_PARALLELISM_FOR_PARTITIONED_TABLE_WRITE = 1009;
+
     private final boolean allowCorruptWritesForTesting;
     private final SemiTransactionalHiveMetastore metastore;
     private final HdfsEnvironment hdfsEnvironment;
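A note on the constant above: Hive-style bucketing reduces a hash to a bucket index roughly as (hash & Integer.MAX_VALUE) % bucketCount, so a power-of-two bucket count keeps only the hash's low bits and collapses keys that agree on them. The following standalone sketch illustrates the skew the comment describes; it is illustrative only (BucketSkewSketch is not part of this change, and synthetic integer hashes stand in for real Hive hashes):

import java.util.Arrays;

public class BucketSkewSketch
{
    // Same reduction shape as Hive-style bucketing: clear the sign bit, then mod.
    private static int bucket(int hash, int bucketCount)
    {
        return (hash & Integer.MAX_VALUE) % bucketCount;
    }

    public static void main(String[] args)
    {
        int[] powerOfTwo = new int[1024];
        int[] prime = new int[1009];
        // Synthetic hashes that all agree on their low 12 bits (multiples of 4096).
        for (int i = 0; i < 100_000; i++) {
            int hash = i * 4096;
            powerOfTwo[bucket(hash, 1024)]++;
            prime[bucket(hash, 1009)]++;
        }
        // Prints 1: every key lands in bucket 0 of the 1024 power-of-two buckets.
        System.out.println(Arrays.stream(powerOfTwo).filter(c -> c > 0).count());
        // Prints 1009: the same keys spread across all of the prime bucket count.
        System.out.println(Arrays.stream(prime).filter(c -> c > 0).count());
    }
}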
@@ -2418,6 +2425,39 @@ public Optional<ConnectorNewTableLayout> getInsertLayout(ConnectorSession sessio
         return Optional.of(new ConnectorNewTableLayout(partitioningHandle, partitionColumns));
     }

+    @Override
+    public Optional<ConnectorNewTableLayout> getPreferredShuffleLayoutForInsert(ConnectorSession session, ConnectorTableHandle tableHandle)
+    {
+        HiveTableHandle hiveTableHandle = (HiveTableHandle) tableHandle;
+        SchemaTableName tableName = hiveTableHandle.getSchemaTableName();
+        Table table = metastore.getTable(tableName.getSchemaName(), tableName.getTableName())
+                .orElseThrow(() -> new TableNotFoundException(tableName));
+
+        Optional<HiveBucketHandle> hiveBucketHandle = getHiveBucketHandle(table);
+        if (hiveBucketHandle.isPresent()) {
+            // For a bucketed table, the table partitioning (i.e. the bucketing scheme) should be respected,
+            // and there is no additional preferred shuffle partitioning.
+            return Optional.empty();
+        }
+
+        if (!isShufflePartitionedColumnsForTableWriteEnabled(session) || table.getPartitionColumns().isEmpty()) {
+            return Optional.empty();
+        }
+
+        // TODO: the shuffle partitioning could use a better hash function (instead of the Hive bucket function)
+        HivePartitioningHandle partitioningHandle = new HivePartitioningHandle(
+                SHUFFLE_MAX_PARALLELISM_FOR_PARTITIONED_TABLE_WRITE,
+                table.getPartitionColumns().stream()
+                        .map(Column::getType)
+                        .collect(Collectors.toList()),
+                OptionalInt.empty());
+        List<String> partitionedBy = table.getPartitionColumns().stream()
+                .map(Column::getName)
+                .collect(Collectors.toList());
+
+        return Optional.of(new ConnectorNewTableLayout(partitioningHandle, partitionedBy));
+    }
+
     @Override
     public Optional<ConnectorNewTableLayout> getNewTableLayout(ConnectorSession session, ConnectorTableMetadata tableMetadata)
     {
@@ -2444,6 +2484,41 @@ public Optional<ConnectorNewTableLayout> getNewTableLayout(ConnectorSession sess
                 bucketedBy));
     }

+    @Override
+    public Optional<ConnectorNewTableLayout> getPreferredShuffleLayoutForNewTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
+    {
+        validatePartitionColumns(tableMetadata);
+        validateBucketColumns(tableMetadata);
+        Optional<HiveBucketProperty> bucketProperty = getBucketProperty(tableMetadata.getProperties());
+        if (bucketProperty.isPresent()) {
+            // For a bucketed table, the table partitioning (i.e. the bucketing scheme) should be respected,
+            // and there is no additional preferred shuffle partitioning.
+            return Optional.empty();
+        }
+
+        List<String> partitionedBy = getPartitionedBy(tableMetadata.getProperties());
+        if (!isShufflePartitionedColumnsForTableWriteEnabled(session) || partitionedBy.isEmpty()) {
+            return Optional.empty();
+        }
+
+        List<HiveColumnHandle> columnHandles = getColumnHandles(tableMetadata, ImmutableSet.copyOf(partitionedBy), typeTranslator);
+        Map<String, HiveColumnHandle> columnHandlesByName = Maps.uniqueIndex(columnHandles, HiveColumnHandle::getName);
+        List<Column> partitionColumns = partitionedBy.stream()
+                .map(columnHandlesByName::get)
+                .map(column -> new Column(column.getName(), column.getHiveType(), column.getComment()))
+                .collect(toList());
+
+        // TODO: the shuffle partitioning could use a better hash function (instead of the Hive bucket function)
+        HivePartitioningHandle partitioningHandle = new HivePartitioningHandle(
+                SHUFFLE_MAX_PARALLELISM_FOR_PARTITIONED_TABLE_WRITE,
+                partitionColumns.stream()
+                        .map(Column::getType)
+                        .collect(Collectors.toList()),
+                OptionalInt.empty());
+
+        return Optional.of(new ConnectorNewTableLayout(partitioningHandle, partitionedBy));
+    }
+
     @Override
     public TableStatisticsMetadata getStatisticsCollectionMetadataForWrite(ConnectorSession session, ConnectorTableMetadata tableMetadata)
     {
HiveSessionProperties.java
@@ -82,6 +82,7 @@ public final class HiveSessionProperties
     public static final String COLLECT_COLUMN_STATISTICS_ON_WRITE = "collect_column_statistics_on_write";
     private static final String OPTIMIZE_MISMATCHED_BUCKET_COUNT = "optimize_mismatched_bucket_count";
     private static final String S3_SELECT_PUSHDOWN_ENABLED = "s3_select_pushdown_enabled";
+    private static final String SHUFFLE_PARTITIONED_COLUMNS_FOR_TABLE_WRITE = "shuffle_partitioned_columns_for_table_write";
     private static final String TEMPORARY_STAGING_DIRECTORY_ENABLED = "temporary_staging_directory_enabled";
     private static final String TEMPORARY_STAGING_DIRECTORY_PATH = "temporary_staging_directory_path";
     private static final String TEMPORARY_TABLE_SCHEMA = "temporary_table_schema";
@@ -410,7 +411,12 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCon
                         ORC_ZSTD_JNI_DECOMPRESSION_ENABLED,
                         "use JNI based zstd decompression for reading ORC files",
                         hiveClientConfig.isZstdJniDecompressionEnabled(),
-                        true));
+                        true),
+                booleanProperty(
+                        SHUFFLE_PARTITIONED_COLUMNS_FOR_TABLE_WRITE,
+                        "Shuffle the data on partitioned columns",
+                        false,
+                        false));
     }

     public List<PropertyMetadata<?>> getSessionProperties()
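Since the property is registered non-hidden with a default of false, it has to be opted into per session; the integration tests below do this through the test harness. A sketch of that usage (the Session builder scaffolding comes from the Presto test framework, and "hive" is an assumed catalog name here):

// Enable the new session property for a test session, mirroring the tests below.
Session session = Session.builder(getSession())
        .setCatalogSessionProperty("hive", "shuffle_partitioned_columns_for_table_write", "true")
        .build();

On a live cluster the equivalent toggle is SET SESSION hive.shuffle_partitioned_columns_for_table_write = true, with hive replaced by the actual catalog name.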
@@ -699,6 +705,11 @@ public static boolean isOfflineDataDebugModeEnabled(ConnectorSession session)
         return session.getProperty(OFFLINE_DATA_DEBUG_MODE_ENABLED, Boolean.class);
     }

+    public static boolean isShufflePartitionedColumnsForTableWriteEnabled(ConnectorSession session)
+    {
+        return session.getProperty(SHUFFLE_PARTITIONED_COLUMNS_FOR_TABLE_WRITE, Boolean.class);
+    }
+
     public static PropertyMetadata<DataSize> dataSizeSessionProperty(String name, String description, DataSize defaultValue, boolean hidden)
     {
         return new PropertyMetadata<>(
(Hive integration tests)
@@ -661,6 +661,50 @@ private void testCreatePartitionedTableAs(Session session, HiveStorageFormat sto
         assertFalse(getQueryRunner().tableExists(session, "test_create_partitioned_table_as"));
     }

+    @Test
+    public void testCreatePartitionedTableAsShuffleOnPartitionColumns()
+    {
+        testCreatePartitionedTableAsShuffleOnPartitionColumns(
+                Session.builder(getSession())
+                        .setSystemProperty("task_writer_count", "1")
+                        .setCatalogSessionProperty(catalog, "shuffle_partitioned_columns_for_table_write", "true")
+                        .build(),
+                HiveStorageFormat.ORC);
+    }
+
+    public void testCreatePartitionedTableAsShuffleOnPartitionColumns(Session session, HiveStorageFormat storageFormat)
+    {
+        @Language("SQL") String createTable = "" +
+                "CREATE TABLE test_create_partitioned_table_as_shuffle_on_partition_columns " +
+                "WITH (" +
+                "format = '" + storageFormat + "', " +
+                "partitioned_by = ARRAY[ 'SHIP_PRIORITY', 'ORDER_STATUS' ]" +
+                ") " +
+                "AS " +
+                "SELECT orderkey AS order_key, shippriority AS ship_priority, orderstatus AS order_status " +
+                "FROM tpch.tiny.orders";
+
+        assertUpdate(session, createTable, "SELECT count(*) from orders");
+
+        TableMetadata tableMetadata = getTableMetadata(catalog, TPCH_SCHEMA, "test_create_partitioned_table_as_shuffle_on_partition_columns");
+        assertEquals(tableMetadata.getMetadata().getProperties().get(STORAGE_FORMAT_PROPERTY), storageFormat);
+        assertEquals(tableMetadata.getMetadata().getProperties().get(PARTITIONED_BY_PROPERTY), ImmutableList.of("ship_priority", "order_status"));
+
+        List<?> partitions = getPartitions("test_create_partitioned_table_as_shuffle_on_partition_columns");
+        assertEquals(partitions.size(), 3);
+
+        assertQuery(
+                session,
+                "SELECT count(distinct \"$path\") from test_create_partitioned_table_as_shuffle_on_partition_columns",
+                "SELECT 3");
+
+        assertQuery(session, "SELECT * from test_create_partitioned_table_as_shuffle_on_partition_columns", "SELECT orderkey, shippriority, orderstatus FROM orders");
+
+        assertUpdate(session, "DROP TABLE test_create_partitioned_table_as_shuffle_on_partition_columns");
+
+        assertFalse(getQueryRunner().tableExists(session, "test_create_partitioned_table_as_shuffle_on_partition_columns"));
+    }
+
     @Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = "Partition keys must be the last columns in the table and in the same order as the table properties.*")
     public void testCreatePartitionedTableInvalidColumnOrdering()
     {
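Why the test above expects exactly three files: in TPC-H, shippriority is the constant 0 and orderstatus takes three values ('F', 'O', 'P'), so the CTAS produces three (ship_priority, order_status) partitions; with the shuffle enabled and task_writer_count set to 1, every row of a partition reaches a single writer, so each partition directory holds exactly one file and count(distinct "$path") is 3. A hypothetical extra assertion (not in the PR) that would make the premise explicit:

// Hypothetical sanity check: the partition count comes from the three distinct
// (ship_priority, order_status) combinations in tpch.tiny.orders.
assertQuery(
        session,
        "SELECT count(*) FROM (SELECT DISTINCT shippriority, orderstatus FROM tpch.tiny.orders) t",
        "SELECT 3");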
@@ -1328,6 +1372,65 @@ private void testInsertPartitionedTable(Session session, HiveStorageFormat stora
         assertFalse(getQueryRunner().tableExists(session, "test_insert_partitioned_table"));
     }

+    @Test
+    public void testInsertPartitionedTableShuffleOnPartitionColumns()
+    {
+        testInsertPartitionedTableShuffleOnPartitionColumns(
+                Session.builder(getSession())
+                        .setSystemProperty("task_writer_count", "1")
+                        .setCatalogSessionProperty(catalog, "shuffle_partitioned_columns_for_table_write", "true")
+                        .build(),
+                HiveStorageFormat.ORC);
+    }
+
+    public void testInsertPartitionedTableShuffleOnPartitionColumns(Session session, HiveStorageFormat storageFormat)
+    {
+        String tableName = "test_insert_partitioned_table_shuffle_on_partition_columns";
+
+        @Language("SQL") String createTable = "" +
+                "CREATE TABLE " + tableName + " " +
+                "(" +
+                "  order_key BIGINT," +
+                "  comment VARCHAR," +
+                "  order_status VARCHAR" +
+                ") " +
+                "WITH (" +
+                "format = '" + storageFormat + "', " +
+                "partitioned_by = ARRAY[ 'order_status' ]" +
+                ") ";
+
+        assertUpdate(session, createTable);
+
+        TableMetadata tableMetadata = getTableMetadata(catalog, TPCH_SCHEMA, tableName);
+        assertEquals(tableMetadata.getMetadata().getProperties().get(STORAGE_FORMAT_PROPERTY), storageFormat);
+        assertEquals(tableMetadata.getMetadata().getProperties().get(PARTITIONED_BY_PROPERTY), ImmutableList.of("order_status"));
+
+        assertUpdate(
+                session,
+                "INSERT INTO " + tableName + " " +
+                        "SELECT orderkey, comment, orderstatus " +
+                        "FROM tpch.tiny.orders",
+                "SELECT count(*) from orders");
+
+        // verify the partitions
+        List<?> partitions = getPartitions(tableName);
+        assertEquals(partitions.size(), 3);
+
+        assertQuery(
+                session,
+                "SELECT * from " + tableName,
+                "SELECT orderkey, comment, orderstatus FROM orders");
+
+        assertQuery(
+                session,
+                "SELECT count(distinct \"$path\") from " + tableName,
+                "SELECT 3");
+
+        assertUpdate(session, "DROP TABLE " + tableName);
+
+        assertFalse(getQueryRunner().tableExists(session, tableName));
+    }
+
     @Test
     public void testInsertPartitionedTableExistingPartition()
     {
Metadata.java
@@ -232,6 +232,9 @@ public interface Metadata

     Optional<NewTableLayout> getNewTableLayout(Session session, String catalogName, ConnectorTableMetadata tableMetadata);

+    @Experimental
+    Optional<NewTableLayout> getPreferredShuffleLayoutForNewTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata);
+
     /**
      * Begin the atomic creation of a table with data.
      */
@@ -244,6 +247,9 @@ public interface Metadata

     Optional<NewTableLayout> getInsertLayout(Session session, TableHandle target);

+    @Experimental
+    Optional<NewTableLayout> getPreferredShuffleLayoutForInsert(Session session, TableHandle target);
+
     /**
      * Describes statistics that must be collected during a write.
      */
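The matching SPI additions on ConnectorMetadata are not part of this excerpt; connectors without a preferred write shuffle presumably opt out through defaults. A sketch of what those defaults might look like (an assumption, not shown in this diff):

// Assumed ConnectorMetadata defaults (not visible in this excerpt): a connector
// with no preferred shuffle layout returns empty and keeps the existing behavior.
default Optional<ConnectorNewTableLayout> getPreferredShuffleLayoutForNewTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
{
    return Optional.empty();
}

default Optional<ConnectorNewTableLayout> getPreferredShuffleLayoutForInsert(ConnectorSession session, ConnectorTableHandle tableHandle)
{
    return Optional.empty();
}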
(Metadata implementation)
@@ -732,6 +732,17 @@ public Optional<NewTableLayout> getInsertLayout(Session session, TableHandle tab
                 .map(layout -> new NewTableLayout(connectorId, catalogMetadata.getTransactionHandleFor(connectorId), layout));
     }

+    @Override
+    public Optional<NewTableLayout> getPreferredShuffleLayoutForInsert(Session session, TableHandle table)
+    {
+        ConnectorId connectorId = table.getConnectorId();
+        CatalogMetadata catalogMetadata = getCatalogMetadataForWrite(session, connectorId);
+        ConnectorMetadata metadata = catalogMetadata.getMetadata();
+
+        return metadata.getPreferredShuffleLayoutForInsert(session.toConnectorSession(connectorId), table.getConnectorHandle())
+                .map(layout -> new NewTableLayout(connectorId, catalogMetadata.getTransactionHandleFor(connectorId), layout));
+    }
+
     @Override
     public TableStatisticsMetadata getStatisticsCollectionMetadataForWrite(Session session, String catalogName, ConnectorTableMetadata tableMetadata)
     {
@@ -783,6 +794,19 @@ public Optional<NewTableLayout> getNewTableLayout(Session session, String catalo
                 .map(layout -> new NewTableLayout(connectorId, transactionHandle, layout));
     }

+    @Override
+    public Optional<NewTableLayout> getPreferredShuffleLayoutForNewTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata)
+    {
+        CatalogMetadata catalogMetadata = getCatalogMetadataForWrite(session, catalogName);
+        ConnectorId connectorId = catalogMetadata.getConnectorId();
+        ConnectorMetadata metadata = catalogMetadata.getMetadata();
+
+        ConnectorTransactionHandle transactionHandle = catalogMetadata.getTransactionHandleFor(connectorId);
+        ConnectorSession connectorSession = session.toConnectorSession(connectorId);
+        return metadata.getPreferredShuffleLayoutForNewTable(connectorSession, tableMetadata)
+                .map(layout -> new NewTableLayout(connectorId, transactionHandle, layout));
+    }
+
     @Override
     public void beginQuery(Session session, Set<ConnectorId> connectors)
     {
(local execution planner)
@@ -2195,7 +2195,7 @@ public PhysicalOperation visitSemiJoin(SemiJoinNode node, LocalExecutionPlanCont
 public PhysicalOperation visitTableWriter(TableWriterNode node, LocalExecutionPlanContext context)
 {
     // Set table writer count
-    if (node.getPartitioningScheme().isPresent()) {
+    if (node.getTablePartitioningScheme().isPresent()) {
         context.setDriverInstanceCount(getTaskPartitionedWriterCount(session));
     }
     else {
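The rename from getPartitioningScheme to getTablePartitioningScheme suggests that TableWriterNode now distinguishes the partitioning the table itself requires (for example bucketing) from the new preferred shuffle layout, and only the former should switch the task to the partitioned writer count. A sketch of the assumed accessors (hypothetical; only this call site appears in the diff):

// Hypothetical TableWriterNode accessors implied by the rename; names other than
// getTablePartitioningScheme are guesses, since only the call site is shown.
public Optional<PartitioningScheme> getTablePartitioningScheme()
{
    // Partitioning demanded by the table itself, e.g. Hive bucketing.
    return tablePartitioningScheme;
}

public Optional<PartitioningScheme> getPreferredShufflePartitioningScheme()
{
    // Additional connector-preferred shuffle, e.g. on Hive partition columns.
    return preferredShufflePartitioningScheme;
}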