Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,11 @@ default ExpireSnapshots expireSnapshots(Table table) {
default DeleteReachableFiles deleteReachableFiles(String metadataLocation) {
throw new UnsupportedOperationException(this.getClass().getName() + " does not implement deleteReachableFiles");
}

/**
* Instantiates an action to generate a change data set.
*/
default GenerateChangeSet generateChangeSet(Table table) {
throw new UnsupportedOperationException(this.getClass().getName() + " does not implement generateChangeSet");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.actions;

/**
* An action that generates a change data set from snapshots.
*/
public interface GenerateChangeSet extends Action<GenerateChangeSet, GenerateChangeSet.Result> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we add a few sentences about the purpose of this action similar to other actions?

/**
* Emit changed data set by a snapshot id.
*
* @param snapshotId id of the snapshot to generate changed data
* @return this for method chaining
*/
GenerateChangeSet forSnapshot(long snapshotId);

/**
* Emit change data set from the start snapshot (inclusive).
* <p>
* If neither {@link #fromSnapshotInclusive(long)} or {@link #fromSnapshotExclusive(long)} is provided,
* the start snapshot (inclusive) is defaulted to the oldest ancestor of the snapshot history.
*
* @param fromSnapshotId id of the start snapshot (inclusive)
* @return this for method chaining
*/
GenerateChangeSet fromSnapshotInclusive(long fromSnapshotId);

/**
* Emit change data set from the start snapshot (exclusive).
* <p>
* If neither {@link #fromSnapshotInclusive(long)} or {@link #fromSnapshotExclusive(long)} is provided,
* the start snapshot (inclusive) is defaulted to the oldest ancestor of the snapshot history.
*
* @param fromSnapshotId id of the start snapshot (exclusive)
* @return this for method chaining
*/
GenerateChangeSet fromSnapshotExclusive(long fromSnapshotId);

/**
* Emit changed data to a particular snapshot (inclusive).
* <p>
* If not provided, end snapshot is defaulted to the table's current snapshot.
*
* @param toSnapshotId id of the end snapshot (inclusive)
* @return this for method chaining
*/
GenerateChangeSet toSnapshot(long toSnapshotId);

/**
* The action result that contains a dataset of changed rows.
*/
interface Result<T> {
Copy link
Contributor

@aokolnychyi aokolnychyi May 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I personally don't think the action API should be limited to table maintenance. When we added it, the idea was to provide solutions for common problems that require a query engine. Initially, those happened to be mostly related to table maintenance. However, I wouldn't mind actions for other purposes.

We all agree to share the planning part across engines via a dedicated scan API. However, we still need to build an engine-specific representation somewhere. Of course, we could have a utility but I am not a big fan of exposing utilities to users. The action API is much more user-facing and requires us to think about the proper interface and keep the compatibility. We had many issues with Spark utils exposed to the users in the past.

One thing about using the action API for CDC that concerns me is the need to parameterize the result. Our action API was engine agnostic so far but we need to return Spark Dataset here. I am not sure how big of a deal it is.

Thoughts, @flyrain @rdblue @RussellSpitzer @szehon-ho @stevenzwu?

@rdblue @stevenzwu, I think both of you had concerns about using an action. Could you elaborate a little bit? Given that we will have a common Scan API to share the planning logic.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another alternative we discussed is using a new metadata table so that users can point to it to load changes. What is our long-term plan?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aokolnychyi My question would be why not exposing the CDC read (change set) via a source (in Spark or other engines). I remember @rdblue mentioned some problem with Spark source for that purpose.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@stevenzwu, do you mean a custom source for CDC records or just extending the main data source integration? Spark, unlike Flink, does not have an API for CDC which can be used by data sources. We would want to add that to Spark eventually but it will be a major effort and will take tons of time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thx for explaining. I guess that is the problem Ryan mentioned. Spark source API doesn't support CDC.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think both of you had concerns about using an action. Could you elaborate a little bit?

I don't have a problem with using Actions for other tasks, but I think those other tasks should be actions taken on the table. It seems odd to me to have an "action" be something that essentially builds a view of a table that is evaluated lazily. In this case, the action doesn't really do anything. It is a convenience.

That's why the return value doesn't really fit. Normally, you get back a summary of what was done, but in this case nothing was done and you get back the resulting dataframe.

The other good point is what Steven raised. I would like to make this a source so that you can use it as natively in Spark as possible. We can use the same approach as time travel and metadata tables. If you load db.table.change_log, then you get the CDC view of the table. And you can use time travel selectors (from-snapshot-id, to-snapshot-id, etc) to select the range of time. I'd definitely prefer this more native approach.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The metadata table came up a few times in the original discussion but we were worried about the extra complexity its would require and how much this would delay the CDC implementation. Let's estimate a little bit the amount of changes that will be required to support a metadata table approach. We can start with batch support only for now.

Copy link
Member

@szehon-ho szehon-ho May 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry to chime in late, but I like metadata table way as well, as was discussing with @flyrain, though I might be missing some context.

To me it's a better UX (user can use SQL to query and do analytic on it, time-travel). And code wise it may make the integration with other engine like Trino easier as there's some common interfaces exposed across metadata tables they may take advantage of instead of a separate Action interface, though I admit that's a bit theoretical at this point.

Copy link
Contributor Author

@flyrain flyrain May 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no doubt that metadata table is better on UX. We've planned the metadata table in the phase 2, check the milestone part in the design doc. The idea was to bring the basic CDC functionality to users sooner to unblock them. Some users really want it ASAP.

/**
* Returns the change set.
*/
T changeSet();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.actions;

public class BaseGenerateChangeSetActionResult<T> implements GenerateChangeSet.Result<T> {
private final T changeSet;

public BaseGenerateChangeSetActionResult(T changeSet) {
this.changeSet = changeSet;
}

@Override
public T changeSet() {
return changeSet;
}
}