diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCherrypickSnapshotProcedure.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCherrypickSnapshotProcedure.java
index 82e87c45f11d..8f5f9daf39e2 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCherrypickSnapshotProcedure.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCherrypickSnapshotProcedure.java
@@ -188,7 +188,7 @@ public void testInvalidCherrypickSnapshotCases() {
 
     assertThatThrownBy(() -> sql("CALL %s.system.cherrypick_snapshot('', 1L)", catalogName))
         .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("Cannot handle an empty identifier for argument table");
+        .hasMessage("Cannot handle an empty identifier for parameter 'table'");
 
     assertThatThrownBy(() -> sql("CALL %s.system.cherrypick_snapshot('t', 2.2)", catalogName))
         .isInstanceOf(AnalysisException.class)
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
index f5060ba8e7f2..a9e8f6c52edb 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
@@ -191,7 +191,7 @@ public void testInvalidExpireSnapshotsCases() {
 
     assertThatThrownBy(() -> sql("CALL %s.system.expire_snapshots('')", catalogName))
         .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("Cannot handle an empty identifier for argument table");
+        .hasMessage("Cannot handle an empty identifier for parameter 'table'");
   }
 
   @TestTemplate
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestFastForwardBranchProcedure.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestFastForwardBranchProcedure.java
index 4d5b569d24cd..dab0fb00d394 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestFastForwardBranchProcedure.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestFastForwardBranchProcedure.java
@@ -190,7 +190,7 @@ public void testInvalidFastForwardBranchCases() {
     assertThatThrownBy(
             () -> sql("CALL %s.system.fast_forward('', 'main', 'newBranch')", catalogName))
         .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("Cannot handle an empty identifier for argument table");
+        .hasMessage("Cannot handle an empty identifier for parameter 'table'");
   }
 
   @TestTemplate
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
index ed20eaa4d54a..08f44c8f01f2 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
@@ -186,6 +186,6 @@ public void testInvalidApplyWapChangesCases() {
 
     assertThatThrownBy(() -> sql("CALL %s.system.publish_changes('', 'not_valid')", catalogName))
         .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("Cannot handle an empty identifier for argument table");
+        .hasMessage("Cannot handle an empty identifier for parameter 'table'");
   }
 }
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
index 138ac1e6b056..c17dbad71100 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
@@ -272,7 +272,7 @@ public void testInvalidRemoveOrphanFilesCases() {
 
     assertThatThrownBy(() -> sql("CALL %s.system.remove_orphan_files('')", catalogName))
         .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("Cannot handle an empty identifier for argument table");
+        .hasMessage("Cannot handle an empty identifier for parameter 'table'");
   }
 
   @TestTemplate
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
index 1e41b3a04234..fcebb8b04057 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
@@ -326,7 +326,7 @@ public void testInvalidRewriteManifestsCases() {
 
     assertThatThrownBy(() -> sql("CALL %s.system.rewrite_manifests('')", catalogName))
         .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("Cannot handle an empty identifier for argument table");
+        .hasMessage("Cannot handle an empty identifier for parameter 'table'");
   }
 
   @TestTemplate
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRollbackToSnapshotProcedure.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRollbackToSnapshotProcedure.java
index 572347971d8c..36e479b91cf1 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRollbackToSnapshotProcedure.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRollbackToSnapshotProcedure.java
@@ -282,6 +282,6 @@ public void testInvalidRollbackToSnapshotCases() {
 
     assertThatThrownBy(() -> sql("CALL %s.system.rollback_to_snapshot('', 1L)", catalogName))
         .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("Cannot handle an empty identifier for argument table");
+        .hasMessage("Cannot handle an empty identifier for parameter 'table'");
   }
 }
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSetCurrentSnapshotProcedure.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSetCurrentSnapshotProcedure.java
index 0b78d914c34f..ebf5fe31bad7 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSetCurrentSnapshotProcedure.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSetCurrentSnapshotProcedure.java
@@ -219,7 +219,7 @@ public void testInvalidRollbackToSnapshotCases() {
 
     assertThatThrownBy(() -> sql("CALL %s.system.set_current_snapshot(1L)", catalogName))
         .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("Cannot parse identifier for arg table: 1");
+        .hasMessage("Cannot parse identifier for parameter 'table': 1");
 
     assertThatThrownBy(
             () -> sql("CALL %s.system.set_current_snapshot(snapshot_id => 1L)", catalogName))
@@ -236,7 +236,7 @@ public void testInvalidRollbackToSnapshotCases() {
 
     assertThatThrownBy(() -> sql("CALL %s.system.set_current_snapshot('', 1L)", catalogName))
         .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("Cannot handle an empty identifier for argument table");
+        .hasMessage("Cannot handle an empty identifier for parameter 'table'");
 
     assertThatThrownBy(
             () ->
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
index fb8bdc252df5..d0856804b06c 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
@@ -45,6 +45,7 @@
 import org.apache.spark.sql.connector.catalog.Table;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.apache.spark.sql.connector.iceberg.catalog.Procedure;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
 import org.apache.spark.sql.execution.CacheManager;
 import org.apache.spark.sql.execution.datasources.SparkExpressionConverter;
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
@@ -57,6 +58,14 @@ abstract class BaseProcedure implements Procedure {
       DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType);
   protected static final DataType STRING_ARRAY = DataTypes.createArrayType(DataTypes.StringType);
 
+  protected static ProcedureParameter requiredInParameter(String name, DataType dataType) {
+    return ProcedureParameter.required(name, dataType);
+  }
+
+  protected static ProcedureParameter optionalInParameter(String name, DataType dataType) {
+    return ProcedureParameter.optional(name, dataType);
+  }
+
   private final SparkSession spark;
   private final TableCatalog tableCatalog;
 
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/CherrypickSnapshotProcedure.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/CherrypickSnapshotProcedure.java
index efe9aeb9e8e8..bfde38402dac 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/CherrypickSnapshotProcedure.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/CherrypickSnapshotProcedure.java
@@ -40,11 +40,13 @@
  */
 class CherrypickSnapshotProcedure extends BaseProcedure {
 
+  private static final ProcedureParameter TABLE_PARAM =
+      requiredInParameter("table", DataTypes.StringType);
+  private static final ProcedureParameter SNAPSHOT_ID_PARAM =
+      requiredInParameter("snapshot_id", DataTypes.LongType);
+
   private static final ProcedureParameter[] PARAMETERS =
-      new ProcedureParameter[] {
-        ProcedureParameter.required("table", DataTypes.StringType),
-        ProcedureParameter.required("snapshot_id", DataTypes.LongType)
-      };
+      new ProcedureParameter[] {TABLE_PARAM, SNAPSHOT_ID_PARAM};
 
   private static final StructType OUTPUT_TYPE =
       new StructType(
@@ -78,8 +80,10 @@ public StructType outputType() {
 
   @Override
   public InternalRow[] call(InternalRow args) {
-    Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
-    long snapshotId = args.getLong(1);
+    ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
+
+    Identifier tableIdent = input.ident(TABLE_PARAM);
+    long snapshotId = input.asLong(SNAPSHOT_ID_PARAM);
 
     return modifyIcebergTable(
         tableIdent,
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
index 9139b465b1f0..161c98cc5f5c 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
@@ -25,7 +25,6 @@
 import org.apache.iceberg.spark.actions.ExpireSnapshotsSparkAction;
 import org.apache.iceberg.spark.actions.SparkActions;
 import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
-import org.apache.iceberg.util.DateTimeUtil;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
@@ -46,15 +45,30 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
 
   private static final Logger LOG = LoggerFactory.getLogger(ExpireSnapshotsProcedure.class);
 
+  private static final ProcedureParameter TABLE_PARAM =
+      requiredInParameter("table", DataTypes.StringType);
+  private static final ProcedureParameter OLDER_THAN_PARAM =
+      optionalInParameter("older_than", DataTypes.TimestampType);
+  private static final ProcedureParameter RETAIN_LAST_PARAM =
+      optionalInParameter("retain_last", DataTypes.IntegerType);
+  private static final ProcedureParameter MAX_CONCURRENT_DELETES_PARAM =
+      optionalInParameter("max_concurrent_deletes", DataTypes.IntegerType);
+  private static final ProcedureParameter STREAM_RESULTS_PARAM =
+      optionalInParameter("stream_results", DataTypes.BooleanType);
+  private static final ProcedureParameter SNAPSHOT_IDS_PARAM =
+      optionalInParameter("snapshot_ids", DataTypes.createArrayType(DataTypes.LongType));
+  private static final ProcedureParameter CLEAN_EXPIRED_METADATA_PARAM =
+      optionalInParameter("clean_expired_metadata", DataTypes.BooleanType);
+
   private static final ProcedureParameter[] PARAMETERS =
       new ProcedureParameter[] {
-        ProcedureParameter.required("table", DataTypes.StringType),
-        ProcedureParameter.optional("older_than", DataTypes.TimestampType),
-        ProcedureParameter.optional("retain_last", DataTypes.IntegerType),
-        ProcedureParameter.optional("max_concurrent_deletes", DataTypes.IntegerType),
-        ProcedureParameter.optional("stream_results", DataTypes.BooleanType),
-        ProcedureParameter.optional("snapshot_ids", DataTypes.createArrayType(DataTypes.LongType)),
-        ProcedureParameter.optional("clean_expired_metadata", DataTypes.BooleanType)
+        TABLE_PARAM,
+        OLDER_THAN_PARAM,
+        RETAIN_LAST_PARAM,
+        MAX_CONCURRENT_DELETES_PARAM,
+        STREAM_RESULTS_PARAM,
+        SNAPSHOT_IDS_PARAM,
+        CLEAN_EXPIRED_METADATA_PARAM
       };
 
   private static final StructType OUTPUT_TYPE =
@@ -99,13 +113,14 @@ public StructType outputType() {
   @Override
   @SuppressWarnings("checkstyle:CyclomaticComplexity")
   public InternalRow[] call(InternalRow args) {
-    Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
-    Long olderThanMillis = args.isNullAt(1) ? null : DateTimeUtil.microsToMillis(args.getLong(1));
-    Integer retainLastNum = args.isNullAt(2) ? null : args.getInt(2);
-    Integer maxConcurrentDeletes = args.isNullAt(3) ? null : args.getInt(3);
-    Boolean streamResult = args.isNullAt(4) ? null : args.getBoolean(4);
-    long[] snapshotIds = args.isNullAt(5) ? null : args.getArray(5).toLongArray();
-    Boolean cleanExpiredMetadata = args.isNullAt(6) ? null : args.getBoolean(6);
+    ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
+    Identifier tableIdent = input.ident(TABLE_PARAM);
+    Long olderThanMillis = input.asTimestampMillis(OLDER_THAN_PARAM, null);
+    Integer retainLastNum = input.asInt(RETAIN_LAST_PARAM, null);
+    Integer maxConcurrentDeletes = input.asInt(MAX_CONCURRENT_DELETES_PARAM, null);
+    Boolean streamResult = input.asBoolean(STREAM_RESULTS_PARAM, null);
+    long[] snapshotIds = input.asLongArray(SNAPSHOT_IDS_PARAM, null);
+    Boolean cleanExpiredMetadata = input.asBoolean(CLEAN_EXPIRED_METADATA_PARAM, null);
 
     Preconditions.checkArgument(
         maxConcurrentDeletes == null || maxConcurrentDeletes > 0,
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/FastForwardBranchProcedure.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/FastForwardBranchProcedure.java
index 11ea5d44c9f8..7a04f8f3a5e6 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/FastForwardBranchProcedure.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/FastForwardBranchProcedure.java
@@ -30,12 +30,15 @@
 
 public class FastForwardBranchProcedure extends BaseProcedure {
 
+  private static final ProcedureParameter TABLE_PARAM =
+      requiredInParameter("table", DataTypes.StringType);
+  private static final ProcedureParameter BRANCH_PARAM =
+      requiredInParameter("branch", DataTypes.StringType);
+  private static final ProcedureParameter TO_PARAM =
+      requiredInParameter("to", DataTypes.StringType);
+
   private static final ProcedureParameter[] PARAMETERS =
-      new ProcedureParameter[] {
-        ProcedureParameter.required("table", DataTypes.StringType),
-        ProcedureParameter.required("branch", DataTypes.StringType),
-        ProcedureParameter.required("to", DataTypes.StringType)
-      };
+      new ProcedureParameter[] {TABLE_PARAM, BRANCH_PARAM, TO_PARAM};
 
   private static final StructType OUTPUT_TYPE =
       new StructType(
@@ -70,9 +73,11 @@ public StructType outputType() {
 
   @Override
   public InternalRow[] call(InternalRow args) {
-    Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
-    String from = args.getString(1);
-    String to = args.getString(2);
+    ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
+
+    Identifier tableIdent = input.ident(TABLE_PARAM);
+    String from = input.asString(BRANCH_PARAM);
+    String to = input.asString(TO_PARAM);
 
     return modifyIcebergTable(
         tableIdent,
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/ProcedureInput.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/ProcedureInput.java
index 0be4b38de79c..9d4d0e1d8cca 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/ProcedureInput.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/ProcedureInput.java
@@ -26,6 +26,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.iceberg.util.DateTimeUtil;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.util.ArrayData;
@@ -80,6 +81,22 @@ public Integer asInt(ProcedureParameter param, Integer defaultValue) {
     return args.isNullAt(ordinal) ? defaultValue : (Integer) args.getInt(ordinal);
   }
 
+  public long asTimestampMillis(ProcedureParameter param) {
+    Long value = asTimestampMillis(param, null);
+    Preconditions.checkArgument(value != null, "Parameter '%s' is not set", param.name());
+    return value;
+  }
+
+  public Long asTimestampMillis(ProcedureParameter param, Long defaultValue) {
+    validateParamType(param, DataTypes.TimestampType);
+    int ordinal = ordinal(param);
+    Long value = args.isNullAt(ordinal) ? defaultValue : (Long) args.getLong(ordinal);
+    if (value != null) {
+      value = DateTimeUtil.microsToMillis(value);
+    }
+    return value;
+  }
+
   public long asLong(ProcedureParameter param) {
     Long value = asLong(param, null);
     Preconditions.checkArgument(value != null, "Parameter '%s' is not set", param.name());
@@ -92,6 +109,22 @@ public Long asLong(ProcedureParameter param, Long defaultValue) {
     return args.isNullAt(ordinal) ? defaultValue : (Long) args.getLong(ordinal);
   }
 
+  public long[] asLongArray(ProcedureParameter param, Long[] defaultValue) {
+    validateParamType(param, DataTypes.createArrayType(DataTypes.LongType));
+    Long[] source =
+        array(param, (array, ordinal) -> array.getLong(ordinal), Long.class, defaultValue);
+
+    if (source == null) {
+      return null;
+    }
+
+    long[] result = new long[source.length];
+    for (int i = 0; i < source.length; i++) {
+      result[i] = source[i];
+    }
+    return result;
+  }
+
   public String asString(ProcedureParameter param) {
     String value = asString(param, null);
     Preconditions.checkArgument(value != null, "Parameter '%s' is not set", param.name());
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
index 33429b164280..2c3ce7418e08 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
@@ -44,11 +44,13 @@
  */
 class PublishChangesProcedure extends BaseProcedure {
 
+  private static final ProcedureParameter TABLE_PARAM =
+      requiredInParameter("table", DataTypes.StringType);
+  private static final ProcedureParameter WAP_ID_PARAM =
+      requiredInParameter("wap_id", DataTypes.StringType);
+
   private static final ProcedureParameter[] PARAMETERS =
-      new ProcedureParameter[] {
-        ProcedureParameter.required("table", DataTypes.StringType),
-        ProcedureParameter.required("wap_id", DataTypes.StringType)
-      };
+      new ProcedureParameter[] {TABLE_PARAM, WAP_ID_PARAM};
 
   private static final StructType OUTPUT_TYPE =
       new StructType(
@@ -82,8 +84,10 @@ public StructType outputType() {
 
   @Override
   public InternalRow[] call(InternalRow args) {
-    Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
-    String wapId = args.getString(1);
+    ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
+
+    Identifier tableIdent = input.ident(TABLE_PARAM);
+    String wapId = input.asString(WAP_ID_PARAM);
 
     return modifyIcebergTable(
         tableIdent,
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RegisterTableProcedure.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RegisterTableProcedure.java
index 857949e052c8..88916fff7455 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RegisterTableProcedure.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RegisterTableProcedure.java
@@ -36,11 +36,13 @@
 import org.apache.spark.sql.types.StructType;
 
 class RegisterTableProcedure extends BaseProcedure {
+  private static final ProcedureParameter TABLE_PARAM =
+      requiredInParameter("table", DataTypes.StringType);
+  private static final ProcedureParameter METADATA_FILE_PARAM =
+      requiredInParameter("metadata_file", DataTypes.StringType);
+
   private static final ProcedureParameter[] PARAMETERS =
-      new ProcedureParameter[] {
-        ProcedureParameter.required("table", DataTypes.StringType),
-        ProcedureParameter.required("metadata_file", DataTypes.StringType)
-      };
+      new ProcedureParameter[] {TABLE_PARAM, METADATA_FILE_PARAM};
 
   private static final StructType OUTPUT_TYPE =
       new StructType(
@@ -75,9 +77,11 @@ public StructType outputType() {
   @Override
   public InternalRow[] call(InternalRow args) {
+    ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
     TableIdentifier tableName =
-        Spark3Util.identifierToTableIdentifier(toIdentifier(args.getString(0), "table"));
-    String metadataFile = args.getString(1);
+        Spark3Util.identifierToTableIdentifier(
+            toIdentifier(input.asString(TABLE_PARAM), TABLE_PARAM.name()));
+    String metadataFile = input.asString(METADATA_FILE_PARAM);
 
     Preconditions.checkArgument(
         tableCatalog() instanceof HasIcebergCatalog,
         "Cannot use Register Table in a non-Iceberg catalog");
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
index 6e70dc3ccd9c..affc9b76d8c3 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
@@ -25,12 +25,11 @@
 import org.apache.iceberg.actions.DeleteOrphanFiles.PrefixMismatchMode;
 import org.apache.iceberg.io.SupportsBulkOperations;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction;
 import org.apache.iceberg.spark.actions.SparkActions;
 import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
-import org.apache.iceberg.util.DateTimeUtil;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
@@ -42,7 +41,6 @@
 import org.apache.spark.unsafe.types.UTF8String;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.runtime.BoxedUnit;
 
 /**
  * A procedure that removes orphan files in a table.
@@ -52,19 +50,40 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
 
   private static final Logger LOG = LoggerFactory.getLogger(RemoveOrphanFilesProcedure.class);
 
+  private static final ProcedureParameter TABLE_PARAM =
+      requiredInParameter("table", DataTypes.StringType);
+  private static final ProcedureParameter OLDER_THAN_PARAM =
+      optionalInParameter("older_than", DataTypes.TimestampType);
+  private static final ProcedureParameter LOCATION_PARAM =
+      optionalInParameter("location", DataTypes.StringType);
+  private static final ProcedureParameter DRY_RUN_PARAM =
+      optionalInParameter("dry_run", DataTypes.BooleanType);
+  private static final ProcedureParameter MAX_CONCURRENT_DELETES_PARAM =
+      optionalInParameter("max_concurrent_deletes", DataTypes.IntegerType);
+  private static final ProcedureParameter FILE_LIST_VIEW_PARAM =
+      optionalInParameter("file_list_view", DataTypes.StringType);
+  private static final ProcedureParameter EQUAL_SCHEMES_PARAM =
+      optionalInParameter("equal_schemes", STRING_MAP);
+  private static final ProcedureParameter EQUAL_AUTHORITIES_PARAM =
+      optionalInParameter("equal_authorities", STRING_MAP);
+  private static final ProcedureParameter PREFIX_MISMATCH_MODE_PARAM =
+      optionalInParameter("prefix_mismatch_mode", DataTypes.StringType);
+  // List files with prefix operations. Default is false.
+  private static final ProcedureParameter PREFIX_LISTING_PARAM =
+      optionalInParameter("prefix_listing", DataTypes.BooleanType);
+
   private static final ProcedureParameter[] PARAMETERS =
       new ProcedureParameter[] {
-        ProcedureParameter.required("table", DataTypes.StringType),
-        ProcedureParameter.optional("older_than", DataTypes.TimestampType),
-        ProcedureParameter.optional("location", DataTypes.StringType),
-        ProcedureParameter.optional("dry_run", DataTypes.BooleanType),
-        ProcedureParameter.optional("max_concurrent_deletes", DataTypes.IntegerType),
-        ProcedureParameter.optional("file_list_view", DataTypes.StringType),
-        ProcedureParameter.optional("equal_schemes", STRING_MAP),
-        ProcedureParameter.optional("equal_authorities", STRING_MAP),
-        ProcedureParameter.optional("prefix_mismatch_mode", DataTypes.StringType),
-        // List files with prefix operations. Default is false.
-        ProcedureParameter.optional("prefix_listing", DataTypes.BooleanType)
+        TABLE_PARAM,
+        OLDER_THAN_PARAM,
+        LOCATION_PARAM,
+        DRY_RUN_PARAM,
+        MAX_CONCURRENT_DELETES_PARAM,
+        FILE_LIST_VIEW_PARAM,
+        EQUAL_SCHEMES_PARAM,
+        EQUAL_AUTHORITIES_PARAM,
+        PREFIX_MISMATCH_MODE_PARAM,
+        PREFIX_LISTING_PARAM
       };
 
   private static final StructType OUTPUT_TYPE =
@@ -99,46 +118,26 @@ public StructType outputType() {
   @Override
   @SuppressWarnings("checkstyle:CyclomaticComplexity")
   public InternalRow[] call(InternalRow args) {
-    Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
-    Long olderThanMillis = args.isNullAt(1) ? null : DateTimeUtil.microsToMillis(args.getLong(1));
-    String location = args.isNullAt(2) ? null : args.getString(2);
-    boolean dryRun = args.isNullAt(3) ? false : args.getBoolean(3);
-    Integer maxConcurrentDeletes = args.isNullAt(4) ? null : args.getInt(4);
-    String fileListView = args.isNullAt(5) ? null : args.getString(5);
+    ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
+    Identifier tableIdent = input.ident(TABLE_PARAM);
+    Long olderThanMillis = input.asTimestampMillis(OLDER_THAN_PARAM, null);
+    String location = input.asString(LOCATION_PARAM, null);
+    boolean dryRun = input.asBoolean(DRY_RUN_PARAM, false);
+    Integer maxConcurrentDeletes = input.asInt(MAX_CONCURRENT_DELETES_PARAM, null);
+    String fileListView = input.asString(FILE_LIST_VIEW_PARAM, null);
 
     Preconditions.checkArgument(
         maxConcurrentDeletes == null || maxConcurrentDeletes > 0,
         "max_concurrent_deletes should have value > 0, value: %s",
        maxConcurrentDeletes);
 
-    Map<String, String> equalSchemes = Maps.newHashMap();
-    if (!args.isNullAt(6)) {
-      args.getMap(6)
-          .foreach(
-              DataTypes.StringType,
-              DataTypes.StringType,
-              (k, v) -> {
-                equalSchemes.put(k.toString(), v.toString());
-                return BoxedUnit.UNIT;
-              });
-    }
-
-    Map<String, String> equalAuthorities = Maps.newHashMap();
-    if (!args.isNullAt(7)) {
-      args.getMap(7)
-          .foreach(
-              DataTypes.StringType,
-              DataTypes.StringType,
-              (k, v) -> {
-                equalAuthorities.put(k.toString(), v.toString());
-                return BoxedUnit.UNIT;
-              });
-    }
+    Map<String, String> equalSchemes = input.asStringMap(EQUAL_SCHEMES_PARAM, ImmutableMap.of());
+    Map<String, String> equalAuthorities =
+        input.asStringMap(EQUAL_AUTHORITIES_PARAM, ImmutableMap.of());
 
-    PrefixMismatchMode prefixMismatchMode =
-        args.isNullAt(8) ? null : PrefixMismatchMode.fromString(args.getString(8));
+    PrefixMismatchMode prefixMismatchMode = asPrefixMismatchMode(input, PREFIX_MISMATCH_MODE_PARAM);
 
-    boolean prefixListing = args.isNullAt(9) ? false : args.getBoolean(9);
+    boolean prefixListing = input.asBoolean(PREFIX_LISTING_PARAM, false);
 
     return withIcebergTable(
         tableIdent,
@@ -225,4 +224,9 @@ private void validateInterval(long olderThanMillis) {
   public String description() {
     return "RemoveOrphanFilesProcedure";
   }
+
+  private PrefixMismatchMode asPrefixMismatchMode(ProcedureInput input, ProcedureParameter param) {
+    String modeAsString = input.asString(param, null);
+    return (modeAsString == null) ? null : PrefixMismatchMode.fromString(modeAsString);
+  }
 }
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java
index e59077ae3da9..7ee6540cdb63 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java
@@ -43,12 +43,15 @@
  */
 class RewriteManifestsProcedure extends BaseProcedure {
 
+  private static final ProcedureParameter TABLE_PARAM =
+      requiredInParameter("table", DataTypes.StringType);
+  private static final ProcedureParameter USE_CACHING_PARAM =
+      optionalInParameter("use_caching", DataTypes.BooleanType);
+  private static final ProcedureParameter SPEC_ID_PARAM =
+      optionalInParameter("spec_id", DataTypes.IntegerType);
+
   private static final ProcedureParameter[] PARAMETERS =
-      new ProcedureParameter[] {
-        ProcedureParameter.required("table", DataTypes.StringType),
-        ProcedureParameter.optional("use_caching", DataTypes.BooleanType),
-        ProcedureParameter.optional("spec_id", DataTypes.IntegerType)
-      };
+      new ProcedureParameter[] {TABLE_PARAM, USE_CACHING_PARAM, SPEC_ID_PARAM};
 
   // counts are not nullable since the action result is never null
   private static final StructType OUTPUT_TYPE =
@@ -84,9 +87,10 @@ public StructType outputType() {
 
   @Override
   public InternalRow[] call(InternalRow args) {
-    Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
-    Boolean useCaching = args.isNullAt(1) ? null : args.getBoolean(1);
-    Integer specId = args.isNullAt(2) ? null : args.getInt(2);
+    ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
+    Identifier tableIdent = input.ident(TABLE_PARAM);
+    Boolean useCaching = input.asBoolean(USE_CACHING_PARAM, null);
+    Integer specId = input.asInt(SPEC_ID_PARAM, null);
 
     return modifyIcebergTable(
         tableIdent,
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RollbackToSnapshotProcedure.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RollbackToSnapshotProcedure.java
index 49cc1a5ceae3..988211c7e00c 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RollbackToSnapshotProcedure.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RollbackToSnapshotProcedure.java
@@ -39,11 +39,13 @@
  */
 class RollbackToSnapshotProcedure extends BaseProcedure {
 
+  private static final ProcedureParameter TABLE_PARAM =
+      requiredInParameter("table", DataTypes.StringType);
+  private static final ProcedureParameter SNAPSHOT_ID_PARAM =
+      requiredInParameter("snapshot_id", DataTypes.LongType);
+
   private static final ProcedureParameter[] PARAMETERS =
-      new ProcedureParameter[] {
-        ProcedureParameter.required("table", DataTypes.StringType),
-        ProcedureParameter.required("snapshot_id", DataTypes.LongType)
-      };
+      new ProcedureParameter[] {TABLE_PARAM, SNAPSHOT_ID_PARAM};
 
   private static final StructType OUTPUT_TYPE =
       new StructType(
@@ -77,8 +79,9 @@ public StructType outputType() {
 
   @Override
   public InternalRow[] call(InternalRow args) {
-    Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
-    long snapshotId = args.getLong(1);
+    ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
+    Identifier tableIdent = input.ident(TABLE_PARAM);
+    long snapshotId = input.asLong(SNAPSHOT_ID_PARAM);
 
     return modifyIcebergTable(
         tableIdent,
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RollbackToTimestampProcedure.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RollbackToTimestampProcedure.java
index 059725f0c152..05608fb5f0c6 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RollbackToTimestampProcedure.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RollbackToTimestampProcedure.java
@@ -20,7 +20,6 @@
 
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
-import org.apache.iceberg.util.DateTimeUtil;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
@@ -40,11 +39,13 @@
  */
 class RollbackToTimestampProcedure extends BaseProcedure {
 
+  private static final ProcedureParameter TABLE_PARAM =
+      requiredInParameter("table", DataTypes.StringType);
+  private static final ProcedureParameter TIMESTAMP_PARAM =
+      requiredInParameter("timestamp", DataTypes.TimestampType);
+
   private static final ProcedureParameter[] PARAMETERS =
-      new ProcedureParameter[] {
-        ProcedureParameter.required("table", DataTypes.StringType),
-        ProcedureParameter.required("timestamp", DataTypes.TimestampType)
-      };
+      new ProcedureParameter[] {TABLE_PARAM, TIMESTAMP_PARAM};
 
   private static final StructType OUTPUT_TYPE =
       new StructType(
@@ -78,9 +79,10 @@ public StructType outputType() {
 
   @Override
   public InternalRow[] call(InternalRow args) {
-    Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
+    ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
+    Identifier tableIdent = input.ident(TABLE_PARAM);
     // timestamps in Spark have microsecond precision so this conversion is lossy
-    long timestampMillis = DateTimeUtil.microsToMillis(args.getLong(1));
+    long timestampMillis = input.asTimestampMillis(TIMESTAMP_PARAM);
 
     return modifyIcebergTable(
         tableIdent,
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/SetCurrentSnapshotProcedure.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/SetCurrentSnapshotProcedure.java
index 830324af9294..974c96abaaca 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/SetCurrentSnapshotProcedure.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/SetCurrentSnapshotProcedure.java
@@ -43,12 +43,15 @@
  */
 class SetCurrentSnapshotProcedure extends BaseProcedure {
 
+  private static final ProcedureParameter TABLE_PARAM =
+      requiredInParameter("table", DataTypes.StringType);
+  private static final ProcedureParameter SNAPSHOT_ID_PARAM =
+      optionalInParameter("snapshot_id", DataTypes.LongType);
+  private static final ProcedureParameter REF_PARAM =
+      optionalInParameter("ref", DataTypes.StringType);
+
   private static final ProcedureParameter[] PARAMETERS =
-      new ProcedureParameter[] {
-        ProcedureParameter.required("table", DataTypes.StringType),
-        ProcedureParameter.optional("snapshot_id", DataTypes.LongType),
-        ProcedureParameter.optional("ref", DataTypes.StringType)
-      };
+      new ProcedureParameter[] {TABLE_PARAM, SNAPSHOT_ID_PARAM, REF_PARAM};
 
   private static final StructType OUTPUT_TYPE =
       new StructType(
@@ -82,9 +85,10 @@ public StructType outputType() {
   @Override
   public InternalRow[] call(InternalRow args) {
-    Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
-    Long snapshotId = args.isNullAt(1) ? null : args.getLong(1);
-    String ref = args.isNullAt(2) ? null : args.getString(2);
+    ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
+    Identifier tableIdent = input.ident(TABLE_PARAM);
+    Long snapshotId = input.asLong(SNAPSHOT_ID_PARAM, null);
+    String ref = input.asString(REF_PARAM, null);
 
     Preconditions.checkArgument(
         (snapshotId != null && ref == null) || (snapshotId == null && ref != null),
         "Either snapshot_id or ref must be provided, not both");
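
For context (not part of the patch itself): a minimal sketch of the calling convention this change converges on. The ExampleProcedure class below is hypothetical; it uses only helpers that appear in the diff above (requiredInParameter/optionalInParameter from BaseProcedure and the ProcedureInput accessors), and it omits the boilerplate members a real procedure would carry.

  class ExampleProcedure extends BaseProcedure {
    // each parameter is declared once as a named constant...
    private static final ProcedureParameter TABLE_PARAM =
        requiredInParameter("table", DataTypes.StringType);
    private static final ProcedureParameter RETRIES_PARAM =
        optionalInParameter("retries", DataTypes.IntegerType);

    // ...and PARAMETERS is assembled from those constants instead of inline literals
    private static final ProcedureParameter[] PARAMETERS =
        new ProcedureParameter[] {TABLE_PARAM, RETRIES_PARAM};

    // constructor, parameters(), outputType(), and description() would follow the
    // existing procedures and are omitted here for brevity

    @Override
    public InternalRow[] call(InternalRow args) {
      // ProcedureInput resolves arguments by parameter reference rather than by
      // position, so validation errors can name the parameter ("parameter 'table'")
      ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
      Identifier tableIdent = input.ident(TABLE_PARAM);
      Integer retries = input.asInt(RETRIES_PARAM, null); // null when not supplied
      return new InternalRow[0]; // a real procedure would act on tableIdent here
    }
  }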