Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ public void testInvalidCherrypickSnapshotCases() {

assertThatThrownBy(() -> sql("CALL %s.system.cherrypick_snapshot('', 1L)", catalogName))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Cannot handle an empty identifier for argument table");
.hasMessage("Cannot handle an empty identifier for parameter 'table'");

assertThatThrownBy(() -> sql("CALL %s.system.cherrypick_snapshot('t', '2.2')", catalogName))
.isInstanceOf(IllegalArgumentException.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public void testInvalidExpireSnapshotsCases() {

assertThatThrownBy(() -> sql("CALL %s.system.expire_snapshots('')", catalogName))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Cannot handle an empty identifier for argument table");
.hasMessage("Cannot handle an empty identifier for parameter 'table'");
}

@TestTemplate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public void testInvalidFastForwardBranchCases() {
assertThatThrownBy(
() -> sql("CALL %s.system.fast_forward('', 'main', 'newBranch')", catalogName))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Cannot handle an empty identifier for argument table");
.hasMessage("Cannot handle an empty identifier for parameter 'table'");
}

@TestTemplate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,6 @@ public void testInvalidApplyWapChangesCases() {

assertThatThrownBy(() -> sql("CALL %s.system.publish_changes('', 'not_valid')", catalogName))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Cannot handle an empty identifier for argument table");
.hasMessage("Cannot handle an empty identifier for parameter 'table'");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ public void testInvalidRemoveOrphanFilesCases() {

assertThatThrownBy(() -> sql("CALL %s.system.remove_orphan_files('')", catalogName))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Cannot handle an empty identifier for argument table");
.hasMessage("Cannot handle an empty identifier for parameter 'table'");
}

@TestTemplate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ public void testInvalidRewriteManifestsCases() {

assertThatThrownBy(() -> sql("CALL %s.system.rewrite_manifests('')", catalogName))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Cannot handle an empty identifier for argument table");
.hasMessage("Cannot handle an empty identifier for parameter 'table'");
}

@TestTemplate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,6 @@ public void testInvalidRollbackToSnapshotCases() {

assertThatThrownBy(() -> sql("CALL %s.system.rollback_to_snapshot('', 1L)", catalogName))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Cannot handle an empty identifier for argument table");
.hasMessage("Cannot handle an empty identifier for parameter 'table'");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public void testInvalidRollbackToSnapshotCases() {

assertThatThrownBy(() -> sql("CALL %s.system.set_current_snapshot(1L)", catalogName))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Cannot parse identifier for arg table: 1");
.hasMessage("Cannot parse identifier for parameter 'table': 1");

assertThatThrownBy(
() -> sql("CALL %s.system.set_current_snapshot(snapshot_id => 1L)", catalogName))
Expand All @@ -233,7 +233,7 @@ public void testInvalidRollbackToSnapshotCases() {

assertThatThrownBy(() -> sql("CALL %s.system.set_current_snapshot('', 1L)", catalogName))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Cannot handle an empty identifier for argument table");
.hasMessage("Cannot handle an empty identifier for parameter 'table'");

assertThatThrownBy(
() ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,13 @@ class CherrypickSnapshotProcedure extends BaseProcedure {

static final String NAME = "cherrypick_snapshot";

private static final ProcedureParameter TABLE_PARAM =
requiredInParameter("table", DataTypes.StringType);
private static final ProcedureParameter SNAPSHOT_ID_PARAM =
requiredInParameter("snapshot_id", DataTypes.LongType);

private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
requiredInParameter("table", DataTypes.StringType),
requiredInParameter("snapshot_id", DataTypes.LongType)
};
new ProcedureParameter[] {TABLE_PARAM, SNAPSHOT_ID_PARAM};

private static final StructType OUTPUT_TYPE =
new StructType(
Expand Down Expand Up @@ -83,8 +85,10 @@ public ProcedureParameter[] parameters() {

@Override
public Iterator<Scan> call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
long snapshotId = args.getLong(1);
ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);

Identifier tableIdent = input.ident(TABLE_PARAM);
long snapshotId = input.asLong(SNAPSHOT_ID_PARAM);

return asScanIterator(
OUTPUT_TYPE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.iceberg.spark.actions.ExpireSnapshotsSparkAction;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
import org.apache.iceberg.util.DateTimeUtil;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
Expand All @@ -51,15 +50,30 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {

private static final Logger LOG = LoggerFactory.getLogger(ExpireSnapshotsProcedure.class);

private static final ProcedureParameter TABLE_PARAM =
requiredInParameter("table", DataTypes.StringType);
private static final ProcedureParameter OLDER_THAN_PARAM =
optionalInParameter("older_than", DataTypes.TimestampType);
private static final ProcedureParameter RETAIN_LAST_PARAM =
optionalInParameter("retain_last", DataTypes.IntegerType);
private static final ProcedureParameter MAX_CONCURRENT_DELETES_PARAM =
optionalInParameter("max_concurrent_deletes", DataTypes.IntegerType);
private static final ProcedureParameter STREAM_RESULTS_PARAM =
optionalInParameter("stream_results", DataTypes.BooleanType);
private static final ProcedureParameter SNAPSHOT_IDS_PARAM =
optionalInParameter("snapshot_ids", DataTypes.createArrayType(DataTypes.LongType));
private static final ProcedureParameter CLEAN_EXPIRED_METADATA_PARAM =
optionalInParameter("clean_expired_metadata", DataTypes.BooleanType);

private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
requiredInParameter("table", DataTypes.StringType),
optionalInParameter("older_than", DataTypes.TimestampType),
optionalInParameter("retain_last", DataTypes.IntegerType),
optionalInParameter("max_concurrent_deletes", DataTypes.IntegerType),
optionalInParameter("stream_results", DataTypes.BooleanType),
optionalInParameter("snapshot_ids", DataTypes.createArrayType(DataTypes.LongType)),
optionalInParameter("clean_expired_metadata", DataTypes.BooleanType)
TABLE_PARAM,
OLDER_THAN_PARAM,
RETAIN_LAST_PARAM,
MAX_CONCURRENT_DELETES_PARAM,
STREAM_RESULTS_PARAM,
SNAPSHOT_IDS_PARAM,
CLEAN_EXPIRED_METADATA_PARAM
};

private static final StructType OUTPUT_TYPE =
Expand Down Expand Up @@ -104,13 +118,14 @@ public ProcedureParameter[] parameters() {
@Override
@SuppressWarnings("checkstyle:CyclomaticComplexity")
public Iterator<Scan> call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
Long olderThanMillis = args.isNullAt(1) ? null : DateTimeUtil.microsToMillis(args.getLong(1));
Integer retainLastNum = args.isNullAt(2) ? null : args.getInt(2);
Integer maxConcurrentDeletes = args.isNullAt(3) ? null : args.getInt(3);
Boolean streamResult = args.isNullAt(4) ? null : args.getBoolean(4);
long[] snapshotIds = args.isNullAt(5) ? null : args.getArray(5).toLongArray();
Boolean cleanExpiredMetadata = args.isNullAt(6) ? null : args.getBoolean(6);
ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
Identifier tableIdent = input.ident(TABLE_PARAM);
Long olderThanMillis = input.asTimestampLong(OLDER_THAN_PARAM, null);
Integer retainLastNum = input.asInt(RETAIN_LAST_PARAM, null);
Integer maxConcurrentDeletes = input.asInt(MAX_CONCURRENT_DELETES_PARAM, null);
boolean streamResult = input.asBoolean(STREAM_RESULTS_PARAM, false);
Long[] snapshotIds = input.asLongArray(SNAPSHOT_IDS_PARAM, null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not make those return long[] instead of Long[]?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your suggestion. I have adjusted the return value to long[] to maintain consistency with the original usage.

boolean cleanExpiredMetadata = input.asBoolean(CLEAN_EXPIRED_METADATA_PARAM, false);
Copy link
Contributor

@nastra nastra Sep 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

generally speaking, we need to make sure that we don't change return types as this changes semantics. previously it was expecting a Boolean and now it gets a boolean. If the parameter wasn't defined by the caller, then we would rely on the API's default value (instead of passing true/false). That's also why the null checks further below existed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch on the boolean handling. I actually suggested @slfan1989 convert these to primitive booleans because all the procedures here use boolean parameters as explicit enablement flags, where null defaults to disabled.

This gives us a chance to clean up the if $Boolean != null pattern below when setting the corresponding flags.
Since we typically backport changes from the latest Spark version to older ones, making this change now should save us some backport work later, even though it means a bit more review effort upfront. Our unit tests should catch any behavioral changes from the semantic shift.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would be in favor of using boolean instead of Boolean but we do actually have cases where we want to not pass an argument to the underlying core API if the user didn't provide any input and thus rely on whatever the default behavior of the core API is

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the information! From my perspective, changing it to boolean is feasible. The unit tests have all passed, and the results are as expected. Shall we go ahead and accept this improvement?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to switch back to using Boolean if it was Boolean originally

Copy link
Contributor

@dramaticlly dramaticlly Sep 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but we do actually have cases where we want to not pass an argument to the underlying core API if the user didn't provide any input and thus rely on whatever the default behavior of the core API is

From my understanding, @nastra wanted us to keep the original boxed Boolean to convey user intention: an unset default of null means the flag is not applied to the underlying Spark action at all. Now that we have switched to a primitive boolean defaulting to false, the flag will always be passed to the action.

Sorry I missed this initially, let's switch to Boolean just like before this change and keep the minimal necessary change to adopt ProcedureInput. Maybe also worth a comment to convey the message as well

Copy link
Contributor Author

@slfan1989 slfan1989 Sep 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nastra @dramaticlly Thank you for helping review the code! I have already reverted this part of the code back to Boolean.


Preconditions.checkArgument(
maxConcurrentDeletes == null || maxConcurrentDeletes > 0,
Expand Down Expand Up @@ -150,14 +165,9 @@ public Iterator<Scan> call(InternalRow args) {
}
}

if (streamResult != null) {
action.option(
ExpireSnapshotsSparkAction.STREAM_RESULTS, Boolean.toString(streamResult));
}
action.option(ExpireSnapshotsSparkAction.STREAM_RESULTS, Boolean.toString(streamResult));

if (cleanExpiredMetadata != null) {
action.cleanExpiredMetadata(cleanExpiredMetadata);
}
action.cleanExpiredMetadata(cleanExpiredMetadata);

ExpireSnapshots.Result result = action.execute();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,15 @@ public class FastForwardBranchProcedure extends BaseProcedure {

static final String NAME = "fast_forward";

private static final ProcedureParameter TABLE_PARAM =
requiredInParameter("table", DataTypes.StringType);
private static final ProcedureParameter BRANCH_PARAM =
requiredInParameter("branch", DataTypes.StringType);
private static final ProcedureParameter TO_PARAM =
requiredInParameter("to", DataTypes.StringType);

private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
requiredInParameter("table", DataTypes.StringType),
requiredInParameter("branch", DataTypes.StringType),
requiredInParameter("to", DataTypes.StringType)
};
new ProcedureParameter[] {TABLE_PARAM, BRANCH_PARAM, TO_PARAM};

private static final StructType OUTPUT_TYPE =
new StructType(
Expand Down Expand Up @@ -75,9 +78,11 @@ public ProcedureParameter[] parameters() {

@Override
public Iterator<Scan> call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
String from = args.getString(1);
String to = args.getString(2);
ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);

Identifier tableIdent = input.ident(TABLE_PARAM);
String from = input.asString(BRANCH_PARAM);
String to = input.asString(TO_PARAM);

return modifyIcebergTable(
tableIdent,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
import java.util.Map;
import java.util.function.BiFunction;
import org.apache.commons.lang3.StringUtils;
import org.apache.iceberg.actions.DeleteOrphanFiles;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
import org.apache.iceberg.util.DateTimeUtil;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.util.ArrayData;
Expand Down Expand Up @@ -80,6 +82,22 @@ public Integer asInt(ProcedureParameter param, Integer defaultValue) {
return args.isNullAt(ordinal) ? defaultValue : (Integer) args.getInt(ordinal);
}

public long asTimestampLong(ProcedureParameter param) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public long asTimestampLong(ProcedureParameter param) {
public long asTimestampMillis(ProcedureParameter param) {

Long value = asTimestampLong(param, null);
Preconditions.checkArgument(value != null, "Parameter '%s' is not set", param.name());
return value;
}

public Long asTimestampLong(ProcedureParameter param, Long defaultValue) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be called asTimestampMillis

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for reviewing the code. I will make the adjustments based on your suggestions.

validateParamType(param, DataTypes.TimestampType);
int ordinal = ordinal(param);
Long value = args.isNullAt(ordinal) ? defaultValue : (Long) args.getLong(ordinal);
if (value != null) {
value = DateTimeUtil.microsToMillis(value);
}
return value;
}

public long asLong(ProcedureParameter param) {
Long value = asLong(param, null);
Preconditions.checkArgument(value != null, "Parameter '%s' is not set", param.name());
Expand All @@ -92,6 +110,11 @@ public Long asLong(ProcedureParameter param, Long defaultValue) {
return args.isNullAt(ordinal) ? defaultValue : (Long) args.getLong(ordinal);
}

/**
 * Returns the value of an optional {@code array<long>} parameter as a boxed {@code Long[]}.
 *
 * @param param the array parameter to read
 * @param defaultValue value to return when the parameter is unset; may be null
 * @return the parameter value as a {@code Long[]}, or {@code defaultValue} when unset
 */
public Long[] asLongArray(ProcedureParameter param, Long[] defaultValue) {
  validateParamType(param, DataTypes.createArrayType(DataTypes.LongType));
  return array(param, (arrayData, pos) -> arrayData.getLong(pos), Long.class, defaultValue);
}

public String asString(ProcedureParameter param) {
String value = asString(param, null);
Preconditions.checkArgument(value != null, "Parameter '%s' is not set", param.name());
Expand Down Expand Up @@ -119,6 +142,13 @@ public String[] asStringArray(ProcedureParameter param, String[] defaultValue) {
defaultValue);
}

public DeleteOrphanFiles.PrefixMismatchMode asPrefixMismatchMode(ProcedureParameter param) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should put this method here as it's only used in a single place

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part of the code has already been improved.

String modeAsString = asString(param, null);
return (modeAsString == null)
? null
: DeleteOrphanFiles.PrefixMismatchMode.fromString(modeAsString);
}

@SuppressWarnings("unchecked")
private <T> T[] array(
ProcedureParameter param,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,13 @@ class PublishChangesProcedure extends BaseProcedure {

static final String NAME = "publish_changes";

private static final ProcedureParameter TABLE_PARAM =
requiredInParameter("table", DataTypes.StringType);
private static final ProcedureParameter WAP_ID_PARAM =
requiredInParameter("wap_id", DataTypes.StringType);

private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
requiredInParameter("table", DataTypes.StringType),
requiredInParameter("wap_id", DataTypes.StringType)
};
new ProcedureParameter[] {TABLE_PARAM, WAP_ID_PARAM};

private static final StructType OUTPUT_TYPE =
new StructType(
Expand Down Expand Up @@ -87,8 +89,10 @@ public ProcedureParameter[] parameters() {

@Override
public Iterator<Scan> call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
String wapId = args.getString(1);
ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);

Identifier tableIdent = input.ident(TABLE_PARAM);
String wapId = input.asString(WAP_ID_PARAM);

return modifyIcebergTable(
tableIdent,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@ class RegisterTableProcedure extends BaseProcedure {

static final String NAME = "register_table";

private static final ProcedureParameter TABLE_PARAM =
requiredInParameter("table", DataTypes.StringType);
private static final ProcedureParameter METADATA_FILE_PARAM =
requiredInParameter("metadata_file", DataTypes.StringType);

private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
requiredInParameter("table", DataTypes.StringType),
requiredInParameter("metadata_file", DataTypes.StringType)
};
new ProcedureParameter[] {TABLE_PARAM, METADATA_FILE_PARAM};

private static final StructType OUTPUT_TYPE =
new StructType(
Expand Down Expand Up @@ -81,9 +83,11 @@ public ProcedureParameter[] parameters() {

@Override
public Iterator<Scan> call(InternalRow args) {
ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
TableIdentifier tableName =
Spark3Util.identifierToTableIdentifier(toIdentifier(args.getString(0), "table"));
String metadataFile = args.getString(1);
Spark3Util.identifierToTableIdentifier(
toIdentifier(input.asString(TABLE_PARAM), TABLE_PARAM.name()));
String metadataFile = input.asString(METADATA_FILE_PARAM);
Preconditions.checkArgument(
tableCatalog() instanceof HasIcebergCatalog,
"Cannot use Register Table in a non-Iceberg catalog");
Expand Down
Loading