@@ -28,12 +28,16 @@
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCompactionException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@@ -50,12 +54,15 @@ public class FlinkScheduleCompactionActionExecutor<T extends HoodieRecordPayload

private static final Logger LOG = LogManager.getLogger(FlinkScheduleCompactionActionExecutor.class);

private final Option<Map<String, String>> extraMetadata;

public FlinkScheduleCompactionActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
String instantTime,
Option<Map<String, String>> extraMetadata) {
super(context, config, table, instantTime, extraMetadata);
this.extraMetadata = extraMetadata;
}

@Override
@@ -149,4 +156,41 @@ public Long parsedToSeconds(String time) {
}
return timestamp;
}

@Override
public Option<HoodieCompactionPlan> execute() {
if (!config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()
&& !config.getFailedWritesCleanPolicy().isLazy()) {
// if there are inflight writes, their instant time must be later than the compaction instant time
table.getActiveTimeline().getCommitsTimeline().filterPendingExcludingCompaction().firstInstant()
Member:

should we move these validation checks to a common place? Looks like this is being used elsewhere too.

Contributor Author:

There is a bug in BaseScheduleCompactionActionExecutor#execute that PR #3025 aims to fix; it prevents standalone Hudi compaction from running. Since that PR still needs more discussion, we chose to modify the subclass FlinkScheduleCompactionActionExecutor instead so that compaction can complete.

.ifPresent(earliestInflight -> ValidationUtils.checkArgument(
HoodieTimeline.compareTimestamps(earliestInflight.getTimestamp(), HoodieTimeline.GREATER_THAN, instantTime),
"Earliest write inflight instant time must be later than compaction time. Earliest :" + earliestInflight
+ ", Compaction scheduled at " + instantTime));
// Committed and pending compaction instants should have strictly lower timestamps
List<HoodieInstant> conflictingInstants = table.getActiveTimeline()
.getWriteTimeline().filterCompletedAndCompactionInstants().getInstants()
.filter(instant -> HoodieTimeline.compareTimestamps(
instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, instantTime))
.collect(Collectors.toList());
ValidationUtils.checkArgument(conflictingInstants.isEmpty(),
"Following instants have timestamps >= compactionInstant (" + instantTime + ") Instants :"
+ conflictingInstants);
}

HoodieCompactionPlan plan = scheduleCompaction();
if (plan != null && (plan.getOperations() != null) && (!plan.getOperations().isEmpty())) {
extraMetadata.ifPresent(plan::setExtraMetadata);
HoodieInstant compactionInstant =
new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, instantTime);
try {
table.getActiveTimeline().saveToCompactionRequested(compactionInstant,
TimelineMetadataUtils.serializeCompactionPlan(plan));
} catch (IOException ioe) {
throw new HoodieIOException("Exception scheduling compaction", ioe);
}
return Option.of(plan);
}
return Option.empty();
}
}
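
A minimal sketch, following up on the review thread above, of how the timestamp validation in execute() could be moved to a common place shared across engines. This is not part of the PR; the ScheduleCompactionValidationUtils class and validateCompactionInstant method are hypothetical names, and the checks are simply lifted from the executor above.

import java.util.List;
import java.util.stream.Collectors;

import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.table.HoodieTable;

// Hypothetical helper: factors the schedule-compaction timestamp checks out of the
// engine-specific executors so they can be reused.
public final class ScheduleCompactionValidationUtils {

  private ScheduleCompactionValidationUtils() {
  }

  public static void validateCompactionInstant(HoodieTable<?, ?, ?, ?> table, String instantTime) {
    // Any inflight write instant must be strictly later than the compaction instant time.
    table.getActiveTimeline().getCommitsTimeline().filterPendingExcludingCompaction().firstInstant()
        .ifPresent(earliestInflight -> ValidationUtils.checkArgument(
            HoodieTimeline.compareTimestamps(earliestInflight.getTimestamp(), HoodieTimeline.GREATER_THAN, instantTime),
            "Earliest write inflight instant time must be later than compaction time. Earliest :" + earliestInflight
                + ", Compaction scheduled at " + instantTime));

    // Committed and pending compaction instants must have strictly lower timestamps.
    List<HoodieInstant> conflictingInstants = table.getActiveTimeline()
        .getWriteTimeline().filterCompletedAndCompactionInstants().getInstants()
        .filter(instant -> HoodieTimeline.compareTimestamps(
            instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, instantTime))
        .collect(Collectors.toList());
    ValidationUtils.checkArgument(conflictingInstants.isEmpty(),
        "Following instants have timestamps >= compactionInstant (" + instantTime + ") Instants :"
            + conflictingInstants);
  }
}
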
@@ -25,8 +25,6 @@
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.keygen.constant.KeyGeneratorType;
import org.apache.hudi.streamer.FlinkStreamerConfig;
import org.apache.hudi.util.StreamerUtil;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
@@ -520,39 +518,6 @@ private FlinkOptions() {
// Prefix for Hoodie specific properties.
private static final String PROPERTIES_PREFIX = "properties.";

/**
* Transforms a {@code HoodieFlinkStreamer.Config} into {@code Configuration}.
* The latter is more suitable for the table APIs. It reads all the properties
* in the properties file (set by `--props` option) and cmd line options
* (set by `--hoodie-conf` option).
*/
@SuppressWarnings("unchecked, rawtypes")
public static org.apache.flink.configuration.Configuration fromStreamerConfig(FlinkStreamerConfig config) {
Map<String, String> propsMap = new HashMap<String, String>((Map) StreamerUtil.getProps(config));
org.apache.flink.configuration.Configuration conf = fromMap(propsMap);

conf.setString(FlinkOptions.PATH, config.targetBasePath);
conf.setString(READ_AVRO_SCHEMA_PATH, config.readSchemaFilePath);
conf.setString(FlinkOptions.TABLE_NAME, config.targetTableName);
// copy_on_write works same as COPY_ON_WRITE
conf.setString(FlinkOptions.TABLE_TYPE, config.tableType.toUpperCase());
conf.setString(FlinkOptions.OPERATION, config.operation.value());
conf.setString(FlinkOptions.PRECOMBINE_FIELD, config.sourceOrderingField);
conf.setString(FlinkOptions.PAYLOAD_CLASS, config.payloadClassName);
conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, config.filterDupes);
conf.setInteger(FlinkOptions.RETRY_TIMES, Integer.parseInt(config.instantRetryTimes));
conf.setLong(FlinkOptions.RETRY_INTERVAL_MS, Long.parseLong(config.instantRetryInterval));
conf.setBoolean(FlinkOptions.IGNORE_FAILED, config.commitOnErrors);
conf.setString(FlinkOptions.RECORD_KEY_FIELD, config.recordKeyField);
conf.setString(FlinkOptions.PARTITION_PATH_FIELD, config.partitionPathField);
// keygenClass has higher priority than keygenType
conf.setString(FlinkOptions.KEYGEN_TYPE, config.keygenType);
conf.setString(FlinkOptions.KEYGEN_CLASS, config.keygenClass);
conf.setInteger(FlinkOptions.WRITE_TASKS, config.writeTaskNum);

return conf;
}

/**
* Collects the config options that start with 'properties.' into a 'key'='value' list.
*/
@@ -46,7 +46,7 @@ public class CleanFunction<T> extends AbstractRichFunction

private final Configuration conf;

private HoodieFlinkWriteClient writeClient;
protected HoodieFlinkWriteClient writeClient;
private NonThrownExecutor executor;

private volatile boolean isCleaning;
@@ -33,6 +33,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;

/**
@@ -79,22 +80,24 @@ public void processElement(CompactionPlanEvent event, Context context, Collector
final CompactionOperation compactionOperation = event.getOperation();
// executes the compaction task asynchronously so as not to block checkpoint barrier propagation.
executor.execute(
() -> {
HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor();
List<WriteStatus> writeStatuses = compactor.compact(
new HoodieFlinkCopyOnWriteTable<>(
this.writeClient.getConfig(),
this.writeClient.getEngineContext(),
this.writeClient.getHoodieTable().getMetaClient()),
this.writeClient.getHoodieTable().getMetaClient(),
this.writeClient.getConfig(),
compactionOperation,
instantTime);
collector.collect(new CompactionCommitEvent(instantTime, writeStatuses, taskID));
}, "Execute compaction for instant %s from task %d", instantTime, taskID
() -> doCompaction(instantTime, compactionOperation, collector), "Execute compaction for instant %s from task %d", instantTime, taskID
);
}

private void doCompaction(String instantTime, CompactionOperation compactionOperation, Collector<CompactionCommitEvent> collector) throws IOException {
HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor();
List<WriteStatus> writeStatuses = compactor.compact(
new HoodieFlinkCopyOnWriteTable<>(
this.writeClient.getConfig(),
this.writeClient.getEngineContext(),
this.writeClient.getHoodieTable().getMetaClient()),
this.writeClient.getHoodieTable().getMetaClient(),
this.writeClient.getConfig(),
compactionOperation,
instantTime);
collector.collect(new CompactionCommitEvent(instantTime, writeStatuses, taskID));
}

@VisibleForTesting
public void setExecutor(NonThrownExecutor executor) {
this.executor = executor;
@@ -19,12 +19,12 @@
package org.apache.hudi.sink.compact;

import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.CleanFunction;
import org.apache.hudi.util.StreamerUtil;

@@ -59,11 +59,6 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
*/
private final Configuration conf;

/**
* Write Client.
*/
private transient HoodieFlinkWriteClient writeClient;

/**
* Buffer to collect the event from each compact task {@code CompactFunction}.
* The key is the instant time.
@@ -78,7 +73,9 @@ public CompactionCommitSink(Configuration conf) {
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
if (writeClient == null) {
this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
}
this.commitBuffer = new HashMap<>();
}

@@ -122,6 +119,13 @@ private void commitIfNecessary(String instant, List<CompactionCommitEvent> event
}
// commit the compaction
this.writeClient.commitCompaction(instant, statuses, Option.empty());

// If async cleaning is not enabled in the write pipeline, clean up the old files here after the compaction commits and wait for the cleaning to finish
if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
this.writeClient.startAsyncCleaning();
this.writeClient.waitForCleaningFinish();
}

// reset the status
reset(instant);
}
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.sink.compact;

import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.table.HoodieFlinkTable;

import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

import static java.util.stream.Collectors.toList;

/**
* Flink hudi compaction source function.
*
 * <p>This function reads the compaction plan as {@link CompactionOperation}s and then assigns the compaction task
 * events {@link CompactionPlanEvent} to the downstream operators.
*
* <p>The compaction instant time is specified explicitly with strategies:
*
* <ul>
* <li>If the timeline has no inflight instants,
* use {@link org.apache.hudi.common.table.timeline.HoodieActiveTimeline#createNewInstantTime()}
* as the instant time;</li>
* <li>If the timeline has inflight instants,
* use the {earliest inflight instant time - 1ms} as the instant time.</li>
* </ul>
*/
public class CompactionPlanSourceFunction extends AbstractRichFunction implements SourceFunction<CompactionPlanEvent> {

protected static final Logger LOG = LoggerFactory.getLogger(CompactionPlanSourceFunction.class);

/**
* Compaction instant time.
*/
private String compactionInstantTime;

/**
* Hoodie flink table.
*/
private HoodieFlinkTable<?> table;

/**
* The compaction plan.
*/
private HoodieCompactionPlan compactionPlan;

/**
* Hoodie instant.
*/
private HoodieInstant instant;

public CompactionPlanSourceFunction(HoodieFlinkTable<?> table, HoodieInstant instant, HoodieCompactionPlan compactionPlan, String compactionInstantTime) {
this.table = table;
this.instant = instant;
this.compactionPlan = compactionPlan;
this.compactionInstantTime = compactionInstantTime;
}

@Override
public void open(Configuration parameters) throws Exception {
// no operation
}

@Override
public void run(SourceContext sourceContext) throws Exception {
// Mark instant as compaction inflight
table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
table.getMetaClient().reloadActiveTimeline();

List<CompactionOperation> operations = this.compactionPlan.getOperations().stream()
.map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
LOG.info("CompactionPlanFunction compacting " + operations + " files");
for (CompactionOperation operation : operations) {
sourceContext.collect(new CompactionPlanEvent(compactionInstantTime, operation));
}
}

@Override
public void close() throws Exception {
// no operation
}

@Override
public void cancel() {
// no operation
}
}
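
The instant-time selection strategy described in the class Javadoc is not implemented by this source function itself, so a minimal sketch of it follows. The CompactionInstantTimeSketch class, its method name, and the assumption that instant times use the "yyyyMMddHHmmss" format are hypothetical; only the timeline calls mirror those used elsewhere in this PR.

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.table.HoodieFlinkTable;

// Hypothetical helper illustrating the instant-time strategy from the Javadoc above.
public class CompactionInstantTimeSketch {

  // Assumed instant-time format (seconds resolution).
  private static final String INSTANT_FORMAT = "yyyyMMddHHmmss";

  public static String getCompactionInstantTime(HoodieFlinkTable<?> table) throws ParseException {
    Option<HoodieInstant> earliestInflight = table.getActiveTimeline()
        .getCommitsTimeline().filterPendingExcludingCompaction().firstInstant();
    if (earliestInflight.isPresent()) {
      // Inflight writes exist: use {earliest inflight instant time - 1ms}. At seconds
      // resolution this formats to the previous second, so the compaction instant
      // sorts strictly before every inflight write instant.
      SimpleDateFormat formatter = new SimpleDateFormat(INSTANT_FORMAT);
      Date earliest = formatter.parse(earliestInflight.get().getTimestamp());
      return formatter.format(new Date(earliest.getTime() - 1));
    }
    // No inflight instants: a freshly generated instant time is safe to use.
    return HoodieActiveTimeline.createNewInstantTime();
  }
}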