diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java index d9c4ecd96af03..1922032354065 100644 --- a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java +++ b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java @@ -30,6 +30,8 @@ import com.amazonaws.services.glue.model.AlreadyExistsException; import com.amazonaws.services.glue.model.BatchCreatePartitionRequest; import com.amazonaws.services.glue.model.BatchCreatePartitionResult; +import com.amazonaws.services.glue.model.BatchDeletePartitionRequest; +import com.amazonaws.services.glue.model.BatchDeletePartitionResult; import com.amazonaws.services.glue.model.BatchUpdatePartitionRequest; import com.amazonaws.services.glue.model.BatchUpdatePartitionRequestEntry; import com.amazonaws.services.glue.model.BatchUpdatePartitionResult; @@ -45,6 +47,7 @@ import com.amazonaws.services.glue.model.GetPartitionsResult; import com.amazonaws.services.glue.model.GetTableRequest; import com.amazonaws.services.glue.model.PartitionInput; +import com.amazonaws.services.glue.model.PartitionValueList; import com.amazonaws.services.glue.model.SerDeInfo; import com.amazonaws.services.glue.model.StorageDescriptor; import com.amazonaws.services.glue.model.Table; @@ -185,7 +188,35 @@ public void updatePartitionsToTable(String tableName, List changedPartit @Override public void dropPartitions(String tableName, List partitionsToDrop) { - throw new UnsupportedOperationException("Not support dropPartitionsToTable yet."); + if (CollectionUtils.isNullOrEmpty(partitionsToDrop)) { + LOG.info("No partitions to drop for " + tableName); + return; + } + LOG.info("Drop " + partitionsToDrop.size() + "partition(s) in table " + tableId(databaseName, tableName)); + try { + for (List batch : CollectionUtils.batches(partitionsToDrop, MAX_PARTITIONS_PER_REQUEST)) { + + List partitionValueLists = batch.stream().map(partition -> { + PartitionValueList partitionValueList = new PartitionValueList(); + partitionValueList.setValues(partitionValueExtractor.extractPartitionValuesInPath(partition)); + return partitionValueList; + }).collect(Collectors.toList()); + + BatchDeletePartitionRequest batchDeletePartitionRequest = new BatchDeletePartitionRequest() + .withDatabaseName(databaseName) + .withTableName(tableName) + .withPartitionsToDelete(partitionValueLists); + + BatchDeletePartitionResult result = awsGlue.batchDeletePartition(batchDeletePartitionRequest); + if (CollectionUtils.nonEmpty(result.getErrors())) { + throw new HoodieGlueSyncException("Fail to drop partitions to " + tableId(databaseName, tableName) + + " with error(s): " + result.getErrors()); + } + Thread.sleep(BATCH_REQUEST_SLEEP_MILLIS); + } + } catch (Exception e) { + throw new HoodieGlueSyncException("Fail to drop partitions to " + tableId(databaseName, tableName), e); + } } /**