@@ -158,6 +158,15 @@ public abstract HoodieWriteMetadata<O> bulkInsert(HoodieEngineContext context, S
*/
public abstract HoodieWriteMetadata<O> delete(HoodieEngineContext context, String instantTime, K keys);

/**
* Deletes all data in the given partitions.
* @param context HoodieEngineContext
* @param instantTime Instant time for the action
* @param partitions {@link List} of partitions to be deleted
* @return HoodieWriteMetadata
*/
public abstract HoodieWriteMetadata deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions);

/**
* Upserts the given prepared records into the Hoodie table, at the supplied instantTime.
* <p>
@@ -84,6 +84,11 @@ public HoodieWriteMetadata<List<WriteStatus>> delete(HoodieEngineContext context
return new FlinkDeleteCommitActionExecutor<>(context, config, this, instantTime, keys).execute();
}

@Override
public HoodieWriteMetadata deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions) {
throw new HoodieNotSupportedException("DeletePartitions is not supported yet");
}

@Override
public HoodieWriteMetadata<List<WriteStatus>> upsertPrepped(HoodieEngineContext context, String instantTime, List<HoodieRecord<T>> preppedRecords) {
return new FlinkUpsertPreppedCommitActionExecutor<>(context, config, this, instantTime, preppedRecords).execute();
@@ -84,6 +84,11 @@ public HoodieWriteMetadata<List<WriteStatus>> delete(HoodieEngineContext context
throw new HoodieNotSupportedException("Delete is not supported yet");
}

@Override
public HoodieWriteMetadata deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions) {
throw new HoodieNotSupportedException("Delete partitions is not supported yet");
}

@Override
public HoodieWriteMetadata<List<WriteStatus>> upsertPrepped(HoodieEngineContext context,
String instantTime,
@@ -245,6 +245,13 @@ public JavaRDD<WriteStatus> delete(JavaRDD<HoodieKey> keys, String instantTime)
return postWrite(result, instantTime, table);
}

/**
* Deletes all data in the given partitions, recording the replaced file ids in a replacecommit.
*/
public HoodieWriteResult deletePartitions(List<String> partitions, String instantTime) {
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table = getTableAndInitCtx(WriteOperationType.DELETE_PARTITION, instantTime);
setOperationType(WriteOperationType.DELETE_PARTITION);
HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.deletePartitions(context, instantTime, partitions);
return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds());
}

@Override
protected JavaRDD<WriteStatus> postWrite(HoodieWriteMetadata<JavaRDD<WriteStatus>> result,
String instantTime,
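For orientation, a minimal caller-side sketch of the new client API, mirroring how the tests added later in this diff exercise it (the instant time and partition paths below are illustrative only):

// Sketch only (not part of this change set): assumes an already-configured SparkRDDWriteClient
// named `client`, as constructed in the tests below. Instant time and partition paths are illustrative.
String instantTime = "004";
client.startCommitWithTime(instantTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
HoodieWriteResult result = client.deletePartitions(Arrays.asList("2020/01/01", "2020/01/02"), instantTime);
// Per partition, the file ids that the replacecommit marked as replaced:
Map<String, List<String>> replaced = result.getPartitionToReplaceFileIds();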
@@ -50,6 +50,7 @@
import org.apache.hudi.table.action.commit.SparkBulkInsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkBulkInsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkDeleteCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkDeletePartitionCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkInsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkInsertOverwriteCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkInsertOverwriteTableCommitActionExecutor;
@@ -108,6 +109,11 @@ public HoodieWriteMetadata<JavaRDD<WriteStatus>> delete(HoodieEngineContext cont
return new SparkDeleteCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, keys).execute();
}

@Override
public HoodieWriteMetadata deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions) {
return new SparkDeletePartitionCommitActionExecutor<>(context, config, this, instantTime, partitions).execute();
}

@Override
public HoodieWriteMetadata<JavaRDD<WriteStatus>> upsertPrepped(HoodieEngineContext context, String instantTime,
JavaRDD<HoodieRecord<T>> preppedRecords) {
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.table.action.commit;

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieEngineContext;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.WorkloadStat;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SparkDeletePartitionCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends SparkInsertOverwriteCommitActionExecutor<T> {

private final List<String> partitions;

public SparkDeletePartitionCommitActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config, HoodieTable table,
String instantTime, List<String> partitions) {
super(context, config, table, instantTime, null, WriteOperationType.DELETE_PARTITION);
this.partitions = partitions;
}

@Override
public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
HoodieTimer timer = new HoodieTimer().startTimer();
// Record, per requested partition, all latest file ids as replaced; no new data is written.
Map<String, List<String>> partitionToReplaceFileIds = jsc.parallelize(partitions, partitions.size()).distinct()
.mapToPair(partitionPath -> new Tuple2<>(partitionPath, getAllExistingFileIds(partitionPath))).collectAsMap();
HoodieWriteMetadata<JavaRDD<WriteStatus>> result = new HoodieWriteMetadata<>();
result.setPartitionToReplaceFileIds(partitionToReplaceFileIds);
result.setIndexUpdateDuration(Duration.ofMillis(timer.endTimer()));
result.setWriteStatuses(jsc.emptyRDD());
this.saveWorkloadProfileMetadataToInflight(new WorkloadProfile(Pair.of(new HashMap<>(), new WorkloadStat())), instantTime);
this.commitOnAutoCommit(result);
return result;
}
}
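Design note: rather than physically removing files, the executor expresses a partition delete as a replace commit that writes no new data; getAllExistingFileIds, inherited from SparkInsertOverwriteCommitActionExecutor and made protected in the next hunk, supplies the file ids to mark as replaced. A sketch of what it resolves to for a single partition path, taken from that hunk:

// Because the new replacecommit is not yet complete, every latest file slice in the
// partition can safely be reported as replaced.
List<String> replacedFileIds = table.getSliceView()
.getLatestFileSlices(partitionPath)
.map(fg -> fg.getFileId())
.distinct()
.collect(Collectors.toList());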
@@ -77,7 +77,7 @@ protected Map<String, List<String>> getPartitionToReplacedFileIds(JavaRDD<WriteS
new Tuple2<>(partitionPath, getAllExistingFileIds(partitionPath))).collectAsMap();
}

private List<String> getAllExistingFileIds(String partitionPath) {
protected List<String> getAllExistingFileIds(String partitionPath) {
// Because the new commit is not yet complete, it is safe to mark all existing file ids as old files.
return table.getSliceView().getLatestFileSlices(partitionPath).map(fg -> fg.getFileId()).distinct().collect(Collectors.toList());
}
@@ -36,7 +36,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class SparkInsertOverwriteTableCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends SparkInsertOverwriteCommitActionExecutor<T> {
@@ -47,11 +46,6 @@ public SparkInsertOverwriteTableCommitActionExecutor(HoodieEngineContext context
super(context, config, table, instantTime, inputRecordsRDD, WriteOperationType.INSERT_OVERWRITE_TABLE);
}

protected List<String> getAllExistingFileIds(String partitionPath) {
return table.getSliceView().getLatestFileSlices(partitionPath)
.map(fg -> fg.getFileId()).distinct().collect(Collectors.toList());
}

@Override
protected Map<String, List<String>> getPartitionToReplacedFileIds(JavaRDD<WriteStatus> writeStatuses) {
Map<String, List<String>> partitionToExistingFileIds = new HashMap<>();
@@ -1021,7 +1021,7 @@ public void testInsertOverwritePartitionHandlingWithFewerRecords() throws Except
* Test scenario of writing a similar number of file groups in each partition.
*/
@Test
public void testInsertOverwritePartitionHandlinWithSimilarNumberOfRecords() throws Exception {
public void testInsertOverwritePartitionHandlingWithSimilarNumberOfRecords() throws Exception {
verifyInsertOverwritePartitionHandling(3000, 3000);
}

@@ -1061,6 +1061,109 @@ private Set<String> getFileIdsFromWriteStatus(List<WriteStatus> statuses) {
return statuses.stream().map(s -> s.getFileId()).collect(Collectors.toSet());
}

/**
* Test scenario of writing fewer file groups for the first partition than for the second and third partitions.
*/
@Test
public void verifyDeletePartitionsHandlingWithFewerRecordsFirstPartition() throws Exception {
verifyDeletePartitionsHandling(1000, 3000, 3000);
}

/**
* Test scenario of writing a similar number of file groups in each partition.
*/
@Test
public void verifyDeletePartitionsHandlingWithSimilarNumberOfRecords() throws Exception {
verifyDeletePartitionsHandling(3000, 3000, 3000);
}

/**
* Test scenario of writing more file groups for the first partition than for the second and third partitions.
*/
@Test
public void verifyDeletePartitionsHandlingWithFewerRecordsSecondThirdPartition() throws Exception {
verifyDeletePartitionsHandling(3000, 1000, 1000);
}

private Set<String> insertPartitionRecordsWithCommit(SparkRDDWriteClient client, int recordsCount, String commitTime1, String partitionPath) {
client.startCommitWithTime(commitTime1);
List<HoodieRecord> inserts1 = dataGen.generateInsertsForPartition(commitTime1, recordsCount, partitionPath);
JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 2);
List<WriteStatus> statuses = client.upsert(insertRecordsRDD1, commitTime1).collect();
assertNoWriteErrors(statuses);
Set<String> batchBuckets = statuses.stream().map(s -> s.getFileId()).collect(Collectors.toSet());
verifyRecordsWritten(commitTime1, inserts1, statuses);
return batchBuckets;
}

private Set<String> deletePartitionWithCommit(SparkRDDWriteClient client, String commitTime, List<String> deletePartitionPath) {
client.startCommitWithTime(commitTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
HoodieWriteResult writeResult = client.deletePartitions(deletePartitionPath, commitTime);
Set<String> deletePartitionReplaceFileIds =
writeResult.getPartitionToReplaceFileIds().entrySet()
.stream().flatMap(entry -> entry.getValue().stream()).collect(Collectors.toSet());
return deletePartitionReplaceFileIds;
}

/**
* 1) Do write1 (upsert) with 'batch1RecordsCount' records for the first partition.
* 2) Do write2 (upsert) with 'batch2RecordsCount' records for the second partition.
* 3) Do write3 (upsert) with 'batch3RecordsCount' records for the third partition.
* 4) Delete the first partition and verify the result.
* 5) Delete the second and third partitions and verify the result.
*/
private void verifyDeletePartitionsHandling(int batch1RecordsCount, int batch2RecordsCount, int batch3RecordsCount) throws Exception {
HoodieWriteConfig config = getSmallInsertWriteConfig(2000, false);
SparkRDDWriteClient client = getHoodieWriteClient(config, false);
dataGen = new HoodieTestDataGenerator();

// Do Inserts for DEFAULT_FIRST_PARTITION_PATH
String commitTime1 = "001";
Set<String> batch1Buckets =
this.insertPartitionRecordsWithCommit(client, batch1RecordsCount, commitTime1, DEFAULT_FIRST_PARTITION_PATH);

// Do Inserts for DEFAULT_SECOND_PARTITION_PATH
String commitTime2 = "002";
Set<String> batch2Buckets =
this.insertPartitionRecordsWithCommit(client, batch2RecordsCount, commitTime2, DEFAULT_SECOND_PARTITION_PATH);

// Do Inserts for DEFAULT_THIRD_PARTITION_PATH
String commitTime3 = "003";
Set<String> batch3Buckets =
this.insertPartitionRecordsWithCommit(client, batch3RecordsCount, commitTime3, DEFAULT_THIRD_PARTITION_PATH);

// delete DEFAULT_FIRST_PARTITION_PATH
String commitTime4 = "004";
Set<String> deletePartitionReplaceFileIds1 =
deletePartitionWithCommit(client, commitTime4, Arrays.asList(DEFAULT_FIRST_PARTITION_PATH));
assertEquals(batch1Buckets, deletePartitionReplaceFileIds1);
List<HoodieBaseFile> baseFiles = HoodieClientTestUtils.getLatestBaseFiles(basePath, fs,
String.format("%s/%s/*", basePath, DEFAULT_FIRST_PARTITION_PATH));
assertEquals(0, baseFiles.size());
baseFiles = HoodieClientTestUtils.getLatestBaseFiles(basePath, fs,
String.format("%s/%s/*", basePath, DEFAULT_SECOND_PARTITION_PATH));
assertTrue(baseFiles.size() > 0);
baseFiles = HoodieClientTestUtils.getLatestBaseFiles(basePath, fs,
String.format("%s/%s/*", basePath, DEFAULT_THIRD_PARTITION_PATH));
assertTrue(baseFiles.size() > 0);

// delete DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH
String commitTime5 = "005";
Set<String> deletePartitionReplaceFileIds2 =
deletePartitionWithCommit(client, commitTime5, Arrays.asList(DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH));
Set<String> expectedFileId = new HashSet<>();
expectedFileId.addAll(batch2Buckets);
expectedFileId.addAll(batch3Buckets);
assertEquals(expectedFileId, deletePartitionReplaceFileIds2);

baseFiles = HoodieClientTestUtils.getLatestBaseFiles(basePath, fs,
String.format("%s/%s/*", basePath, DEFAULT_FIRST_PARTITION_PATH),
String.format("%s/%s/*", basePath, DEFAULT_SECOND_PARTITION_PATH),
String.format("%s/%s/*", basePath, DEFAULT_THIRD_PARTITION_PATH));
assertEquals(0, baseFiles.size());
}

/**
* Verify data in parquet files matches expected records and commit time.
*/
@@ -147,21 +147,32 @@ public static long countRecordsSince(JavaSparkContext jsc, String basePath, SQLC
}
}

/**
* Returns the latest base files for the given path globs under a hoodie table.
*/
public static List<HoodieBaseFile> getLatestBaseFiles(String basePath, FileSystem fs,
String... paths) {
List<HoodieBaseFile> latestFiles = new ArrayList<>();
try {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true);
for (String path : paths) {
BaseFileOnlyView fileSystemView = new HoodieTableFileSystemView(metaClient,
metaClient.getCommitsTimeline().filterCompletedInstants(), fs.globStatus(new Path(path)));
latestFiles.addAll(fileSystemView.getLatestBaseFiles().collect(Collectors.toList()));
}
} catch (Exception e) {
throw new HoodieException("Error obtaining latest base files for paths under " + basePath, e);
}
return latestFiles;
}

/**
* Reads the paths under a hoodie table as a DataFrame.
*/
public static Dataset<Row> read(JavaSparkContext jsc, String basePath, SQLContext sqlContext, FileSystem fs,
String... paths) {
List<String> filteredPaths = new ArrayList<>();
try {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true);
for (String path : paths) {
BaseFileOnlyView fileSystemView = new HoodieTableFileSystemView(metaClient,
metaClient.getCommitsTimeline().filterCompletedInstants(), fs.globStatus(new Path(path)));
List<HoodieBaseFile> latestFiles = fileSystemView.getLatestBaseFiles().collect(Collectors.toList());
for (HoodieBaseFile file : latestFiles) {
filteredPaths.add(file.getPath());
}
List<HoodieBaseFile> latestFiles = getLatestBaseFiles(basePath, fs, paths);
for (HoodieBaseFile file : latestFiles) {
filteredPaths.add(file.getPath());
}
return sqlContext.read().parquet(filteredPaths.toArray(new String[filteredPaths.size()]));
} catch (Exception e) {
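A usage sketch of the extracted helper, mirroring the assertions in the test hunk above (the partition-path constant comes from that test):

// After deleting DEFAULT_FIRST_PARTITION_PATH, its latest file-system view should expose no base files.
List<HoodieBaseFile> baseFiles = HoodieClientTestUtils.getLatestBaseFiles(basePath, fs,
String.format("%s/%s/*", basePath, DEFAULT_FIRST_PARTITION_PATH));
assertEquals(0, baseFiles.size());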
@@ -42,6 +42,8 @@ public enum WriteOperationType {
INSERT_OVERWRITE("insert_overwrite"),
// cluster
CLUSTER("cluster"),
// delete partition
DELETE_PARTITION("delete_partition"),
// insert overwrite with dynamic partitioning
INSERT_OVERWRITE_TABLE("insert_overwrite_table"),
// used for old version
@@ -74,6 +76,8 @@ public static WriteOperationType fromValue(String value) {
return DELETE;
case "insert_overwrite":
return INSERT_OVERWRITE;
case "delete_partition":
return DELETE_PARTITION;
case "insert_overwrite_table":
return INSERT_OVERWRITE_TABLE;
case "cluster":
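A quick sanity sketch for the new enum value, assuming a JUnit-style assertEquals:

// The new constant round-trips through fromValue, matching the switch case added above.
assertEquals(WriteOperationType.DELETE_PARTITION, WriteOperationType.fromValue("delete_partition"));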