diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java index 63e0f8936b67e..c6810c1e60dd5 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java @@ -171,6 +171,7 @@ import static com.facebook.presto.hive.HiveSessionProperties.isOfflineDataDebugModeEnabled; import static com.facebook.presto.hive.HiveSessionProperties.isOptimizedMismatchedBucketCount; import static com.facebook.presto.hive.HiveSessionProperties.isRespectTableFormat; +import static com.facebook.presto.hive.HiveSessionProperties.isShufflePartitionedColumnsForTableWriteEnabled; import static com.facebook.presto.hive.HiveSessionProperties.isSortedWriteToTempPathEnabled; import static com.facebook.presto.hive.HiveSessionProperties.isSortedWritingEnabled; import static com.facebook.presto.hive.HiveSessionProperties.isStatisticsEnabled; @@ -278,6 +279,8 @@ public class HiveMetadata private static final String PARTITIONS_TABLE_SUFFIX = "$partitions"; private static final String PRESTO_TEMPORARY_TABLE_NAME_PREFIX = "__presto_temporary_table_"; + private static final int SHUFFLE_MAX_PARALLELISM_FOR_PARTITIONED_TABLE_WRITE = 1009; + private final boolean allowCorruptWritesForTesting; private final SemiTransactionalHiveMetastore metastore; private final HdfsEnvironment hdfsEnvironment; @@ -2369,9 +2372,26 @@ public Optional getInsertLayout(ConnectorSession sessio .orElseThrow(() -> new TableNotFoundException(tableName)); Optional hiveBucketHandle = getHiveBucketHandle(table); + if (!hiveBucketHandle.isPresent()) { + if (isShufflePartitionedColumnsForTableWriteEnabled(session) && !table.getPartitionColumns().isEmpty()) { + // TODO: the shuffle partitioning doesn't have to be the same as Hive bucket partitioning + HivePartitioningHandle partitioningHandle = new HivePartitioningHandle( + 
SHUFFLE_MAX_PARALLELISM_FOR_PARTITIONED_TABLE_WRITE, + table.getPartitionColumns().stream() + .map(Column::getType) + .collect(Collectors.toList()), + OptionalInt.empty()); + List partitionedBy = table.getPartitionColumns().stream() + .map(Column::getName) + .collect(Collectors.toList()); + + return Optional.of(new ConnectorNewTableLayout(partitioningHandle, partitionedBy)); + } + return Optional.empty(); } + + HiveBucketProperty bucketProperty = table.getStorage().getBucketProperty() .orElseThrow(() -> new NoSuchElementException("Bucket property should be set")); if (!bucketProperty.getSortedBy().isEmpty() && !isSortedWritingEnabled(session)) { @@ -2396,9 +2416,31 @@ public Optional getNewTableLayout(ConnectorSession sess validatePartitionColumns(tableMetadata); validateBucketColumns(tableMetadata); Optional bucketProperty = getBucketProperty(tableMetadata.getProperties()); + if (!bucketProperty.isPresent()) { + List partitionedBy = getPartitionedBy(tableMetadata.getProperties()); + if (isShufflePartitionedColumnsForTableWriteEnabled(session) && !partitionedBy.isEmpty()) { + List columnHandles = getColumnHandles(tableMetadata, ImmutableSet.copyOf(partitionedBy), typeTranslator); + Map columnHandlesByName = Maps.uniqueIndex(columnHandles, HiveColumnHandle::getName); + List partitionColumns = partitionedBy.stream() + .map(columnHandlesByName::get) + .map(column -> new Column(column.getName(), column.getHiveType(), column.getComment())) + .collect(toList()); + + // TODO: the shuffle partitioning doesn't have to be the same as Hive bucket partitioning + HivePartitioningHandle partitioningHandle = new HivePartitioningHandle( + SHUFFLE_MAX_PARALLELISM_FOR_PARTITIONED_TABLE_WRITE, + partitionColumns.stream() + .map(Column::getType) + .collect(Collectors.toList()), + OptionalInt.empty()); + + return Optional.of(new ConnectorNewTableLayout(partitioningHandle, partitionedBy)); + } + return Optional.empty(); } + + if (!bucketProperty.get().getSortedBy().isEmpty() && 
!isSortedWritingEnabled(session)) { throw new PrestoException(NOT_SUPPORTED, "Writing to bucketed sorted Hive tables is disabled"); } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java index b4b0d97ba41d2..ce3ba40def83f 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java @@ -82,6 +82,7 @@ public final class HiveSessionProperties public static final String COLLECT_COLUMN_STATISTICS_ON_WRITE = "collect_column_statistics_on_write"; private static final String OPTIMIZE_MISMATCHED_BUCKET_COUNT = "optimize_mismatched_bucket_count"; private static final String S3_SELECT_PUSHDOWN_ENABLED = "s3_select_pushdown_enabled"; + private static final String SHUFFLE_PARTITIONED_COLUMNS_FOR_TABLE_WRITE = "shuffle_partitioned_columns_for_table_write"; private static final String TEMPORARY_STAGING_DIRECTORY_ENABLED = "temporary_staging_directory_enabled"; private static final String TEMPORARY_STAGING_DIRECTORY_PATH = "temporary_staging_directory_path"; private static final String TEMPORARY_TABLE_SCHEMA = "temporary_table_schema"; @@ -410,7 +411,12 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCon ORC_ZSTD_JNI_DECOMPRESSION_ENABLED, "use JNI based zstd decompression for reading ORC files", hiveClientConfig.isZstdJniDecompressionEnabled(), - true)); + true), + booleanProperty( + SHUFFLE_PARTITIONED_COLUMNS_FOR_TABLE_WRITE, + "Shuffle the data on partitioned columns", + false, + false)); } public List> getSessionProperties() @@ -699,6 +705,11 @@ public static boolean isOfflineDataDebugModeEnabled(ConnectorSession session) return session.getProperty(OFFLINE_DATA_DEBUG_MODE_ENABLED, Boolean.class); } + public static boolean isShufflePartitionedColumnsForTableWriteEnabled(ConnectorSession session) + { + return 
session.getProperty(SHUFFLE_PARTITIONED_COLUMNS_FOR_TABLE_WRITE, Boolean.class); + } + public static PropertyMetadata dataSizeSessionProperty(String name, String description, DataSize defaultValue, boolean hidden) { return new PropertyMetadata<>( diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java index 10704e14b8849..21bcc3a528914 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java @@ -661,6 +661,49 @@ private void testCreatePartitionedTableAs(Session session, HiveStorageFormat sto assertFalse(getQueryRunner().tableExists(session, "test_create_partitioned_table_as")); } + @Test + public void testCreatePartitionedTableAsShuffleOnPartitionColumns() + { + testCreatePartitionedTableAsShuffleOnPartitionColumns( + Session.builder(getSession()) + .setCatalogSessionProperty(catalog, "shuffle_partitioned_columns_for_table_write", "true") + .build(), + HiveStorageFormat.ORC); + } + + public void testCreatePartitionedTableAsShuffleOnPartitionColumns(Session session, HiveStorageFormat storageFormat) + { + @Language("SQL") String createTable = "" + + "CREATE TABLE test_create_partitioned_table_as_shuffle_on_partition_columns " + + "WITH (" + + "format = '" + storageFormat + "', " + + "partitioned_by = ARRAY[ 'SHIP_PRIORITY', 'ORDER_STATUS' ]" + + ") " + + "AS " + + "SELECT orderkey AS order_key, shippriority AS ship_priority, orderstatus AS order_status " + + "FROM tpch.tiny.orders"; + + assertUpdate(session, createTable, "SELECT count(*) from orders"); + + TableMetadata tableMetadata = getTableMetadata(catalog, TPCH_SCHEMA, "test_create_partitioned_table_as_shuffle_on_partition_columns"); + assertEquals(tableMetadata.getMetadata().getProperties().get(STORAGE_FORMAT_PROPERTY), storageFormat); + 
assertEquals(tableMetadata.getMetadata().getProperties().get(PARTITIONED_BY_PROPERTY), ImmutableList.of("ship_priority", "order_status")); + + List partitions = getPartitions("test_create_partitioned_table_as_shuffle_on_partition_columns"); + assertEquals(partitions.size(), 3); + + assertQuery( + session, + "SELECT count(distinct \"$path\") from test_create_partitioned_table_as_shuffle_on_partition_columns", + "SELECT 3"); + + assertQuery(session, "SELECT * from test_create_partitioned_table_as_shuffle_on_partition_columns", "SELECT orderkey, shippriority, orderstatus FROM orders"); + + assertUpdate(session, "DROP TABLE test_create_partitioned_table_as_shuffle_on_partition_columns"); + + assertFalse(getQueryRunner().tableExists(session, "test_create_partitioned_table_as_shuffle_on_partition_columns")); + } + @Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = "Partition keys must be the last columns in the table and in the same order as the table properties.*") public void testCreatePartitionedTableInvalidColumnOrdering() { @@ -1328,6 +1371,64 @@ private void testInsertPartitionedTable(Session session, HiveStorageFormat stora assertFalse(getQueryRunner().tableExists(session, "test_insert_partitioned_table")); } + @Test + public void testInsertPartitionedTableShuffleOnPartitionColumns() + { + testInsertPartitionedTableShuffleOnPartitionColumns( + Session.builder(getSession()) + .setCatalogSessionProperty(catalog, "shuffle_partitioned_columns_for_table_write", "true") + .build(), + HiveStorageFormat.ORC); + } + + public void testInsertPartitionedTableShuffleOnPartitionColumns(Session session, HiveStorageFormat storageFormat) + { + String tableName = "test_insert_partitioned_table_shuffle_on_partition_columns"; + + @Language("SQL") String createTable = "" + + "CREATE TABLE " + tableName + " " + + "(" + + " order_key BIGINT," + + " comment VARCHAR," + + " order_status VARCHAR" + + ") " + + "WITH (" + + "format = '" + storageFormat + "', 
" + + "partitioned_by = ARRAY[ 'order_status' ]" + + ") "; + + assertUpdate(session, createTable); + + TableMetadata tableMetadata = getTableMetadata(catalog, TPCH_SCHEMA, tableName); + assertEquals(tableMetadata.getMetadata().getProperties().get(STORAGE_FORMAT_PROPERTY), storageFormat); + assertEquals(tableMetadata.getMetadata().getProperties().get(PARTITIONED_BY_PROPERTY), ImmutableList.of("order_status")); + + assertUpdate( + session, + "INSERT INTO " + tableName + " " + + "SELECT orderkey, comment, orderstatus " + + "FROM tpch.tiny.orders", + "SELECT count(*) from orders"); + + // verify the partitions + List partitions = getPartitions(tableName); + assertEquals(partitions.size(), 3); + + assertQuery( + session, + "SELECT * from " + tableName, + "SELECT orderkey, comment, orderstatus FROM orders"); + + assertQuery( + session, + "SELECT count(distinct \"$path\") from " + tableName, + "SELECT 3"); + + assertUpdate(session, "DROP TABLE " + tableName); + + assertFalse(getQueryRunner().tableExists(session, tableName)); + } + @Test public void testInsertPartitionedTableExistingPartition() {