Skip to content

Commit

Permalink
Steven's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Peter Vary committed Sep 17, 2024
1 parent 70e0783 commit 33307c3
Show file tree
Hide file tree
Showing 9 changed files with 141 additions and 150 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
Expand All @@ -41,21 +42,21 @@
public class ExpireSnapshotsProcessor extends ProcessFunction<Trigger, TaskResult> {
private static final Logger LOG = LoggerFactory.getLogger(ExpireSnapshotsProcessor.class);
public static final OutputTag<String> DELETE_STREAM =
new OutputTag<>("delete-stream", Types.STRING);
new OutputTag<>("expire-snapshots-file-deletes-stream", Types.STRING);

private final TableLoader tableLoader;
private final Long minAgeMs;
private final Long maxSnapshotAgeMs;
private final Integer retainLast;
private final int plannerPoolSize;
private transient ExecutorService plannerPool;
private transient Table table;

public ExpireSnapshotsProcessor(
TableLoader tableLoader, Long minAgeMs, Integer retainLast, int plannerPoolSize) {
TableLoader tableLoader, Long maxSnapshotAgeMs, Integer retainLast, int plannerPoolSize) {
Preconditions.checkNotNull(tableLoader, "Table loader should no be null");

this.tableLoader = tableLoader;
this.minAgeMs = minAgeMs;
this.maxSnapshotAgeMs = maxSnapshotAgeMs;
this.retainLast = retainLast;
this.plannerPoolSize = plannerPoolSize;
}
Expand All @@ -73,21 +74,30 @@ public void processElement(Trigger trigger, Context ctx, Collector<TaskResult> o
try {
table.refresh();
ExpireSnapshots expireSnapshots = table.expireSnapshots();
if (minAgeMs != null) {
expireSnapshots = expireSnapshots.expireOlderThan(ctx.timestamp() - minAgeMs);
if (maxSnapshotAgeMs != null) {
expireSnapshots = expireSnapshots.expireOlderThan(ctx.timestamp() - maxSnapshotAgeMs);
}

if (retainLast != null) {
expireSnapshots = expireSnapshots.retainLast(retainLast);
}

AtomicLong deleteFileCounter = new AtomicLong(0L);
expireSnapshots
.planWith(plannerPool)
.deleteWith(file -> ctx.output(DELETE_STREAM, file))
.deleteWith(
file -> {
ctx.output(DELETE_STREAM, file);
deleteFileCounter.incrementAndGet();
})
.cleanExpiredFiles(true)
.commit();

LOG.info("Successfully finished expiring snapshots for {} at {}", table, ctx.timestamp());
LOG.info(
"Successfully finished expiring snapshots for {} at {}. Scheduled {} files for delete.",
table,
ctx.timestamp(),
deleteFileCounter.get());
out.collect(
new TaskResult(trigger.taskId(), trigger.timestamp(), true, Collections.emptyList()));
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies;
import org.apache.iceberg.SystemConfigs;
import org.apache.iceberg.flink.maintenance.operator.AsyncDeleteFiles;
import org.apache.iceberg.flink.maintenance.operator.ExpireSnapshotsProcessor;
import org.apache.iceberg.flink.maintenance.operator.TaskResult;
Expand All @@ -37,9 +38,7 @@ public class ExpireSnapshots {
private static final long DELETE_MAX_RETRY_DELAY_MS = 1000L;
private static final double DELETE_BACKOFF_MULTIPLIER = 1.5;
private static final long DELETE_TIMEOUT_MS = 10000L;
private static final int DELETE_PLANNING_WORKER_POOL_SIZE_DEFAULT = 10;
private static final int DELETE_ATTEMPT_NUM = 10;
private static final int DELETE_WORKER_POOL_SIZE_DEFAULT = 10;
private static final String EXECUTOR_TASK_NAME = "ES Executor";
@VisibleForTesting static final String DELETE_FILES_TASK_NAME = "Delete file";

Expand All @@ -53,20 +52,20 @@ public static Builder builder() {
}

public static class Builder extends MaintenanceTaskBuilder<ExpireSnapshots.Builder> {
private Duration minAge = null;
private Duration maxSnapshotAge = null;
private Integer retainLast = null;
private int planningWorkerPoolSize = DELETE_PLANNING_WORKER_POOL_SIZE_DEFAULT;
private int planningWorkerPoolSize = SystemConfigs.WORKER_THREAD_POOL_SIZE.value();
private int deleteAttemptNum = DELETE_ATTEMPT_NUM;
private int deleteWorkerPoolSize = DELETE_WORKER_POOL_SIZE_DEFAULT;
private int deleteWorkerPoolSize = SystemConfigs.DELETE_WORKER_THREAD_POOL_SIZE.value();

/**
* The snapshots newer than this age will not be removed.
*
* @param newMinAge of the files to be removed
* @param newMaxSnapshotAge of the snapshots to be removed
* @return for chained calls
*/
public Builder minAge(Duration newMinAge) {
this.minAge = newMinAge;
public Builder maxSnapshotAge(Duration newMaxSnapshotAge) {
this.maxSnapshotAge = newMaxSnapshotAge;
return this;
}

Expand Down Expand Up @@ -124,11 +123,11 @@ DataStream<TaskResult> buildInternal(DataStream<Trigger> trigger) {
.process(
new ExpireSnapshotsProcessor(
tableLoader(),
minAge == null ? null : minAge.toMillis(),
maxSnapshotAge == null ? null : maxSnapshotAge.toMillis(),
retainLast,
planningWorkerPoolSize))
.name(EXECUTOR_TASK_NAME)
.uid(uidPrefix() + "-expire-snapshots")
.uid("expire-snapshots-" + uidSuffix())
.slotSharingGroup(slotSharingGroup())
.forceNonParallel();

Expand All @@ -149,7 +148,7 @@ DataStream<TaskResult> buildInternal(DataStream<Trigger> trigger) {
deleteWorkerPoolSize,
retryStrategy)
.name(DELETE_FILES_TASK_NAME)
.uid(uidPrefix() + "-delete-expired-files")
.uid("delete-expired-files-" + uidSuffix())
.slotSharingGroup(slotSharingGroup())
.setParallelism(parallelism());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

public abstract class MaintenanceTaskBuilder<T extends MaintenanceTaskBuilder> {
private int id;
private int index;
private String name;
private TableLoader tableLoader;
private String uidPrefix = null;
private String uidSuffix = null;
private String slotSharingGroup = null;
private Integer parallelism = null;
private TriggerEvaluator.Builder triggerEvaluator = new TriggerEvaluator.Builder();
Expand Down Expand Up @@ -79,7 +79,7 @@ public T scheduleOnDataFileSize(long dataFileSizeInBytes) {
* @param posDeleteFileCount after the downstream job should be started
* @return for chained calls
*/
public T schedulerOnPosDeleteFileCount(int posDeleteFileCount) {
public T scheduleOnPosDeleteFileCount(int posDeleteFileCount) {
triggerEvaluator.posDeleteFileCount(posDeleteFileCount);
return (T) this;
}
Expand All @@ -91,7 +91,7 @@ public T schedulerOnPosDeleteFileCount(int posDeleteFileCount) {
* @param posDeleteRecordCount after the downstream job should be started
* @return for chained calls
*/
public T schedulerOnPosDeleteRecordCount(long posDeleteRecordCount) {
public T scheduleOnPosDeleteRecordCount(long posDeleteRecordCount) {
triggerEvaluator.posDeleteRecordCount(posDeleteRecordCount);
return (T) this;
}
Expand Down Expand Up @@ -123,22 +123,22 @@ public T scheduleOnEqDeleteRecordCount(long eqDeleteRecordCount) {
/**
* After a given time since the last run, starts the downstream job.
*
* @param time after the downstream job should be started
* @param interval after the downstream job should be started
* @return for chained calls
*/
public T scheduleOnTime(Duration time) {
triggerEvaluator.timeout(time);
public T scheduleOnInterval(Duration interval) {
triggerEvaluator.timeout(interval);
return (T) this;
}

/**
* The prefix used for the generated {@link org.apache.flink.api.dag.Transformation}'s uid.
* The suffix used for the generated {@link org.apache.flink.api.dag.Transformation}'s uid.
*
* @param newUidPrefix for the transformations
* @param newUidSuffix for the transformations
* @return for chained calls
*/
public T uidPrefix(String newUidPrefix) {
this.uidPrefix = newUidPrefix;
public T uidSuffix(String newUidSuffix) {
this.uidSuffix = newUidSuffix;
return (T) this;
}

Expand Down Expand Up @@ -167,7 +167,7 @@ public T parallelism(int newParallelism) {

@Internal
int id() {
return id;
return index;
}

@Internal
Expand All @@ -181,8 +181,8 @@ TableLoader tableLoader() {
}

@Internal
String uidPrefix() {
return uidPrefix;
String uidSuffix() {
return uidSuffix;
}

@Internal
Expand All @@ -203,24 +203,24 @@ TriggerEvaluator evaluator() {
@Internal
DataStream<TaskResult> build(
DataStream<Trigger> sourceStream,
int newId,
String newName,
int maintenanceTaskIndex,
String maintainanceTaskName,
TableLoader newTableLoader,
String mainUidPrefix,
String mainUidSuffix,
String mainSlotSharingGroup,
int mainParallelism) {
Preconditions.checkArgument(
parallelism == null || parallelism == -1 || parallelism > 0,
"Parallelism should be left to default (-1/null) or greater than 0");
Preconditions.checkNotNull(newName, "Name should not be null");
Preconditions.checkNotNull(maintainanceTaskName, "Name should not be null");
Preconditions.checkNotNull(newTableLoader, "TableLoader should not be null");

this.id = newId;
this.name = newName;
this.index = maintenanceTaskIndex;
this.name = maintainanceTaskName;
this.tableLoader = newTableLoader;

if (uidPrefix == null) {
uidPrefix = mainUidPrefix + "_" + name + "_" + id;
if (uidSuffix == null) {
uidSuffix = name + "_" + index + "_" + mainUidSuffix;
}

if (parallelism == null) {
Expand Down
Loading

0 comments on commit 33307c3

Please sign in to comment.