@@ -210,7 +210,7 @@ public abstract HoodieWriteMetadata<O> bulkInsert(HoodieEngineContext context, S
* @param partitions {@link List} of partitions to be deleted
* @return HoodieWriteMetadata
*/
public abstract HoodieWriteMetadata deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions);
public abstract HoodieWriteMetadata<O> deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions);

/**
* Upserts the given prepared records into the Hoodie table, at the supplied instantTime.
@@ -254,6 +254,14 @@ public List<WriteStatus> delete(List<HoodieKey> keys, String instantTime) {
return postWrite(result, instantTime, table);
}

public List<WriteStatus> deletePartitions(List<String> partitions, String instantTime) {
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
initTable(WriteOperationType.DELETE_PARTITION, Option.ofNullable(instantTime));
preWrite(instantTime, WriteOperationType.DELETE_PARTITION, table.getMetaClient());
HoodieWriteMetadata<List<WriteStatus>> result = table.deletePartitions(context, instantTime, partitions);
return postWrite(result, instantTime, table);
}
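
For reference, a minimal usage sketch of this new client-side API. The write client wiring, partition paths, and error handling below are illustrative assumptions, not part of the change:

import java.util.Arrays;
import java.util.List;

import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;

public class DeletePartitionsSketch {
  // Drops the given partitions logically via a replacecommit; the underlying data files
  // stay on storage until the cleaner reclaims them.
  static void dropDailyPartitions(HoodieFlinkWriteClient<?> writeClient) {
    List<String> partitions = Arrays.asList("2023/03/01", "2023/03/02"); // hypothetical partition paths
    String instantTime = HoodieActiveTimeline.createNewInstantTime();
    List<WriteStatus> statuses = writeClient.deletePartitions(partitions, instantTime);
    statuses.forEach(status -> {
      if (status.hasErrors()) {
        throw new RuntimeException("delete_partition failed for file id " + status.getFileId());
      }
    });
  }
}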

@Override
public void preWrite(String instantTime, WriteOperationType writeOperationType, HoodieTableMetaClient metaClient) {
setOperationType(writeOperationType);
@@ -57,6 +57,7 @@
import org.apache.hudi.table.action.clean.CleanPlanActionExecutor;
import org.apache.hudi.table.action.cluster.ClusteringPlanActionExecutor;
import org.apache.hudi.table.action.commit.FlinkDeleteCommitActionExecutor;
import org.apache.hudi.table.action.commit.FlinkDeletePartitionCommitActionExecutor;
import org.apache.hudi.table.action.commit.FlinkInsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.FlinkInsertOverwriteCommitActionExecutor;
import org.apache.hudi.table.action.commit.FlinkInsertOverwriteTableCommitActionExecutor;
@@ -243,8 +244,8 @@ public HoodieWriteMetadata<List<WriteStatus>> delete(HoodieEngineContext context
}

@Override
public HoodieWriteMetadata deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions) {
throw new HoodieNotSupportedException("DeletePartitions is not supported yet");
public HoodieWriteMetadata<List<WriteStatus>> deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions) {
return new FlinkDeletePartitionCommitActionExecutor<>(context, config, this, instantTime, partitions).execute();
}

@Override
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.table.action.commit;

import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieDeletePartitionException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.WorkloadStat;
import org.apache.hudi.table.action.HoodieWriteMetadata;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;

public class FlinkDeletePartitionCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends FlinkInsertOverwriteCommitActionExecutor<T> {

private final List<String> partitions;

public FlinkDeletePartitionCommitActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<?, ?, ?, ?> table,
String instantTime,
List<String> partitions) {
super(context, null, config, table, instantTime, null, WriteOperationType.DELETE_PARTITION);
this.partitions = partitions;
}

@Override
public HoodieWriteMetadata<List<WriteStatus>> execute() {
try {
HoodieTimer timer = new HoodieTimer().startTimer();
context.setJobStatus(this.getClass().getSimpleName(), "Gather all file ids from all deleting partitions.");
Map<String, List<String>> partitionToReplaceFileIds =
context.parallelize(partitions).distinct().collectAsList()
.stream().collect(Collectors.toMap(partitionPath -> partitionPath, this::getAllExistingFileIds));
HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();
result.setPartitionToReplaceFileIds(partitionToReplaceFileIds);
result.setIndexUpdateDuration(Duration.ofMillis(timer.endTimer()));
result.setWriteStatuses(Collections.emptyList());

// create the requested replacecommit instant if it does not already exist
HoodieInstant dropPartitionsInstant = new HoodieInstant(REQUESTED, REPLACE_COMMIT_ACTION, instantTime);
if (!table.getMetaClient().getFs().exists(new Path(table.getMetaClient().getMetaPath(),
dropPartitionsInstant.getFileName()))) {
HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder()
.setOperationType(WriteOperationType.DELETE_PARTITION.name())
.setExtraMetadata(extraMetadata.orElse(Collections.emptyMap()))
.build();
table.getMetaClient().getActiveTimeline().saveToPendingReplaceCommit(dropPartitionsInstant,
TimelineMetadataUtils.serializeRequestedReplaceMetadata(requestedReplaceMetadata));
}

this.saveWorkloadProfileMetadataToInflight(new WorkloadProfile(Pair.of(new HashMap<>(), new WorkloadStat())),
instantTime);
this.commitOnAutoCommit(result);
return result;
} catch (Exception e) {
throw new HoodieDeletePartitionException("Failed to drop partitions for commit time " + instantTime, e);
}
}

private List<String> getAllExistingFileIds(String partitionPath) {
// Because the new commit is not yet complete, it is safe to mark all existing file ids as replaced.
return table.getSliceView().getLatestFileSlices(partitionPath).map(FileSlice::getFileId).distinct().collect(Collectors.toList());
}
}
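
As a rough illustration of what this executor produces: it records the latest file slices of each requested partition as replaced file ids under a replacecommit and writes no new data files. In the sketch below, context, writeConfig, and hoodieTable are assumed to be set up elsewhere, and the partition path is a placeholder:

// Hedged sketch; imports and the surrounding wiring are omitted.
String instantTime = HoodieActiveTimeline.createNewInstantTime();
HoodieWriteMetadata<List<WriteStatus>> metadata =
    new FlinkDeletePartitionCommitActionExecutor<>(context, writeConfig, hoodieTable, instantTime,
        Collections.singletonList("2023/03/01"))
        .execute();
// metadata carries the partition -> replaced-file-id mapping and an empty write-status list;
// the executor also persists the requested/inflight replacecommit on the timeline and
// commits it directly when auto commit is enabled.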
@@ -18,15 +18,20 @@

package org.apache.hudi.table.catalog;

import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.StreamerUtil;

import org.apache.avro.Schema;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
@@ -382,7 +387,16 @@ public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec

@Override
public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec catalogPartitionSpec) throws CatalogException {
return false;
if (!tableExists(tablePath)) {
return false;
}
String tablePathStr = inferTablePath(catalogPathStr, tablePath);
Map<String, String> options = TableOptionProperties.loadFromProperties(tablePathStr, hadoopConf);
boolean hiveStylePartitioning = Boolean.parseBoolean(options.getOrDefault(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), "false"));
return StreamerUtil.partitionExists(
inferTablePath(catalogPathStr, tablePath),
HoodieCatalogUtil.inferPartitionPath(hiveStylePartitioning, catalogPartitionSpec),
hadoopConf);
}

@Override
@@ -394,7 +408,40 @@ public void createPartition(ObjectPath tablePath, CatalogPartitionSpec catalogPa
@Override
public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec catalogPartitionSpec, boolean ignoreIfNotExists)
throws PartitionNotExistException, CatalogException {
throw new UnsupportedOperationException("dropPartition is not implemented.");
if (!tableExists(tablePath)) {
if (ignoreIfNotExists) {
return;
} else {
throw new PartitionNotExistException(getName(), tablePath, catalogPartitionSpec);
}
}

String tablePathStr = inferTablePath(catalogPathStr, tablePath);
Map<String, String> options = TableOptionProperties.loadFromProperties(tablePathStr, hadoopConf);
boolean hiveStylePartitioning = Boolean.parseBoolean(options.getOrDefault(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), "false"));
String partitionPathStr = HoodieCatalogUtil.inferPartitionPath(hiveStylePartitioning, catalogPartitionSpec);

if (!StreamerUtil.partitionExists(tablePathStr, partitionPathStr, hadoopConf)) {
if (ignoreIfNotExists) {
return;
} else {
throw new PartitionNotExistException(getName(), tablePath, catalogPartitionSpec);
}
}

// enable auto commit so that the deletePartitions call commits the replacecommit by itself
options.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), "true");
try (HoodieFlinkWriteClient<?> writeClient = createWriteClient(options, tablePathStr, tablePath)) {
writeClient.deletePartitions(Collections.singletonList(partitionPathStr), HoodieActiveTimeline.createNewInstantTime())
.forEach(writeStatus -> {
if (writeStatus.hasErrors()) {
throw new HoodieMetadataException(String.format("Failed to commit metadata table records at file id %s.", writeStatus.getFileId()));
}
});
fs.delete(new Path(tablePathStr, partitionPathStr), true);
Review thread:

@SteNicholas (Member, Author), Mar 2, 2023:
@TengHuo, like HoodieHiveCatalog, the dropPartition operation needs to drop both the partition metadata and the partition directory on the filesystem. Otherwise, as you mentioned, dropping a partition would leave invalid data files behind if there is a later insert and no cleaner to remove them.

@TengHuo (Contributor):
Got it.

So, if I understand correctly, on the Spark side invalid data files can also remain in a dropped partition when there is a later insert and no cleaner to remove them. Am I right?

And may I ask what exactly the invalid-data-files issue is? Do you have a ticket about it?

Voon and I are checking the drop-partitions code; we may fix it if there is an issue.

@SteNicholas (Member, Author), Mar 8, 2023:
@TengHuo, the invalid data files left after dropping a partition are dirty data. Meanwhile, why should the dropPartition behavior be kept consistent between Flink and Spark? If it should, IMO there should be a unified interface for dropping partitions.
cc @voonhous

} catch (Exception e) {
throw new CatalogException(String.format("Dropping partition %s of table %s exception.", partitionPathStr, tablePath), e);
}
}
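
Seen from Flink's generic Catalog API, the new behavior can be exercised roughly as follows. The catalog instance, database/table names, and the partition column "dt" are assumptions for illustration:

import java.util.Collections;

import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectPath;

public class DropPartitionSketch {
  static void dropPartition(Catalog hudiCatalog) throws Exception {
    ObjectPath tablePath = new ObjectPath("default_database", "hudi_tbl"); // hypothetical names
    CatalogPartitionSpec spec =
        new CatalogPartitionSpec(Collections.singletonMap("dt", "2023-03-01"));
    if (hudiCatalog.partitionExists(tablePath, spec)) {
      // Writes a replacecommit marking the partition's file groups as replaced,
      // then deletes the partition directory from the filesystem.
      hudiCatalog.dropPartition(tablePath, spec, /* ignoreIfNotExists */ true);
    }
  }
}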

@Override
@@ -505,7 +552,20 @@ private Map<String, String> applyOptionsHook(String tablePath, Map<String, Strin
return newOptions;
}

private String inferTablePath(String catalogPath, ObjectPath tablePath) {
private HoodieFlinkWriteClient<?> createWriteClient(
Map<String, String> options,
String tablePathStr,
ObjectPath tablePath) throws IOException {
return StreamerUtil.createWriteClient(
Configuration.fromMap(options)
.set(FlinkOptions.TABLE_NAME, tablePath.getObjectName())
.set(FlinkOptions.SOURCE_AVRO_SCHEMA,
StreamerUtil.createMetaClient(tablePathStr, hadoopConf)
.getTableConfig().getTableCreateSchema().get().toString()));
}

@VisibleForTesting
protected String inferTablePath(String catalogPath, ObjectPath tablePath) {
return String.format("%s/%s/%s", catalogPath, tablePath.getDatabaseName(), tablePath.getObjectName());
}
}
@@ -21,8 +21,12 @@
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -35,9 +39,11 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
@@ -113,4 +119,57 @@ public static List<String> getPartitionKeys(CatalogTable table) {
}
return Collections.emptyList();
}

/**
* Returns the partition path for the given {@link CatalogPartitionSpec}.
*/
public static String inferPartitionPath(boolean hiveStylePartitioning, CatalogPartitionSpec catalogPartitionSpec) {
return catalogPartitionSpec.getPartitionSpec().entrySet()
.stream().map(entry ->
hiveStylePartitioning
? String.format("%s=%s", entry.getKey(), entry.getValue())
: entry.getValue())
.collect(Collectors.joining("/"));
}
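
For instance, assuming two hypothetical partition keys dt and hr and a spec that iterates them in that order, the helper would produce:

// Illustrative fragment; "dt" and "hr" are hypothetical partition keys.
Map<String, String> kv = new LinkedHashMap<>();
kv.put("dt", "2023-03-01");
kv.put("hr", "12");
CatalogPartitionSpec spec = new CatalogPartitionSpec(kv);
String hiveStyle = HoodieCatalogUtil.inferPartitionPath(true, spec);  // "dt=2023-03-01/hr=12"
String plain = HoodieCatalogUtil.inferPartitionPath(false, spec);     // "2023-03-01/12"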

/**
* Returns a list of ordered partition values by re-arranging them based on the given list of
* partition keys. If a partition value is null, it is converted into the default partition
* name.
*
* @param partitionSpec The partition spec
* @param partitionKeys The partition keys
* @param tablePath The table path
* @return A list of partition values ordered by partition keys
* @throws PartitionSpecInvalidException thrown if partitionSpec and partitionKeys have
* different sizes, or any key in partitionKeys doesn't exist in partitionSpec.
*/
@VisibleForTesting
public static List<String> getOrderedPartitionValues(
String catalogName,
HiveConf hiveConf,
CatalogPartitionSpec partitionSpec,
List<String> partitionKeys,
ObjectPath tablePath)
throws PartitionSpecInvalidException {
Map<String, String> spec = partitionSpec.getPartitionSpec();
if (spec.size() != partitionKeys.size()) {
throw new PartitionSpecInvalidException(catalogName, partitionKeys, tablePath, partitionSpec);
}

List<String> values = new ArrayList<>(spec.size());
for (String key : partitionKeys) {
if (!spec.containsKey(key)) {
throw new PartitionSpecInvalidException(catalogName, partitionKeys, tablePath, partitionSpec);
} else {
String value = spec.get(key);
if (value == null) {
value = hiveConf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME);
}
values.add(value);
}
}

return values;
}
}
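
A hedged example of the re-ordering and null handling described in the javadoc above; the catalog name, HiveConf, partition keys, and table path are assumptions:

// Illustrative fragment; names and values are placeholders.
Map<String, String> kv = new HashMap<>();
kv.put("hr", "12");
kv.put("dt", null); // null falls back to hive.exec.default.partition.name
List<String> ordered = HoodieCatalogUtil.getOrderedPartitionValues(
    "hudi_catalog", new HiveConf(), new CatalogPartitionSpec(kv),
    Arrays.asList("dt", "hr"), new ObjectPath("db", "tbl"));
// ordered is re-arranged to follow the partition keys, e.g. ["__HIVE_DEFAULT_PARTITION__", "12"]
// with the default HiveConf; a mismatch in size or keys raises PartitionSpecInvalidException.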