Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ public void testInvalidCherrypickSnapshotCases() {

assertThatThrownBy(() -> sql("CALL %s.system.cherrypick_snapshot('', 1L)", catalogName))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Cannot handle an empty identifier for argument table");
.hasMessage("Cannot handle an empty identifier for parameter 'table'");

assertThatThrownBy(() -> sql("CALL %s.system.cherrypick_snapshot('t', '2.2')", catalogName))
.isInstanceOf(IllegalArgumentException.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public void testInvalidExpireSnapshotsCases() {

assertThatThrownBy(() -> sql("CALL %s.system.expire_snapshots('')", catalogName))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Cannot handle an empty identifier for argument table");
.hasMessage("Cannot handle an empty identifier for parameter 'table'");
}

@TestTemplate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public void testInvalidFastForwardBranchCases() {
assertThatThrownBy(
() -> sql("CALL %s.system.fast_forward('', 'main', 'newBranch')", catalogName))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Cannot handle an empty identifier for argument table");
.hasMessage("Cannot handle an empty identifier for parameter 'table'");
}

@TestTemplate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,6 @@ public void testInvalidApplyWapChangesCases() {

assertThatThrownBy(() -> sql("CALL %s.system.publish_changes('', 'not_valid')", catalogName))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Cannot handle an empty identifier for argument table");
.hasMessage("Cannot handle an empty identifier for parameter 'table'");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ public void testInvalidRemoveOrphanFilesCases() {

assertThatThrownBy(() -> sql("CALL %s.system.remove_orphan_files('')", catalogName))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Cannot handle an empty identifier for argument table");
.hasMessage("Cannot handle an empty identifier for parameter 'table'");
}

@TestTemplate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ public void testInvalidRewriteManifestsCases() {

assertThatThrownBy(() -> sql("CALL %s.system.rewrite_manifests('')", catalogName))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Cannot handle an empty identifier for argument table");
.hasMessage("Cannot handle an empty identifier for parameter 'table'");
}

@TestTemplate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,6 @@ public void testInvalidRollbackToSnapshotCases() {

assertThatThrownBy(() -> sql("CALL %s.system.rollback_to_snapshot('', 1L)", catalogName))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Cannot handle an empty identifier for argument table");
.hasMessage("Cannot handle an empty identifier for parameter 'table'");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public void testInvalidRollbackToSnapshotCases() {

assertThatThrownBy(() -> sql("CALL %s.system.set_current_snapshot(1L)", catalogName))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Cannot parse identifier for arg table: 1");
.hasMessage("Cannot parse identifier for parameter 'table': 1");

assertThatThrownBy(
() -> sql("CALL %s.system.set_current_snapshot(snapshot_id => 1L)", catalogName))
Expand All @@ -233,7 +233,7 @@ public void testInvalidRollbackToSnapshotCases() {

assertThatThrownBy(() -> sql("CALL %s.system.set_current_snapshot('', 1L)", catalogName))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Cannot handle an empty identifier for argument table");
.hasMessage("Cannot handle an empty identifier for parameter 'table'");

assertThatThrownBy(
() ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,13 @@ class CherrypickSnapshotProcedure extends BaseProcedure {

static final String NAME = "cherrypick_snapshot";

private static final ProcedureParameter TABLE_PARAM =
requiredInParameter("table", DataTypes.StringType);
private static final ProcedureParameter SNAPSHOT_ID_PARAM =
requiredInParameter("snapshot_id", DataTypes.LongType);

private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
requiredInParameter("table", DataTypes.StringType),
requiredInParameter("snapshot_id", DataTypes.LongType)
};
new ProcedureParameter[] {TABLE_PARAM, SNAPSHOT_ID_PARAM};

private static final StructType OUTPUT_TYPE =
new StructType(
Expand Down Expand Up @@ -83,8 +85,10 @@ public ProcedureParameter[] parameters() {

@Override
public Iterator<Scan> call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
long snapshotId = args.getLong(1);
ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);

Identifier tableIdent = input.ident(TABLE_PARAM);
long snapshotId = input.asLong(SNAPSHOT_ID_PARAM);

return asScanIterator(
OUTPUT_TYPE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.iceberg.spark.actions.ExpireSnapshotsSparkAction;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
import org.apache.iceberg.util.DateTimeUtil;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
Expand All @@ -51,15 +50,30 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {

private static final Logger LOG = LoggerFactory.getLogger(ExpireSnapshotsProcedure.class);

private static final ProcedureParameter TABLE_PARAM =
requiredInParameter("table", DataTypes.StringType);
private static final ProcedureParameter OLDER_THAN_PARAM =
optionalInParameter("older_than", DataTypes.TimestampType);
private static final ProcedureParameter RETAIN_LAST_PARAM =
optionalInParameter("retain_last", DataTypes.IntegerType);
private static final ProcedureParameter MAX_CONCURRENT_DELETES_PARAM =
optionalInParameter("max_concurrent_deletes", DataTypes.IntegerType);
private static final ProcedureParameter STREAM_RESULTS_PARAM =
optionalInParameter("stream_results", DataTypes.BooleanType);
private static final ProcedureParameter SNAPSHOT_IDS_PARAM =
optionalInParameter("snapshot_ids", DataTypes.createArrayType(DataTypes.LongType));
private static final ProcedureParameter CLEAN_EXPIRED_METADATA_PARAM =
optionalInParameter("clean_expired_metadata", DataTypes.BooleanType);

private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
requiredInParameter("table", DataTypes.StringType),
optionalInParameter("older_than", DataTypes.TimestampType),
optionalInParameter("retain_last", DataTypes.IntegerType),
optionalInParameter("max_concurrent_deletes", DataTypes.IntegerType),
optionalInParameter("stream_results", DataTypes.BooleanType),
optionalInParameter("snapshot_ids", DataTypes.createArrayType(DataTypes.LongType)),
optionalInParameter("clean_expired_metadata", DataTypes.BooleanType)
TABLE_PARAM,
OLDER_THAN_PARAM,
RETAIN_LAST_PARAM,
MAX_CONCURRENT_DELETES_PARAM,
STREAM_RESULTS_PARAM,
SNAPSHOT_IDS_PARAM,
CLEAN_EXPIRED_METADATA_PARAM
};

private static final StructType OUTPUT_TYPE =
Expand Down Expand Up @@ -104,13 +118,14 @@ public ProcedureParameter[] parameters() {
@Override
@SuppressWarnings("checkstyle:CyclomaticComplexity")
public Iterator<Scan> call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
Long olderThanMillis = args.isNullAt(1) ? null : DateTimeUtil.microsToMillis(args.getLong(1));
Integer retainLastNum = args.isNullAt(2) ? null : args.getInt(2);
Integer maxConcurrentDeletes = args.isNullAt(3) ? null : args.getInt(3);
Boolean streamResult = args.isNullAt(4) ? null : args.getBoolean(4);
long[] snapshotIds = args.isNullAt(5) ? null : args.getArray(5).toLongArray();
Boolean cleanExpiredMetadata = args.isNullAt(6) ? null : args.getBoolean(6);
ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
Identifier tableIdent = input.ident(TABLE_PARAM);
Long olderThanMillis = input.asTimestampMillis(OLDER_THAN_PARAM, null);
Integer retainLastNum = input.asInt(RETAIN_LAST_PARAM, null);
Integer maxConcurrentDeletes = input.asInt(MAX_CONCURRENT_DELETES_PARAM, null);
Boolean streamResult = input.asBoolean(STREAM_RESULTS_PARAM, null);
long[] snapshotIds = input.asLongArray(SNAPSHOT_IDS_PARAM, null);
Boolean cleanExpiredMetadata = input.asBoolean(CLEAN_EXPIRED_METADATA_PARAM, null);

Preconditions.checkArgument(
maxConcurrentDeletes == null || maxConcurrentDeletes > 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,15 @@ public class FastForwardBranchProcedure extends BaseProcedure {

static final String NAME = "fast_forward";

private static final ProcedureParameter TABLE_PARAM =
requiredInParameter("table", DataTypes.StringType);
private static final ProcedureParameter BRANCH_PARAM =
requiredInParameter("branch", DataTypes.StringType);
private static final ProcedureParameter TO_PARAM =
requiredInParameter("to", DataTypes.StringType);

private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
requiredInParameter("table", DataTypes.StringType),
requiredInParameter("branch", DataTypes.StringType),
requiredInParameter("to", DataTypes.StringType)
};
new ProcedureParameter[] {TABLE_PARAM, BRANCH_PARAM, TO_PARAM};

private static final StructType OUTPUT_TYPE =
new StructType(
Expand Down Expand Up @@ -75,9 +78,11 @@ public ProcedureParameter[] parameters() {

@Override
public Iterator<Scan> call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
String from = args.getString(1);
String to = args.getString(2);
ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);

Identifier tableIdent = input.ident(TABLE_PARAM);
String from = input.asString(BRANCH_PARAM);
String to = input.asString(TO_PARAM);

return modifyIcebergTable(
tableIdent,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
import org.apache.iceberg.util.DateTimeUtil;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.util.ArrayData;
Expand Down Expand Up @@ -80,6 +81,22 @@ public Integer asInt(ProcedureParameter param, Integer defaultValue) {
return args.isNullAt(ordinal) ? defaultValue : (Integer) args.getInt(ordinal);
}

public long asTimestampMillis(ProcedureParameter param) {
Long value = asTimestampMillis(param, null);
Preconditions.checkArgument(value != null, "Parameter '%s' is not set", param.name());
return value;
}

public Long asTimestampMillis(ProcedureParameter param, Long defaultValue) {
validateParamType(param, DataTypes.TimestampType);
int ordinal = ordinal(param);
Long value = args.isNullAt(ordinal) ? defaultValue : (Long) args.getLong(ordinal);
if (value != null) {
value = DateTimeUtil.microsToMillis(value);
}
return value;
}

public long asLong(ProcedureParameter param) {
Long value = asLong(param, null);
Preconditions.checkArgument(value != null, "Parameter '%s' is not set", param.name());
Expand All @@ -92,6 +109,22 @@ public Long asLong(ProcedureParameter param, Long defaultValue) {
return args.isNullAt(ordinal) ? defaultValue : (Long) args.getLong(ordinal);
}

public long[] asLongArray(ProcedureParameter param, Long[] defaultValue) {
validateParamType(param, DataTypes.createArrayType(DataTypes.LongType));
Long[] source =
array(param, (array, ordinal) -> array.getLong(ordinal), Long.class, defaultValue);

if (source == null) {
return null;
}

long[] result = new long[source.length];
for (int i = 0; i < source.length; i++) {
result[i] = source[i];
}
return result;
}

public String asString(ProcedureParameter param) {
String value = asString(param, null);
Preconditions.checkArgument(value != null, "Parameter '%s' is not set", param.name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,13 @@ class PublishChangesProcedure extends BaseProcedure {

static final String NAME = "publish_changes";

private static final ProcedureParameter TABLE_PARAM =
requiredInParameter("table", DataTypes.StringType);
private static final ProcedureParameter WAP_ID_PARAM =
requiredInParameter("wap_id", DataTypes.StringType);

private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
requiredInParameter("table", DataTypes.StringType),
requiredInParameter("wap_id", DataTypes.StringType)
};
new ProcedureParameter[] {TABLE_PARAM, WAP_ID_PARAM};

private static final StructType OUTPUT_TYPE =
new StructType(
Expand Down Expand Up @@ -87,8 +89,10 @@ public ProcedureParameter[] parameters() {

@Override
public Iterator<Scan> call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
String wapId = args.getString(1);
ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);

Identifier tableIdent = input.ident(TABLE_PARAM);
String wapId = input.asString(WAP_ID_PARAM);

return modifyIcebergTable(
tableIdent,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@ class RegisterTableProcedure extends BaseProcedure {

static final String NAME = "register_table";

private static final ProcedureParameter TABLE_PARAM =
requiredInParameter("table", DataTypes.StringType);
private static final ProcedureParameter METADATA_FILE_PARAM =
requiredInParameter("metadata_file", DataTypes.StringType);

private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
requiredInParameter("table", DataTypes.StringType),
requiredInParameter("metadata_file", DataTypes.StringType)
};
new ProcedureParameter[] {TABLE_PARAM, METADATA_FILE_PARAM};

private static final StructType OUTPUT_TYPE =
new StructType(
Expand Down Expand Up @@ -81,9 +83,11 @@ public ProcedureParameter[] parameters() {

@Override
public Iterator<Scan> call(InternalRow args) {
ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
TableIdentifier tableName =
Spark3Util.identifierToTableIdentifier(toIdentifier(args.getString(0), "table"));
String metadataFile = args.getString(1);
Spark3Util.identifierToTableIdentifier(
toIdentifier(input.asString(TABLE_PARAM), TABLE_PARAM.name()));
String metadataFile = input.asString(METADATA_FILE_PARAM);
Preconditions.checkArgument(
tableCatalog() instanceof HasIcebergCatalog,
"Cannot use Register Table in a non-Iceberg catalog");
Expand Down
Loading