diff --git a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCherrypickSnapshotProcedure.java b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCherrypickSnapshotProcedure.java index fa9e209c1dcf..a236f13f12bd 100644 --- a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCherrypickSnapshotProcedure.java +++ b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCherrypickSnapshotProcedure.java @@ -184,7 +184,7 @@ public void testInvalidCherrypickSnapshotCases() { assertThatThrownBy(() -> sql("CALL %s.system.cherrypick_snapshot('', 1L)", catalogName)) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Cannot handle an empty identifier for argument table"); + .hasMessage("Cannot handle an empty identifier for parameter 'table'"); assertThatThrownBy(() -> sql("CALL %s.system.cherrypick_snapshot('t', '2.2')", catalogName)) .isInstanceOf(IllegalArgumentException.class) diff --git a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java index 75bbff1fc7b4..c116cb4f857f 100644 --- a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java +++ b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java @@ -189,7 +189,7 @@ public void testInvalidExpireSnapshotsCases() { assertThatThrownBy(() -> sql("CALL %s.system.expire_snapshots('')", catalogName)) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Cannot handle an empty identifier for argument table"); + .hasMessage("Cannot handle an empty identifier for parameter 'table'"); } @TestTemplate diff --git a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestFastForwardBranchProcedure.java b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestFastForwardBranchProcedure.java index c8762a943673..69920e1d5402 100644 --- a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestFastForwardBranchProcedure.java +++ b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestFastForwardBranchProcedure.java @@ -190,7 +190,7 @@ public void testInvalidFastForwardBranchCases() { assertThatThrownBy( () -> sql("CALL %s.system.fast_forward('', 'main', 'newBranch')", catalogName)) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Cannot handle an empty identifier for argument table"); + .hasMessage("Cannot handle an empty identifier for parameter 'table'"); } @TestTemplate diff --git a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java index 3a2c7a6333f4..4958fde15d55 100644 --- a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java +++ b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java @@ -182,6 +182,6 @@ public void testInvalidApplyWapChangesCases() { assertThatThrownBy(() -> sql("CALL %s.system.publish_changes('', 'not_valid')", catalogName)) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Cannot handle an empty identifier for argument table"); + .hasMessage("Cannot handle an empty identifier for parameter 'table'"); } } diff --git a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java index f5869cf0a8e1..a5ac8a7e01ac 100644 --- a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java +++ b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java @@ -271,7 +271,7 @@ public void testInvalidRemoveOrphanFilesCases() { assertThatThrownBy(() -> sql("CALL %s.system.remove_orphan_files('')", catalogName)) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Cannot handle an empty identifier for argument table"); + .hasMessage("Cannot handle an empty identifier for parameter 'table'"); } @TestTemplate diff --git a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java index 1dd85f814f55..b322108d75f2 100644 --- a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java +++ b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java @@ -326,7 +326,7 @@ public void testInvalidRewriteManifestsCases() { assertThatThrownBy(() -> sql("CALL %s.system.rewrite_manifests('')", catalogName)) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Cannot handle an empty identifier for argument table"); + .hasMessage("Cannot handle an empty identifier for parameter 'table'"); } @TestTemplate diff --git a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRollbackToSnapshotProcedure.java b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRollbackToSnapshotProcedure.java index eee7bed8b0ab..cffc65f5dceb 100644 --- a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRollbackToSnapshotProcedure.java +++ b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRollbackToSnapshotProcedure.java @@ -281,6 +281,6 @@ public void testInvalidRollbackToSnapshotCases() { assertThatThrownBy(() -> sql("CALL %s.system.rollback_to_snapshot('', 1L)", catalogName)) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Cannot handle an empty identifier for argument table"); + .hasMessage("Cannot handle an empty identifier for parameter 'table'"); } } diff --git a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSetCurrentSnapshotProcedure.java b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSetCurrentSnapshotProcedure.java index 0e0ae5706357..ab0eca78d53b 100644 --- a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSetCurrentSnapshotProcedure.java +++ b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSetCurrentSnapshotProcedure.java @@ -214,7 +214,7 @@ public void testInvalidRollbackToSnapshotCases() { assertThatThrownBy(() -> sql("CALL %s.system.set_current_snapshot(1L)", catalogName)) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Cannot parse identifier for arg table: 1"); + .hasMessage("Cannot parse identifier for parameter 'table': 1"); assertThatThrownBy( () -> sql("CALL %s.system.set_current_snapshot(snapshot_id => 1L)", catalogName)) @@ -233,7 +233,7 @@ public void testInvalidRollbackToSnapshotCases() { assertThatThrownBy(() -> sql("CALL %s.system.set_current_snapshot('', 1L)", catalogName)) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Cannot handle an empty identifier for argument table"); + .hasMessage("Cannot handle an empty identifier for parameter 'table'"); assertThatThrownBy( () -> diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/CherrypickSnapshotProcedure.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/CherrypickSnapshotProcedure.java index b24394f83b9c..31043b953b75 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/CherrypickSnapshotProcedure.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/CherrypickSnapshotProcedure.java @@ -45,11 +45,13 @@ class CherrypickSnapshotProcedure extends BaseProcedure { static final String NAME = "cherrypick_snapshot"; + private static final ProcedureParameter TABLE_PARAM = + requiredInParameter("table", DataTypes.StringType); + private static final ProcedureParameter SNAPSHOT_ID_PARAM = + requiredInParameter("snapshot_id", DataTypes.LongType); + private static final ProcedureParameter[] PARAMETERS = - new ProcedureParameter[] { - requiredInParameter("table", DataTypes.StringType), - requiredInParameter("snapshot_id", DataTypes.LongType) - }; + new ProcedureParameter[] {TABLE_PARAM, SNAPSHOT_ID_PARAM}; private static final StructType OUTPUT_TYPE = new StructType( @@ -83,8 +85,10 @@ public ProcedureParameter[] parameters() { @Override public Iterator call(InternalRow args) { - Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); - long snapshotId = args.getLong(1); + ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args); + + Identifier tableIdent = input.ident(TABLE_PARAM); + long snapshotId = input.asLong(SNAPSHOT_ID_PARAM); return asScanIterator( OUTPUT_TYPE, diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java index 7ff85c752bf4..2b771914c3fe 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java @@ -26,7 +26,6 @@ import org.apache.iceberg.spark.actions.ExpireSnapshotsSparkAction; import org.apache.iceberg.spark.actions.SparkActions; import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder; -import org.apache.iceberg.util.DateTimeUtil; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.connector.catalog.Identifier; import org.apache.spark.sql.connector.catalog.TableCatalog; @@ -51,15 +50,30 @@ public class ExpireSnapshotsProcedure extends BaseProcedure { private static final Logger LOG = LoggerFactory.getLogger(ExpireSnapshotsProcedure.class); + private static final ProcedureParameter TABLE_PARAM = + requiredInParameter("table", DataTypes.StringType); + private static final ProcedureParameter OLDER_THAN_PARAM = + optionalInParameter("older_than", DataTypes.TimestampType); + private static final ProcedureParameter RETAIN_LAST_PARAM = + optionalInParameter("retain_last", DataTypes.IntegerType); + private static final ProcedureParameter MAX_CONCURRENT_DELETES_PARAM = + optionalInParameter("max_concurrent_deletes", DataTypes.IntegerType); + private static final ProcedureParameter STREAM_RESULTS_PARAM = + optionalInParameter("stream_results", DataTypes.BooleanType); + private static final ProcedureParameter SNAPSHOT_IDS_PARAM = + optionalInParameter("snapshot_ids", DataTypes.createArrayType(DataTypes.LongType)); + private static final ProcedureParameter CLEAN_EXPIRED_METADATA_PARAM = + optionalInParameter("clean_expired_metadata", DataTypes.BooleanType); + private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[] { - requiredInParameter("table", DataTypes.StringType), - optionalInParameter("older_than", DataTypes.TimestampType), - optionalInParameter("retain_last", DataTypes.IntegerType), - optionalInParameter("max_concurrent_deletes", DataTypes.IntegerType), - optionalInParameter("stream_results", DataTypes.BooleanType), - optionalInParameter("snapshot_ids", DataTypes.createArrayType(DataTypes.LongType)), - optionalInParameter("clean_expired_metadata", DataTypes.BooleanType) + TABLE_PARAM, + OLDER_THAN_PARAM, + RETAIN_LAST_PARAM, + MAX_CONCURRENT_DELETES_PARAM, + STREAM_RESULTS_PARAM, + SNAPSHOT_IDS_PARAM, + CLEAN_EXPIRED_METADATA_PARAM }; private static final StructType OUTPUT_TYPE = @@ -104,13 +118,14 @@ public ProcedureParameter[] parameters() { @Override @SuppressWarnings("checkstyle:CyclomaticComplexity") public Iterator call(InternalRow args) { - Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); - Long olderThanMillis = args.isNullAt(1) ? null : DateTimeUtil.microsToMillis(args.getLong(1)); - Integer retainLastNum = args.isNullAt(2) ? null : args.getInt(2); - Integer maxConcurrentDeletes = args.isNullAt(3) ? null : args.getInt(3); - Boolean streamResult = args.isNullAt(4) ? null : args.getBoolean(4); - long[] snapshotIds = args.isNullAt(5) ? null : args.getArray(5).toLongArray(); - Boolean cleanExpiredMetadata = args.isNullAt(6) ? null : args.getBoolean(6); + ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args); + Identifier tableIdent = input.ident(TABLE_PARAM); + Long olderThanMillis = input.asTimestampMillis(OLDER_THAN_PARAM, null); + Integer retainLastNum = input.asInt(RETAIN_LAST_PARAM, null); + Integer maxConcurrentDeletes = input.asInt(MAX_CONCURRENT_DELETES_PARAM, null); + Boolean streamResult = input.asBoolean(STREAM_RESULTS_PARAM, null); + long[] snapshotIds = input.asLongArray(SNAPSHOT_IDS_PARAM, null); + Boolean cleanExpiredMetadata = input.asBoolean(CLEAN_EXPIRED_METADATA_PARAM, null); Preconditions.checkArgument( maxConcurrentDeletes == null || maxConcurrentDeletes > 0, diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/FastForwardBranchProcedure.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/FastForwardBranchProcedure.java index 367f7d2a4da8..d7531e759408 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/FastForwardBranchProcedure.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/FastForwardBranchProcedure.java @@ -35,12 +35,15 @@ public class FastForwardBranchProcedure extends BaseProcedure { static final String NAME = "fast_forward"; + private static final ProcedureParameter TABLE_PARAM = + requiredInParameter("table", DataTypes.StringType); + private static final ProcedureParameter BRANCH_PARAM = + requiredInParameter("branch", DataTypes.StringType); + private static final ProcedureParameter TO_PARAM = + requiredInParameter("to", DataTypes.StringType); + private static final ProcedureParameter[] PARAMETERS = - new ProcedureParameter[] { - requiredInParameter("table", DataTypes.StringType), - requiredInParameter("branch", DataTypes.StringType), - requiredInParameter("to", DataTypes.StringType) - }; + new ProcedureParameter[] {TABLE_PARAM, BRANCH_PARAM, TO_PARAM}; private static final StructType OUTPUT_TYPE = new StructType( @@ -75,9 +78,11 @@ public ProcedureParameter[] parameters() { @Override public Iterator call(InternalRow args) { - Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); - String from = args.getString(1); - String to = args.getString(2); + ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args); + + Identifier tableIdent = input.ident(TABLE_PARAM); + String from = input.asString(BRANCH_PARAM); + String to = input.asString(TO_PARAM); return modifyIcebergTable( tableIdent, diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/ProcedureInput.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/ProcedureInput.java index 14f891c10b40..8f6dbdcf5a62 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/ProcedureInput.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/ProcedureInput.java @@ -26,6 +26,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.spark.Spark3Util; import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier; +import org.apache.iceberg.util.DateTimeUtil; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.util.ArrayData; @@ -80,6 +81,22 @@ public Integer asInt(ProcedureParameter param, Integer defaultValue) { return args.isNullAt(ordinal) ? defaultValue : (Integer) args.getInt(ordinal); } + public long asTimestampMillis(ProcedureParameter param) { + Long value = asTimestampMillis(param, null); + Preconditions.checkArgument(value != null, "Parameter '%s' is not set", param.name()); + return value; + } + + public Long asTimestampMillis(ProcedureParameter param, Long defaultValue) { + validateParamType(param, DataTypes.TimestampType); + int ordinal = ordinal(param); + Long value = args.isNullAt(ordinal) ? defaultValue : (Long) args.getLong(ordinal); + if (value != null) { + value = DateTimeUtil.microsToMillis(value); + } + return value; + } + public long asLong(ProcedureParameter param) { Long value = asLong(param, null); Preconditions.checkArgument(value != null, "Parameter '%s' is not set", param.name()); @@ -92,6 +109,22 @@ public Long asLong(ProcedureParameter param, Long defaultValue) { return args.isNullAt(ordinal) ? defaultValue : (Long) args.getLong(ordinal); } + public long[] asLongArray(ProcedureParameter param, Long[] defaultValue) { + validateParamType(param, DataTypes.createArrayType(DataTypes.LongType)); + Long[] source = + array(param, (array, ordinal) -> array.getLong(ordinal), Long.class, defaultValue); + + if (source == null) { + return null; + } + + long[] result = new long[source.length]; + for (int i = 0; i < source.length; i++) { + result[i] = source[i]; + } + return result; + } + public String asString(ProcedureParameter param) { String value = asString(param, null); Preconditions.checkArgument(value != null, "Parameter '%s' is not set", param.name()); diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java index 872972e22e25..874888204334 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java @@ -49,11 +49,13 @@ class PublishChangesProcedure extends BaseProcedure { static final String NAME = "publish_changes"; + private static final ProcedureParameter TABLE_PARAM = + requiredInParameter("table", DataTypes.StringType); + private static final ProcedureParameter WAP_ID_PARAM = + requiredInParameter("wap_id", DataTypes.StringType); + private static final ProcedureParameter[] PARAMETERS = - new ProcedureParameter[] { - requiredInParameter("table", DataTypes.StringType), - requiredInParameter("wap_id", DataTypes.StringType) - }; + new ProcedureParameter[] {TABLE_PARAM, WAP_ID_PARAM}; private static final StructType OUTPUT_TYPE = new StructType( @@ -87,8 +89,10 @@ public ProcedureParameter[] parameters() { @Override public Iterator call(InternalRow args) { - Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); - String wapId = args.getString(1); + ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args); + + Identifier tableIdent = input.ident(TABLE_PARAM); + String wapId = input.asString(WAP_ID_PARAM); return modifyIcebergTable( tableIdent, diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RegisterTableProcedure.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RegisterTableProcedure.java index d4c68b1e7d31..9ba577ad7e24 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RegisterTableProcedure.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RegisterTableProcedure.java @@ -42,11 +42,13 @@ class RegisterTableProcedure extends BaseProcedure { static final String NAME = "register_table"; + private static final ProcedureParameter TABLE_PARAM = + requiredInParameter("table", DataTypes.StringType); + private static final ProcedureParameter METADATA_FILE_PARAM = + requiredInParameter("metadata_file", DataTypes.StringType); + private static final ProcedureParameter[] PARAMETERS = - new ProcedureParameter[] { - requiredInParameter("table", DataTypes.StringType), - requiredInParameter("metadata_file", DataTypes.StringType) - }; + new ProcedureParameter[] {TABLE_PARAM, METADATA_FILE_PARAM}; private static final StructType OUTPUT_TYPE = new StructType( @@ -81,9 +83,11 @@ public ProcedureParameter[] parameters() { @Override public Iterator call(InternalRow args) { + ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args); TableIdentifier tableName = - Spark3Util.identifierToTableIdentifier(toIdentifier(args.getString(0), "table")); - String metadataFile = args.getString(1); + Spark3Util.identifierToTableIdentifier( + toIdentifier(input.asString(TABLE_PARAM), TABLE_PARAM.name())); + String metadataFile = input.asString(METADATA_FILE_PARAM); Preconditions.checkArgument( tableCatalog() instanceof HasIcebergCatalog, "Cannot use Register Table in a non-Iceberg catalog"); diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java index ac3c29156e29..f30f99978c45 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java @@ -26,12 +26,11 @@ import org.apache.iceberg.actions.DeleteOrphanFiles.PrefixMismatchMode; import org.apache.iceberg.io.SupportsBulkOperations; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction; import org.apache.iceberg.spark.actions.SparkActions; import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder; -import org.apache.iceberg.util.DateTimeUtil; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.connector.catalog.Identifier; import org.apache.spark.sql.connector.catalog.TableCatalog; @@ -45,7 +44,6 @@ import org.apache.spark.unsafe.types.UTF8String; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.runtime.BoxedUnit; /** * A procedure that removes orphan files in a table. @@ -58,19 +56,40 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure { private static final Logger LOG = LoggerFactory.getLogger(RemoveOrphanFilesProcedure.class); + private static final ProcedureParameter TABLE_PARAM = + requiredInParameter("table", DataTypes.StringType); + private static final ProcedureParameter OLDER_THAN_PARAM = + optionalInParameter("older_than", DataTypes.TimestampType); + private static final ProcedureParameter LOCATION_PARAM = + optionalInParameter("location", DataTypes.StringType); + private static final ProcedureParameter DRY_RUN_PARAM = + optionalInParameter("dry_run", DataTypes.BooleanType); + private static final ProcedureParameter MAX_CONCURRENT_DELETES_PARAM = + optionalInParameter("max_concurrent_deletes", DataTypes.IntegerType); + private static final ProcedureParameter FILE_LIST_VIEW_PARAM = + optionalInParameter("file_list_view", DataTypes.StringType); + private static final ProcedureParameter EQUAL_SCHEMES_PARAM = + optionalInParameter("equal_schemes", STRING_MAP); + private static final ProcedureParameter EQUAL_AUTHORITIES_PARAM = + optionalInParameter("equal_authorities", STRING_MAP); + private static final ProcedureParameter PREFIX_MISMATCH_MODE_PARAM = + optionalInParameter("prefix_mismatch_mode", DataTypes.StringType); + // List files with prefix operations. Default is false. + private static final ProcedureParameter PREFIX_LISTING_PARAM = + optionalInParameter("prefix_listing", DataTypes.BooleanType); + private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[] { - requiredInParameter("table", DataTypes.StringType), - optionalInParameter("older_than", DataTypes.TimestampType), - optionalInParameter("location", DataTypes.StringType), - optionalInParameter("dry_run", DataTypes.BooleanType), - optionalInParameter("max_concurrent_deletes", DataTypes.IntegerType), - optionalInParameter("file_list_view", DataTypes.StringType), - optionalInParameter("equal_schemes", STRING_MAP), - optionalInParameter("equal_authorities", STRING_MAP), - optionalInParameter("prefix_mismatch_mode", DataTypes.StringType), - // List files with prefix operations. Default is false. - optionalInParameter("prefix_listing", DataTypes.BooleanType) + TABLE_PARAM, + OLDER_THAN_PARAM, + LOCATION_PARAM, + DRY_RUN_PARAM, + MAX_CONCURRENT_DELETES_PARAM, + FILE_LIST_VIEW_PARAM, + EQUAL_SCHEMES_PARAM, + EQUAL_AUTHORITIES_PARAM, + PREFIX_MISMATCH_MODE_PARAM, + PREFIX_LISTING_PARAM }; private static final StructType OUTPUT_TYPE = @@ -105,46 +124,26 @@ public ProcedureParameter[] parameters() { @Override @SuppressWarnings("checkstyle:CyclomaticComplexity") public Iterator call(InternalRow args) { - Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); - Long olderThanMillis = args.isNullAt(1) ? null : DateTimeUtil.microsToMillis(args.getLong(1)); - String location = args.isNullAt(2) ? null : args.getString(2); - boolean dryRun = args.isNullAt(3) ? false : args.getBoolean(3); - Integer maxConcurrentDeletes = args.isNullAt(4) ? null : args.getInt(4); - String fileListView = args.isNullAt(5) ? null : args.getString(5); + ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args); + Identifier tableIdent = input.ident(TABLE_PARAM); + Long olderThanMillis = input.asTimestampMillis(OLDER_THAN_PARAM, null); + String location = input.asString(LOCATION_PARAM, null); + boolean dryRun = input.asBoolean(DRY_RUN_PARAM, false); + Integer maxConcurrentDeletes = input.asInt(MAX_CONCURRENT_DELETES_PARAM, null); + String fileListView = input.asString(FILE_LIST_VIEW_PARAM, null); Preconditions.checkArgument( maxConcurrentDeletes == null || maxConcurrentDeletes > 0, "max_concurrent_deletes should have value > 0, value: %s", maxConcurrentDeletes); - Map equalSchemes = Maps.newHashMap(); - if (!args.isNullAt(6)) { - args.getMap(6) - .foreach( - DataTypes.StringType, - DataTypes.StringType, - (k, v) -> { - equalSchemes.put(k.toString(), v.toString()); - return BoxedUnit.UNIT; - }); - } - - Map equalAuthorities = Maps.newHashMap(); - if (!args.isNullAt(7)) { - args.getMap(7) - .foreach( - DataTypes.StringType, - DataTypes.StringType, - (k, v) -> { - equalAuthorities.put(k.toString(), v.toString()); - return BoxedUnit.UNIT; - }); - } + Map equalSchemes = input.asStringMap(EQUAL_SCHEMES_PARAM, ImmutableMap.of()); + Map equalAuthorities = + input.asStringMap(EQUAL_AUTHORITIES_PARAM, ImmutableMap.of()); - PrefixMismatchMode prefixMismatchMode = - args.isNullAt(8) ? null : PrefixMismatchMode.fromString(args.getString(8)); + PrefixMismatchMode prefixMismatchMode = asPrefixMismatchMode(input, PREFIX_MISMATCH_MODE_PARAM); - boolean prefixListing = args.isNullAt(9) ? false : args.getBoolean(9); + boolean prefixListing = input.asBoolean(PREFIX_LISTING_PARAM, false); return withIcebergTable( tableIdent, @@ -236,4 +235,9 @@ public String name() { public String description() { return "RemoveOrphanFilesProcedure"; } + + private PrefixMismatchMode asPrefixMismatchMode(ProcedureInput input, ProcedureParameter param) { + String modeAsString = input.asString(param, null); + return (modeAsString == null) ? null : PrefixMismatchMode.fromString(modeAsString); + } } diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java index 9b9a9d068822..d4bd0764ee7e 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java @@ -48,12 +48,15 @@ class RewriteManifestsProcedure extends BaseProcedure { static final String NAME = "rewrite_manifests"; + private static final ProcedureParameter TABLE_PARAM = + requiredInParameter("table", DataTypes.StringType); + private static final ProcedureParameter USE_CACHING_PARAM = + optionalInParameter("use_caching", DataTypes.BooleanType); + private static final ProcedureParameter SPEC_ID_PARAM = + optionalInParameter("spec_id", DataTypes.IntegerType); + private static final ProcedureParameter[] PARAMETERS = - new ProcedureParameter[] { - requiredInParameter("table", DataTypes.StringType), - optionalInParameter("use_caching", DataTypes.BooleanType), - optionalInParameter("spec_id", DataTypes.IntegerType) - }; + new ProcedureParameter[] {TABLE_PARAM, USE_CACHING_PARAM, SPEC_ID_PARAM}; // counts are not nullable since the action result is never null private static final StructType OUTPUT_TYPE = @@ -89,9 +92,10 @@ public ProcedureParameter[] parameters() { @Override public Iterator call(InternalRow args) { - Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); - Boolean useCaching = args.isNullAt(1) ? null : args.getBoolean(1); - Integer specId = args.isNullAt(2) ? null : args.getInt(2); + ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args); + Identifier tableIdent = input.ident(TABLE_PARAM); + Boolean useCaching = input.asBoolean(USE_CACHING_PARAM, null); + Integer specId = input.asInt(SPEC_ID_PARAM, null); return modifyIcebergTable( tableIdent, diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RollbackToSnapshotProcedure.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RollbackToSnapshotProcedure.java index b8a5368e770e..98e1e2b870f7 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RollbackToSnapshotProcedure.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RollbackToSnapshotProcedure.java @@ -44,11 +44,13 @@ class RollbackToSnapshotProcedure extends BaseProcedure { static final String NAME = "rollback_to_snapshot"; + private static final ProcedureParameter TABLE_PARAM = + requiredInParameter("table", DataTypes.StringType); + private static final ProcedureParameter SNAPSHOT_ID_PARAM = + requiredInParameter("snapshot_id", DataTypes.LongType); + private static final ProcedureParameter[] PARAMETERS = - new ProcedureParameter[] { - requiredInParameter("table", DataTypes.StringType), - requiredInParameter("snapshot_id", DataTypes.LongType) - }; + new ProcedureParameter[] {TABLE_PARAM, SNAPSHOT_ID_PARAM}; private static final StructType OUTPUT_TYPE = new StructType( @@ -82,8 +84,9 @@ public ProcedureParameter[] parameters() { @Override public Iterator call(InternalRow args) { - Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); - long snapshotId = args.getLong(1); + ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args); + Identifier tableIdent = input.ident(TABLE_PARAM); + long snapshotId = input.asLong(SNAPSHOT_ID_PARAM); return modifyIcebergTable( tableIdent, diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RollbackToTimestampProcedure.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RollbackToTimestampProcedure.java index 5f7b0064c27e..fd6791df28de 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RollbackToTimestampProcedure.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RollbackToTimestampProcedure.java @@ -21,7 +21,6 @@ import java.util.Iterator; import org.apache.iceberg.Snapshot; import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder; -import org.apache.iceberg.util.DateTimeUtil; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.connector.catalog.Identifier; import org.apache.spark.sql.connector.catalog.TableCatalog; @@ -45,11 +44,13 @@ class RollbackToTimestampProcedure extends BaseProcedure { static final String NAME = "rollback_to_timestamp"; + private static final ProcedureParameter TABLE_PARAM = + requiredInParameter("table", DataTypes.StringType); + private static final ProcedureParameter TIMESTAMP_PARAM = + requiredInParameter("timestamp", DataTypes.TimestampType); + private static final ProcedureParameter[] PARAMETERS = - new ProcedureParameter[] { - requiredInParameter("table", DataTypes.StringType), - requiredInParameter("timestamp", DataTypes.TimestampType) - }; + new ProcedureParameter[] {TABLE_PARAM, TIMESTAMP_PARAM}; private static final StructType OUTPUT_TYPE = new StructType( @@ -83,9 +84,10 @@ public ProcedureParameter[] parameters() { @Override public Iterator call(InternalRow args) { - Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); + ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args); + Identifier tableIdent = input.ident(TABLE_PARAM); // timestamps in Spark have microsecond precision so this conversion is lossy - long timestampMillis = DateTimeUtil.microsToMillis(args.getLong(1)); + long timestampMillis = input.asTimestampMillis(TIMESTAMP_PARAM); return modifyIcebergTable( tableIdent, diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/SetCurrentSnapshotProcedure.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/SetCurrentSnapshotProcedure.java index edd13fc0959f..18e3646cf529 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/SetCurrentSnapshotProcedure.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/SetCurrentSnapshotProcedure.java @@ -48,12 +48,15 @@ class SetCurrentSnapshotProcedure extends BaseProcedure { static final String NAME = "set_current_snapshot"; + private static final ProcedureParameter TABLE_PARAM = + requiredInParameter("table", DataTypes.StringType); + private static final ProcedureParameter SNAPSHOT_ID_PARAM = + optionalInParameter("snapshot_id", DataTypes.LongType); + private static final ProcedureParameter REF_PARAM = + optionalInParameter("ref", DataTypes.StringType); + private static final ProcedureParameter[] PARAMETERS = - new ProcedureParameter[] { - requiredInParameter("table", DataTypes.StringType), - optionalInParameter("snapshot_id", DataTypes.LongType), - optionalInParameter("ref", DataTypes.StringType) - }; + new ProcedureParameter[] {TABLE_PARAM, SNAPSHOT_ID_PARAM, REF_PARAM}; private static final StructType OUTPUT_TYPE = new StructType( @@ -87,9 +90,10 @@ public ProcedureParameter[] parameters() { @Override public Iterator call(InternalRow args) { - Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); - Long snapshotId = args.isNullAt(1) ? null : args.getLong(1); - String ref = args.isNullAt(2) ? null : args.getString(2); + ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args); + Identifier tableIdent = input.ident(TABLE_PARAM); + Long snapshotId = input.asLong(SNAPSHOT_ID_PARAM, null); + String ref = input.asString(REF_PARAM, null); Preconditions.checkArgument( (snapshotId != null && ref == null) || (snapshotId == null && ref != null), "Either snapshot_id or ref must be provided, not both");