Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.actions;

public class BaseExpireSnapshotsActionResult implements ExpireSnapshots.Result {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm grateful that the ExpireSnapshotsAction refactoring work has been considered to work for different compute engines, such as flink. In this way, we flink don't have to abstract the common logic of actions between flink and spark and we could just extend those BaseXXAction to implement our flink own actions. That's really important !

About the ExpireSnapshotsAction, I think both flink & spark will have the same Result. So maybe we could just use the ExpireSnapshotsActionResult directly ( I don't think there will be a reason that flink or spark will extend this BaseExpireSnapshotsActionResult, so maybe we could just remove the Base prefix).

Copy link
Contributor Author

@aokolnychyi aokolnychyi Mar 11, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, I followed the naming we have in core like BaseFile, BaseTable, BaseRewriteManifests.

Also, there is a class called ExpireSnapshotsActionResult in this package in Spark already. We deprecate it with the introduction of this new one.

I agree query engines will most likely use the same result classes (unless they are doing something really specific).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about making ExpireSnapshots.Result a class instead of an interface? Then we would always have the same default implementation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am fine converting Result into a class but I am not sure we will gain much by doing that. The current class is in the core module so it is accessible to everyone. Making Result a class may make inheritance a bit harder. I do like our current pattern of having BaseXXX classes in core and interfaces in the API module.


private final long deletedDataFilesCount;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems we don't consider the format v2 in this patch, right ? That's OK if true because we could support v2 in a separate issue. For the RewriteDataFilesAction, now @chenjunjiedada and I have prepared few PRs to support format v2's Rewrite actions.

  1. Core: Remove all delete files in RewriteFiles action. #2303
  2. Spark: support replace equality deletes to position deletes #2216

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, you are correct. I keep the scope of this PR to the existing logic.

@RussellSpitzer is also working on redesigning the rewrite data files action to also support sort-based compactions and per partition compactions. Shall 4 of us meet next week to discuss this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be easy to extend the interface and this implementation to include other counts once we support them.

private final long deletedManifestsCount;
private final long deletedManifestListsCount;

public BaseExpireSnapshotsActionResult(long deletedDataFilesCount,
long deletedManifestsCount,
long deletedManifestListsCount) {
this.deletedDataFilesCount = deletedDataFilesCount;
this.deletedManifestsCount = deletedManifestsCount;
this.deletedManifestListsCount = deletedManifestListsCount;
}

@Override
public long deletedDataFilesCount() {
return deletedDataFilesCount;
}

@Override
public long deletedManifestsCount() {
return deletedManifestsCount;
}

@Override
public long deletedManifestListsCount() {
return deletedManifestListsCount;
}
}
4 changes: 3 additions & 1 deletion spark/src/main/java/org/apache/iceberg/actions/Actions.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.common.DynConstructors;
import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.spark.actions.BaseExpireSnapshotsSparkAction;
import org.apache.spark.sql.SparkSession;

public class Actions {
Expand Down Expand Up @@ -79,7 +80,8 @@ public RewriteDataFilesAction rewriteDataFiles() {
}

public ExpireSnapshotsAction expireSnapshots() {
return new ExpireSnapshotsAction(spark, table);
BaseExpireSnapshotsSparkAction delegate = new BaseExpireSnapshotsSparkAction(spark, table);
return new ExpireSnapshotsAction(delegate);
Copy link
Contributor

@flyrain flyrain Mar 10, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we still use ExpireSnapshotsAction since it is deprecated in this PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a user-facing API so we cannot break it without deprecating first.

Copy link
Contributor

@flyrain flyrain Mar 11, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about the Iceberg deprecating process. Here is my understanding, we need to create a new method with new return type for this user-facing API, which can be used by user moving forward. Meanwhile we mark this method deprecated so that user won't use it anymore.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, yeah, we will mark Actions deprecated once we have a new alternative. We will have a totally new entry point called SparkActions in another package. Right now, I am migrating action by action to reduce the amount of changes in a single pr. Once all actions are done, I'll create a new public API for users to use and deprecate the rest.

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ protected BaseSnapshotUpdateSparkAction(SparkSession spark) {
super(spark);
}

protected abstract ThisT self();

@Override
public ThisT snapshotProperty(String property, String value) {
summary.put(property, value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.iceberg.BaseTable;
Expand All @@ -36,6 +37,7 @@
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.JobGroupInfo;
import org.apache.iceberg.spark.JobGroupUtils;
import org.apache.iceberg.spark.SparkUtil;
Expand All @@ -51,12 +53,13 @@

import static org.apache.iceberg.MetadataTableType.ALL_MANIFESTS;

abstract class BaseSparkAction<ThisT, R> implements Action<ThisT, R> {
public abstract class BaseSparkAction<ThisT, R> implements Action<ThisT, R> {
Copy link
Contributor

@flyrain flyrain Mar 10, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need it to be public? Looks like there is no scope change needed in this PR.

Copy link
Contributor Author

@aokolnychyi aokolnychyi Mar 11, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Temporarily yes since we are using it in another package. Once we get rid of the old actions, we should be able to move this and make it non-public.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is public but will be removed, we should mark it deprecated so people know not to rely on it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class will be used even after refactoring. It may be moved but to another package, though.


private static final AtomicInteger JOB_COUNTER = new AtomicInteger();

private final SparkSession spark;
private final JavaSparkContext sparkContext;
private final Map<String, String> options = Maps.newHashMap();

protected BaseSparkAction(SparkSession spark) {
this.spark = spark;
Expand All @@ -71,6 +74,24 @@ protected JavaSparkContext sparkContext() {
return sparkContext;
}

protected abstract ThisT self();

@Override
public ThisT option(String name, String value) {
options.put(name, value);
return self();
}

@Override
public ThisT options(Map<String, String> newOptions) {
options.putAll(newOptions);
return self();
}

protected Map<String, String> options() {
return options;
}

protected <T> T withJobGroupInfo(JobGroupInfo info, Supplier<T> supplier) {
SparkContext context = spark().sparkContext();
JobGroupInfo previousInfo = JobGroupUtils.getJobGroupInfo(context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,36 +19,15 @@

package org.apache.iceberg.actions;

import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.iceberg.ExpireSnapshots;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.JobGroupInfo;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.spark.sql.Column;
import org.apache.iceberg.spark.actions.BaseExpireSnapshotsSparkAction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.iceberg.TableProperties.GC_ENABLED;
import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT;

/**
* An action which performs the same operation as {@link ExpireSnapshots} but uses Spark
* An action which performs the same operation as {@link org.apache.iceberg.ExpireSnapshots} but uses Spark
* to determine the delta in files between the pre and post-expiration table metadata. All of the same
* restrictions of Remove Snapshots also apply to this action.
* <p>
Expand All @@ -57,44 +36,15 @@
* require a shuffle so parallelism can be controlled through spark.sql.shuffle.partitions. The expiration is done
* locally using a direct call to RemoveSnapshots. The snapshot expiration will be fully committed before any deletes
* are issued. Deletes are still performed locally after retrieving the results from the Spark executors.
*
* @deprecated since 0.12.0, will be removed in 0.13.0; use {@link BaseExpireSnapshotsSparkAction} instead.
*/
@SuppressWarnings("UnnecessaryAnonymousClass")
public class ExpireSnapshotsAction extends BaseSparkAction<ExpireSnapshotsAction, ExpireSnapshotsActionResult> {
private static final Logger LOG = LoggerFactory.getLogger(ExpireSnapshotsAction.class);

private static final String DATA_FILE = "Data File";
private static final String MANIFEST = "Manifest";
private static final String MANIFEST_LIST = "Manifest List";

// Creates an executor service that runs each task in the thread that invokes execute/submit.
private static final ExecutorService DEFAULT_DELETE_EXECUTOR_SERVICE = null;

private final Table table;
private final TableOperations ops;
private final Consumer<String> defaultDelete = new Consumer<String>() {
@Override
public void accept(String file) {
ops.io().deleteFile(file);
}
};

private Set<Long> expireSnapshotIdValues = Sets.newHashSet();
private Long expireOlderThanValue = null;
private Integer retainLastValue = null;
private Consumer<String> deleteFunc = defaultDelete;
private ExecutorService deleteExecutorService = DEFAULT_DELETE_EXECUTOR_SERVICE;
private Dataset<Row> expiredFiles = null;
private boolean streamResults = false;

ExpireSnapshotsAction(SparkSession spark, Table table) {
super(spark);
@Deprecated
public class ExpireSnapshotsAction implements Action<ExpireSnapshotsAction, ExpireSnapshotsActionResult> {
private final BaseExpireSnapshotsSparkAction delegate;

this.table = table;
this.ops = ((HasTableOperations) table).operations();

ValidationException.check(
PropertyUtil.propertyAsBoolean(table.properties(), GC_ENABLED, GC_ENABLED_DEFAULT),
"Cannot expire snapshots: GC is disabled (deleting files may corrupt other tables)");
ExpireSnapshotsAction(BaseExpireSnapshotsSparkAction delegate) {
this.delegate = delegate;
}

/**
Expand All @@ -105,7 +55,7 @@ public void accept(String file) {
* @return this for method chaining
*/
public ExpireSnapshotsAction streamDeleteResults(boolean stream) {
this.streamResults = stream;
delegate.option("stream-results", Boolean.toString(stream));
return this;
}

Expand All @@ -116,7 +66,7 @@ public ExpireSnapshotsAction streamDeleteResults(boolean stream) {
* @return this for method chaining
*/
public ExpireSnapshotsAction executeDeleteWith(ExecutorService executorService) {
this.deleteExecutorService = executorService;
delegate.executeDeleteWith(executorService);
return this;
}

Expand All @@ -127,7 +77,7 @@ public ExpireSnapshotsAction executeDeleteWith(ExecutorService executorService)
* @return this for method chaining
*/
public ExpireSnapshotsAction expireSnapshotId(long expireSnapshotId) {
this.expireSnapshotIdValues.add(expireSnapshotId);
delegate.expireSnapshotId(expireSnapshotId);
return this;
}

Expand All @@ -138,7 +88,7 @@ public ExpireSnapshotsAction expireSnapshotId(long expireSnapshotId) {
* @return this for method chaining
*/
public ExpireSnapshotsAction expireOlderThan(long timestampMillis) {
this.expireOlderThanValue = timestampMillis;
delegate.expireOlderThan(timestampMillis);
return this;
}

Expand All @@ -149,9 +99,7 @@ public ExpireSnapshotsAction expireOlderThan(long timestampMillis) {
* @return this for method chaining
*/
public ExpireSnapshotsAction retainLast(int numSnapshots) {
Preconditions.checkArgument(1 <= numSnapshots,
"Number of snapshots to retain must be at least 1, cannot be: %s", numSnapshots);
this.retainLastValue = numSnapshots;
delegate.retainLast(numSnapshots);
return this;
}

Expand All @@ -162,7 +110,7 @@ public ExpireSnapshotsAction retainLast(int numSnapshots) {
* @return this for method chaining
*/
public ExpireSnapshotsAction deleteWith(Consumer<String> newDeleteFunc) {
this.deleteFunc = newDeleteFunc;
delegate.deleteWith(newDeleteFunc);
return this;
}

Expand All @@ -176,95 +124,12 @@ public ExpireSnapshotsAction deleteWith(Consumer<String> newDeleteFunc) {
* @return a Dataset of files that are no longer referenced by the table
*/
public Dataset<Row> expire() {
if (expiredFiles == null) {
// Metadata before Expiration
Dataset<Row> originalFiles = buildValidFileDF(ops.current());

// Perform Expiration
ExpireSnapshots expireSnaps = table.expireSnapshots().cleanExpiredFiles(false);
for (final Long id : expireSnapshotIdValues) {
expireSnaps = expireSnaps.expireSnapshotId(id);
}

if (expireOlderThanValue != null) {
expireSnaps = expireSnaps.expireOlderThan(expireOlderThanValue);
}

if (retainLastValue != null) {
expireSnaps = expireSnaps.retainLast(retainLastValue);
}

expireSnaps.commit();

// Metadata after Expiration
Dataset<Row> validFiles = buildValidFileDF(ops.refresh());

this.expiredFiles = originalFiles.except(validFiles);
}

return expiredFiles;
return delegate.expire();
}

@Override
public ExpireSnapshotsActionResult execute() {
JobGroupInfo info = newJobGroupInfo("EXPIRE", "EXPIRE-SNAPSHOTS");
return withJobGroupInfo(info, this::doExecute);
}

private ExpireSnapshotsActionResult doExecute() {
if (streamResults) {
return deleteFiles(expire().toLocalIterator());
} else {
return deleteFiles(expire().collectAsList().iterator());
}
}

private Dataset<Row> appendTypeString(Dataset<Row> ds, String type) {
return ds.select(new Column("file_path"), functions.lit(type).as("file_type"));
}

private Dataset<Row> buildValidFileDF(TableMetadata metadata) {
Table staticTable = newStaticTable(metadata, table.io());
return appendTypeString(buildValidDataFileDF(staticTable), DATA_FILE)
.union(appendTypeString(buildManifestFileDF(staticTable), MANIFEST))
.union(appendTypeString(buildManifestListDF(staticTable), MANIFEST_LIST));
}

/**
* Deletes files passed to it based on their type.
* @param expired an Iterator of Spark Rows of the structure (path: String, type: String)
* @return Statistics on which files were deleted
*/
private ExpireSnapshotsActionResult deleteFiles(Iterator<Row> expired) {
AtomicLong dataFileCount = new AtomicLong(0L);
AtomicLong manifestCount = new AtomicLong(0L);
AtomicLong manifestListCount = new AtomicLong(0L);

Tasks.foreach(expired)
.retry(3).stopRetryOn(NotFoundException.class).suppressFailureWhenFinished()
.executeWith(deleteExecutorService)
.onFailure((fileInfo, exc) ->
LOG.warn("Delete failed for {}: {}", fileInfo.getString(1), fileInfo.getString(0), exc))
.run(fileInfo -> {
String file = fileInfo.getString(0);
String type = fileInfo.getString(1);
deleteFunc.accept(file);
switch (type) {
case DATA_FILE:
dataFileCount.incrementAndGet();
LOG.trace("Deleted Data File: {}", file);
break;
case MANIFEST:
manifestCount.incrementAndGet();
LOG.debug("Deleted Manifest: {}", file);
break;
case MANIFEST_LIST:
manifestListCount.incrementAndGet();
LOG.debug("Deleted Manifest List: {}", file);
break;
}
});
LOG.info("Deleted {} total files", dataFileCount.get() + manifestCount.get() + manifestListCount.get());
return new ExpireSnapshotsActionResult(dataFileCount.get(), manifestCount.get(), manifestListCount.get());
org.apache.iceberg.actions.ExpireSnapshots.Result result = delegate.execute();
return ExpireSnapshotsActionResult.wrap(result);
}
}
Loading