diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteTablePathProcedure.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteTablePathProcedure.java
new file mode 100644
index 000000000000..efa20dfc51d9
--- /dev/null
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteTablePathProcedure.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.nio.file.Path;
+import java.util.List;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.RewriteTablePathUtil;
+import org.apache.iceberg.Table;
+import org.apache.spark.sql.AnalysisException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.io.TempDir;
+
+public class TestRewriteTablePathProcedure extends ExtensionsTestBase {
+  @TempDir private Path staging;
+  @TempDir private Path targetTableDir;
+
+  @BeforeEach
+  public void setupTableLocation() {
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+  }
+
+  @AfterEach
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @TestTemplate
+  public void testRewriteTablePathWithPositionalArgument() {
+    String location = targetTableDir.toFile().toURI().toString();
+    Table table = validationCatalog.loadTable(tableIdent);
+    String metadataJson =
+        (((HasTableOperations) table).operations()).current().metadataFileLocation();
+
+    List<Object[]> result =
+        sql(
+            "CALL %s.system.rewrite_table_path('%s', '%s', '%s')",
+            catalogName, tableIdent, table.location(), location);
+    assertThat(result).hasSize(1);
+    assertThat(result.get(0)[0])
+        .as("Should return correct latest version")
+        .isEqualTo(RewriteTablePathUtil.fileName(metadataJson));
+    assertThat(result.get(0)[1])
+        .as("Should return file_list_location")
+        .asString()
+        .startsWith(table.location())
+        .endsWith("file-list");
+    checkFileListLocationCount((String) result.get(0)[1], 1);
+  }
+
+  @TestTemplate
+  public void testRewriteTablePathWithNamedArgument() {
+    Table table = validationCatalog.loadTable(tableIdent);
+    String v0Metadata =
+        RewriteTablePathUtil.fileName(
+            (((HasTableOperations) table).operations()).current().metadataFileLocation());
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    String v1Metadata =
+        RewriteTablePathUtil.fileName(
+            (((HasTableOperations) table).operations()).refresh().metadataFileLocation());
+
+    String targetLocation = targetTableDir.toFile().toURI().toString();
+    String stagingLocation = staging.toFile().toURI().toString();
+    String expectedFileListLocation = stagingLocation + "file-list";
+
+    List<Object[]> result =
+        sql(
+            "CALL %s.system.rewrite_table_path("
+                + "table => '%s', "
+                + "target_prefix => '%s', "
+                + "source_prefix => '%s', "
+                + "end_version => '%s', "
+                + "start_version => '%s', "
+                + "staging_location => '%s')",
+            catalogName,
+            tableIdent,
+            targetLocation,
+            table.location(),
+            v1Metadata,
+            v0Metadata,
+            stagingLocation);
+    assertThat(result).hasSize(1);
+    assertThat(result.get(0)[0]).as("Should return correct latest version").isEqualTo(v1Metadata);
+    assertThat(result.get(0)[1])
+        .as("Should return correct file_list_location")
+        .isEqualTo(expectedFileListLocation);
+    checkFileListLocationCount((String) result.get(0)[1], 4);
+  }
+
+  @TestTemplate
+  public void testProcedureWithInvalidInput() {
+    String targetLocation = targetTableDir.toFile().toURI().toString();
+
+    assertThatThrownBy(
+            () -> sql("CALL %s.system.rewrite_table_path('%s')", catalogName, tableIdent))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("Missing required parameters: [source_prefix,target_prefix]");
+    assertThatThrownBy(
+            () ->
+                sql(
+                    "CALL %s.system.rewrite_table_path('%s','%s')",
+                    catalogName, tableIdent, targetLocation))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("Missing required parameters: [target_prefix]");
+    assertThatThrownBy(
+            () ->
+                sql(
+                    "CALL %s.system.rewrite_table_path('%s', '%s','%s')",
+                    catalogName, "notExists", targetLocation, targetLocation))
+        .isInstanceOf(RuntimeException.class)
+        .hasMessageContaining("Couldn't load table");
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    String v0Metadata =
+        RewriteTablePathUtil.fileName(
+            (((HasTableOperations) table).operations()).current().metadataFileLocation());
+    assertThatThrownBy(
+            () ->
+                sql(
+                    "CALL %s.system.rewrite_table_path("
+                        + "table => '%s', "
+                        + "source_prefix => '%s', "
+                        + "target_prefix => '%s', "
+                        + "start_version => '%s')",
+                    catalogName, tableIdent, table.location(), targetLocation, "v20.metadata.json"))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining(
+            "Cannot find provided version file %s in metadata log.", "v20.metadata.json");
+    assertThatThrownBy(
+            () ->
+                sql(
+                    "CALL %s.system.rewrite_table_path("
+                        + "table => '%s', "
+                        + "source_prefix => '%s', "
+                        + "target_prefix => '%s', "
+                        + "start_version => '%s',"
+                        + "end_version => '%s')",
+                    catalogName,
+                    tableIdent,
+                    table.location(),
+                    targetLocation,
+                    v0Metadata,
+                    "v11.metadata.json"))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining(
+            "Cannot find provided version file %s in metadata log.", "v11.metadata.json");
+  }
+
+  private void checkFileListLocationCount(String fileListLocation, long expectedFileCount) {
+    long fileCount = spark.read().format("text").load(fileListLocation).count();
+    assertThat(fileCount).isEqualTo(expectedFileCount);
+  }
+}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
index 64469384b881..4d5d1db38e25 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
@@ -219,8 +219,10 @@ private String validateVersion(TableMetadata tableMetadata, String versionFileNa
       }
     }
 
-    Preconditions.checkNotNull(
-        versionFile, "Version file %s does not exist in metadata log.", versionFile);
+    Preconditions.checkArgument(
+        versionFile != null,
+        "Cannot find provided version file %s in metadata log.",
+        versionFileName);
     Preconditions.checkArgument(
         fileExist(versionFile), "Version file %s does not exist.", versionFile);
     return versionFile;
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteTablePathProcedure.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteTablePathProcedure.java
new file mode 100644
index 000000000000..b936dcfcedfe
--- /dev/null
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteTablePathProcedure.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import org.apache.iceberg.actions.RewriteTablePath;
+import org.apache.iceberg.spark.actions.RewriteTablePathSparkAction;
+import org.apache.iceberg.spark.actions.SparkActions;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+public class RewriteTablePathProcedure extends BaseProcedure {
+
+  private static final ProcedureParameter TABLE_PARAM =
+      ProcedureParameter.required("table", DataTypes.StringType);
+  private static final ProcedureParameter SOURCE_PREFIX_PARAM =
+      ProcedureParameter.required("source_prefix", DataTypes.StringType);
+  private static final ProcedureParameter TARGET_PREFIX_PARAM =
+      ProcedureParameter.required("target_prefix", DataTypes.StringType);
+  private static final ProcedureParameter START_VERSION_PARAM =
+      ProcedureParameter.optional("start_version", DataTypes.StringType);
+  private static final ProcedureParameter END_VERSION_PARAM =
+      ProcedureParameter.optional("end_version", DataTypes.StringType);
+  private static final ProcedureParameter STAGING_LOCATION_PARAM =
+      ProcedureParameter.optional("staging_location", DataTypes.StringType);
+
+  private static final ProcedureParameter[] PARAMETERS =
+      new ProcedureParameter[] {
+        TABLE_PARAM,
+        SOURCE_PREFIX_PARAM,
+        TARGET_PREFIX_PARAM,
+        START_VERSION_PARAM,
+        END_VERSION_PARAM,
+        STAGING_LOCATION_PARAM
+      };
+
+  private static final StructType OUTPUT_TYPE =
+      new StructType(
+          new StructField[] {
+            new StructField("latest_version", DataTypes.StringType, true, Metadata.empty()),
+            new StructField("file_list_location", DataTypes.StringType, true, Metadata.empty())
+          });
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<RewriteTablePathProcedure>() {
+      @Override
+      protected RewriteTablePathProcedure doBuild() {
+        return new RewriteTablePathProcedure(tableCatalog());
+      }
+    };
+  }
+
+  private RewriteTablePathProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
+    Identifier tableIdent = input.ident(TABLE_PARAM);
+    String sourcePrefix = input.asString(SOURCE_PREFIX_PARAM);
+    String targetPrefix = input.asString(TARGET_PREFIX_PARAM);
+    String startVersion = input.asString(START_VERSION_PARAM, null);
+    String endVersion = input.asString(END_VERSION_PARAM, null);
+    String stagingLocation = input.asString(STAGING_LOCATION_PARAM, null);
+
+    return withIcebergTable(
+        tableIdent,
+        table -> {
+          RewriteTablePathSparkAction action = SparkActions.get().rewriteTablePath(table);
+
+          if (startVersion != null) {
+            action.startVersion(startVersion);
+          }
+          if (endVersion != null) {
+            action.endVersion(endVersion);
+          }
+          if (stagingLocation != null) {
+            action.stagingLocation(stagingLocation);
+          }
+
+          return toOutputRows(action.rewriteLocationPrefix(sourcePrefix, targetPrefix).execute());
+        });
+  }
+
+  private InternalRow[] toOutputRows(RewriteTablePath.Result result) {
+    return new InternalRow[] {
+      newInternalRow(
+          UTF8String.fromString(result.latestVersion()),
+          UTF8String.fromString(result.fileListLocation()))
+    };
+  }
+
+  @Override
+  public String description() {
+    return "RewriteTablePathProcedure";
+  }
+}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
index d636a21ddc00..353970443025 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
@@ -62,6 +62,7 @@ private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() {
     mapBuilder.put("rewrite_position_delete_files", RewritePositionDeleteFilesProcedure::builder);
     mapBuilder.put("fast_forward", FastForwardBranchProcedure::builder);
     mapBuilder.put("compute_table_stats", ComputeTableStatsProcedure::builder);
+    mapBuilder.put("rewrite_table_path", RewriteTablePathProcedure::builder);
     return mapBuilder.build();
   }