diff --git a/core/src/main/java/org/apache/iceberg/actions/BaseMigrateTableActionResult.java b/core/src/main/java/org/apache/iceberg/actions/BaseMigrateTableActionResult.java
new file mode 100644
index 000000000000..4d7397bb150f
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/actions/BaseMigrateTableActionResult.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+public class BaseMigrateTableActionResult implements MigrateTable.Result {
+
+  private final long migratedDataFilesCount;
+
+  public BaseMigrateTableActionResult(long migratedDataFilesCount) {
+    this.migratedDataFilesCount = migratedDataFilesCount;
+  }
+
+  @Override
+  public long migratedDataFilesCount() {
+    return migratedDataFilesCount;
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/actions/BaseSnapshotTableActionResult.java b/core/src/main/java/org/apache/iceberg/actions/BaseSnapshotTableActionResult.java
new file mode 100644
index 000000000000..3ea24d374716
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/actions/BaseSnapshotTableActionResult.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+public class BaseSnapshotTableActionResult implements SnapshotTable.Result {
+
+  private final long importedDataFilesCount;
+
+  public BaseSnapshotTableActionResult(long importedDataFilesCount) {
+    this.importedDataFilesCount = importedDataFilesCount;
+  }
+
+  @Override
+  public long importedDataFilesCount() {
+    return importedDataFilesCount;
+  }
+}
diff --git a/spark/src/main/java/org/apache/iceberg/actions/CreateAction.java b/spark/src/main/java/org/apache/iceberg/actions/CreateAction.java
index 6760dd13f0df..a567bda9139a 100644
--- a/spark/src/main/java/org/apache/iceberg/actions/CreateAction.java
+++ b/spark/src/main/java/org/apache/iceberg/actions/CreateAction.java
@@ -21,6 +21,7 @@
 
 import java.util.Map;
 
+@Deprecated
 public interface CreateAction extends Action<Long> {
 
   /**
diff --git a/spark/src/main/java/org/apache/iceberg/actions/SnapshotAction.java b/spark/src/main/java/org/apache/iceberg/actions/SnapshotAction.java
index cb1a55945bdf..6c3e8e6d31a5 100644
--- a/spark/src/main/java/org/apache/iceberg/actions/SnapshotAction.java
+++ b/spark/src/main/java/org/apache/iceberg/actions/SnapshotAction.java
@@ -19,6 +19,7 @@
 
 package org.apache.iceberg.actions;
 
+@Deprecated
 public interface SnapshotAction extends CreateAction {
   SnapshotAction withLocation(String location);
 }
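
For orientation before the Spark-side changes below: the result holders added above are plain immutable carriers that the new MigrateTable and SnapshotTable actions return from execute(). A minimal sketch of that round trip, using only types from this diff (the count is illustrative):

import org.apache.iceberg.actions.BaseMigrateTableActionResult;
import org.apache.iceberg.actions.MigrateTable;

class ResultHolderSketch {
  public static void main(String[] args) {
    // An action implementation wraps the number of migrated data files
    // in an immutable result object...
    MigrateTable.Result result = new BaseMigrateTableActionResult(42L); // illustrative count

    // ...and callers read it back through the interface accessor.
    System.out.println("migrated data files: " + result.migratedDataFilesCount());
  }
}
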
diff --git a/spark3/src/main/java/org/apache/iceberg/actions/Spark3MigrateAction.java b/spark3/src/main/java/org/apache/iceberg/actions/Spark3MigrateAction.java
index e2df5eebfbd6..b5c777d2b219 100644
--- a/spark3/src/main/java/org/apache/iceberg/actions/Spark3MigrateAction.java
+++ b/spark3/src/main/java/org/apache/iceberg/actions/Spark3MigrateAction.java
@@ -20,157 +20,43 @@
 package org.apache.iceberg.actions;
 
 import java.util.Map;
-import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.SnapshotSummary;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.exceptions.AlreadyExistsException;
-import org.apache.iceberg.exceptions.NoSuchTableException;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.spark.JobGroupInfo;
-import org.apache.iceberg.spark.SparkSessionCatalog;
-import org.apache.iceberg.spark.SparkTableUtil;
-import org.apache.iceberg.spark.source.StagedSparkTable;
+import org.apache.iceberg.spark.actions.BaseMigrateTableSparkAction;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.catalyst.TableIdentifier;
 import org.apache.spark.sql.connector.catalog.CatalogPlugin;
 import org.apache.spark.sql.connector.catalog.Identifier;
-import org.apache.spark.sql.connector.catalog.TableCatalog;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Some;
-import scala.collection.JavaConverters;
 
 /**
  * Takes a Spark table in the sourceCatalog and attempts to transform it into an Iceberg
  * Table in the same location with the same identifier. Once complete the identifier which
  * previously referred to a non-iceberg table will refer to the newly migrated iceberg
  * table.
+ *
+ * @deprecated since 0.12.0, will be removed in 0.13.0; use {@link MigrateTable} instead.
  */
-public class Spark3MigrateAction extends Spark3CreateAction {
-  private static final Logger LOG = LoggerFactory.getLogger(Spark3MigrateAction.class);
-  private static final String BACKUP_SUFFIX = "_BACKUP_";
+@Deprecated
+public class Spark3MigrateAction implements CreateAction {
 
-  private final Identifier backupIdent;
+  private final MigrateTable delegate;
 
   public Spark3MigrateAction(SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableName) {
-    super(spark, sourceCatalog, sourceTableName, sourceCatalog, sourceTableName);
-    String backupName = sourceTableIdent().name() + BACKUP_SUFFIX;
-    this.backupIdent = Identifier.of(sourceTableIdent().namespace(), backupName);
-  }
-
-  private Long doExecute() {
-    LOG.info("Starting the migration of {} to Iceberg", sourceTableIdent());
-
-    // Move source table to a new name, halting all modifications and allowing us to stage
-    // the creation of a new Iceberg table in its place
-    renameAndBackupSourceTable();
-
-    StagedSparkTable stagedTable = null;
-    Table icebergTable;
-    boolean threw = true;
-    try {
-      LOG.info("Staging a new Iceberg table {}", destTableIdent());
-      stagedTable = stageDestTable();
-      icebergTable = stagedTable.table();
-
-      LOG.info("Ensuring {} has a valid name mapping", destTableIdent());
-      ensureNameMappingPresent(icebergTable);
-
-      String stagingLocation = getMetadataLocation(icebergTable);
-      Some<String> backupNamespace = Some.apply(backupIdent.namespace()[0]);
-      TableIdentifier v1BackupIdentifier = new TableIdentifier(backupIdent.name(), backupNamespace);
-      LOG.info("Generating Iceberg metadata for {} in {}", destTableIdent(), stagingLocation);
-      SparkTableUtil.importSparkTable(spark(), v1BackupIdentifier, icebergTable, stagingLocation);
-
-      LOG.info("Committing staged changes to {}", destTableIdent());
-      stagedTable.commitStagedChanges();
-      threw = false;
-    } finally {
-      if (threw) {
-        LOG.error("Error when attempting perform migration changes, aborting table creation and restoring backup.");
-
-        restoreSourceTable();
-
-        if (stagedTable != null) {
-          try {
-            stagedTable.abortStagedChanges();
-          } catch (Exception abortException) {
-            LOG.error("Cannot abort staged changes", abortException);
-          }
-        }
-      }
-    }
-
-    Snapshot snapshot = icebergTable.currentSnapshot();
-    long numMigratedFiles = Long.parseLong(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
-    LOG.info("Successfully loaded Iceberg metadata for {} files", numMigratedFiles);
-    return numMigratedFiles;
+    this.delegate = new BaseMigrateTableSparkAction(spark, sourceCatalog, sourceTableName);
   }
 
   @Override
-  public Long execute() {
-    JobGroupInfo info = new JobGroupInfo("MIGRATE", "MIGRATE", false);
-    return withJobGroupInfo(info, this::doExecute);
+  public CreateAction withProperties(Map<String, String> properties) {
+    delegate.tableProperties(properties);
+    return this;
   }
 
   @Override
-  protected Map<String, String> targetTableProps() {
-    Map<String, String> properties = Maps.newHashMap();
-
-    // copy over relevant source table props
-    properties.putAll(JavaConverters.mapAsJavaMapConverter(v1SourceTable().properties()).asJava());
-    EXCLUDED_PROPERTIES.forEach(properties::remove);
-
-    // set default and user-provided props
-    properties.put(TableCatalog.PROP_PROVIDER, "iceberg");
-    properties.putAll(additionalProperties());
-
-    // make sure we mark this table as migrated
-    properties.put("migrated", "true");
-
-    // inherit the source table location
-    properties.putIfAbsent(LOCATION, sourceTableLocation());
-
-    return properties;
+  public CreateAction withProperty(String key, String value) {
+    delegate.tableProperty(key, value);
+    return this;
   }
 
   @Override
-  protected TableCatalog checkSourceCatalog(CatalogPlugin catalog) {
-    // Currently the Import code relies on being able to look up the table in the session code
-    Preconditions.checkArgument(catalog instanceof SparkSessionCatalog,
-        "Cannot migrate a table from a non-Iceberg Spark Session Catalog. Found %s of class %s as the source catalog.",
-        catalog.name(), catalog.getClass().getName());
-
-    return (TableCatalog) catalog;
-  }
-
-  private void renameAndBackupSourceTable() {
-    try {
-      LOG.info("Renaming {} as {} for backup", sourceTableIdent(), backupIdent);
-      destCatalog().renameTable(sourceTableIdent(), backupIdent);
-
-    } catch (org.apache.spark.sql.catalyst.analysis.NoSuchTableException e) {
-      throw new NoSuchTableException("Cannot find source table %s", sourceTableIdent());
-
-    } catch (org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException e) {
-      throw new AlreadyExistsException(
-          "Cannot rename %s as %s for backup. The backup table already exists.",
-          sourceTableIdent(), backupIdent);
-    }
-  }
-
-  private void restoreSourceTable() {
-    try {
-      LOG.info("Restoring {} from {}", sourceTableIdent(), backupIdent);
-      destCatalog().renameTable(backupIdent, sourceTableIdent());
-
-    } catch (org.apache.spark.sql.catalyst.analysis.NoSuchTableException e) {
-      LOG.error("Cannot restore the original table, the backup table {} cannot be found", backupIdent, e);
-
-    } catch (org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException e) {
-      LOG.error("Cannot restore the original table, a table with the original name exists. " +
-          "Use the backup table {} to restore the original table manually.", backupIdent, e);
-    }
+  public Long execute() {
+    MigrateTable.Result result = delegate.execute();
+    return result.migratedDataFilesCount();
   }
 }
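
The shim above keeps old call sites compiling while delegating to the new action. A sketch of what migrating a caller looks like, using only the constructors and methods shown in this diff (the catalog, identifier, and property values are assumed to come from the caller's Spark setup):

import org.apache.iceberg.actions.MigrateTable;
import org.apache.iceberg.spark.actions.BaseMigrateTableSparkAction;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.catalog.CatalogPlugin;
import org.apache.spark.sql.connector.catalog.Identifier;

class MigrateCallerSketch {
  static long migrate(SparkSession spark, CatalogPlugin catalog, Identifier ident) {
    // Old, deprecated path (returns a bare Long):
    //   new Spark3MigrateAction(spark, catalog, ident).withProperty("migrated-by", "etl").execute();

    // New path: same inputs, but a typed result instead of a bare count.
    MigrateTable.Result result = new BaseMigrateTableSparkAction(spark, catalog, ident)
        .tableProperty("migrated-by", "etl") // hypothetical property
        .execute();
    return result.migratedDataFilesCount();
  }
}
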
diff --git a/spark3/src/main/java/org/apache/iceberg/actions/Spark3SnapshotAction.java b/spark3/src/main/java/org/apache/iceberg/actions/Spark3SnapshotAction.java
index c4ed14d4600f..99483505b74a 100644
--- a/spark3/src/main/java/org/apache/iceberg/actions/Spark3SnapshotAction.java
+++ b/spark3/src/main/java/org/apache/iceberg/actions/Spark3SnapshotAction.java
@@ -20,132 +20,51 @@
 package org.apache.iceberg.actions;
 
 import java.util.Map;
-import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.SnapshotSummary;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.spark.JobGroupInfo;
-import org.apache.iceberg.spark.SparkTableUtil;
-import org.apache.iceberg.spark.source.StagedSparkTable;
+import org.apache.iceberg.spark.actions.BaseSnapshotTableSparkAction;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.catalyst.TableIdentifier;
 import org.apache.spark.sql.connector.catalog.CatalogPlugin;
 import org.apache.spark.sql.connector.catalog.Identifier;
-import org.apache.spark.sql.connector.catalog.TableCatalog;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.collection.JavaConverters;
 
 /**
  * Creates a new Iceberg table based on a source Spark table. The new Iceberg table will
  * have a different data and metadata directory allowing it to exist independently of the
  * source table.
+ *
+ * @deprecated since 0.12.0, will be removed in 0.13.0; use {@link SnapshotTable} instead.
  */
-public class Spark3SnapshotAction extends Spark3CreateAction implements SnapshotAction {
-  private static final Logger LOG = LoggerFactory.getLogger(Spark3SnapshotAction.class);
+@Deprecated
+public class Spark3SnapshotAction implements SnapshotAction {
 
-  private String destTableLocation = null;
+  private final SnapshotTable delegate;
 
   public Spark3SnapshotAction(SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent,
                               CatalogPlugin destCatalog, Identifier destTableIdent) {
-    super(spark, sourceCatalog, sourceTableIdent, destCatalog, destTableIdent);
-  }
-
-  private Long doExecute() {
-    LOG.info("Staging a new Iceberg table {} as a snapshot of {}", destTableIdent(), sourceTableIdent());
-    StagedSparkTable stagedTable = stageDestTable();
-    Table icebergTable = stagedTable.table();
-    // TODO Check table location here against source location
-
-    boolean threw = true;
-    try {
-      LOG.info("Ensuring {} has a valid name mapping", destTableIdent());
-      ensureNameMappingPresent(icebergTable);
-
-      String stagingLocation = getMetadataLocation(icebergTable);
-      TableIdentifier v1TableIdentifier = v1SourceTable().identifier();
-      LOG.info("Generating Iceberg metadata for {} in {}", destTableIdent(), stagingLocation);
-      SparkTableUtil.importSparkTable(spark(), v1TableIdentifier, icebergTable, stagingLocation);
-
-      LOG.info("Committing staged changes to {}", destTableIdent());
-      stagedTable.commitStagedChanges();
-      threw = false;
-    } finally {
-      if (threw) {
-        LOG.error("Error when attempting to commit snapshot changes, rolling back");
-        try {
-          stagedTable.abortStagedChanges();
-        } catch (Exception abortException) {
-          LOG.error("Cannot abort staged changes", abortException);
-        }
-      }
-    }
-
-    Snapshot snapshot = icebergTable.currentSnapshot();
-    long numMigratedFiles = Long.parseLong(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
-    LOG.info("Successfully loaded Iceberg metadata for {} files to {}", numMigratedFiles, destTableIdent());
-    return numMigratedFiles;
+    delegate = new BaseSnapshotTableSparkAction(spark, sourceCatalog, sourceTableIdent, destCatalog, destTableIdent);
   }
 
   @Override
-  public Long execute() {
-    JobGroupInfo info = new JobGroupInfo("SNAPSHOT", "SNAPSHOT", false);
-    return withJobGroupInfo(info, this::doExecute);
+  public SnapshotAction withLocation(String location) {
+    delegate.tableLocation(location);
+    return this;
   }
 
   @Override
-  protected Map<String, String> targetTableProps() {
-    Map<String, String> properties = Maps.newHashMap();
-
-    // copy over relevant source table props
-    properties.putAll(JavaConverters.mapAsJavaMapConverter(v1SourceTable().properties()).asJava());
-    EXCLUDED_PROPERTIES.forEach(properties::remove);
-
-    // Remove any possible location properties from origin properties
-    properties.remove(LOCATION);
-    properties.remove(TableProperties.WRITE_METADATA_LOCATION);
-    properties.remove(TableProperties.WRITE_NEW_DATA_LOCATION);
-
-    // set default and user-provided props
-    properties.put(TableCatalog.PROP_PROVIDER, "iceberg");
-    properties.putAll(additionalProperties());
-
-    // make sure we mark this table as a snapshot table
-    properties.put(TableProperties.GC_ENABLED, "false");
-    properties.put("snapshot", "true");
-
-    // Don't use the default location for the destination table if an alternate has be set
-    if (destTableLocation != null) {
-      properties.put(LOCATION, destTableLocation);
-    }
-
-    return properties;
+  public SnapshotAction withProperties(Map<String, String> properties) {
+    delegate.tableProperties(properties);
+    return this;
   }
 
   @Override
-  protected TableCatalog checkSourceCatalog(CatalogPlugin catalog) {
-    // Currently the Import code relies on being able to look up the table in the session code
-    Preconditions.checkArgument(catalog.name().equals("spark_catalog"),
-        "Cannot snapshot a table that isn't in spark_catalog, the session catalog. Found source catalog %s",
-        catalog.name());
-
-    Preconditions.checkArgument(catalog instanceof TableCatalog,
-        "Cannot snapshot a table from a non-table catalog %s. Catalog has class of %s.", catalog.name(),
-        catalog.getClass().toString());
-
-    return (TableCatalog) catalog;
+  public SnapshotAction withProperty(String key, String value) {
+    delegate.tableProperty(key, value);
+    return this;
   }
 
   @Override
-  public SnapshotAction withLocation(String location) {
-    Preconditions.checkArgument(!sourceTableLocation().equals(location),
-        "Cannot create snapshot where destination location is the same as the source location." +
-        " This would cause a mixing of original table created and snapshot created files.");
-    this.destTableLocation = location;
-    return this;
+  public Long execute() {
+    SnapshotTable.Result result = delegate.execute();
+    return result.importedDataFilesCount();
   }
 }
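
Same pattern for snapshots: withLocation(...) on the deprecated action maps to tableLocation(...) on the new one, and the bare Long becomes SnapshotTable.Result. A sketch under the same assumptions (identifiers and the destination location are supplied by the caller and are illustrative):

import org.apache.iceberg.actions.SnapshotTable;
import org.apache.iceberg.spark.actions.BaseSnapshotTableSparkAction;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.catalog.CatalogPlugin;
import org.apache.spark.sql.connector.catalog.Identifier;

class SnapshotCallerSketch {
  static long snapshot(SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceIdent,
                       CatalogPlugin destCatalog, Identifier destIdent) {
    // The five-argument constructor mirrors the deprecated Spark3SnapshotAction's inputs.
    SnapshotTable.Result result =
        new BaseSnapshotTableSparkAction(spark, sourceCatalog, sourceIdent, destCatalog, destIdent)
            .tableLocation("s3://bucket/path/snapshot-loc") // hypothetical destination location
            .execute();
    return result.importedDataFilesCount();
  }
}
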
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/actions/BaseMigrateTableSparkAction.java b/spark3/src/main/java/org/apache/iceberg/spark/actions/BaseMigrateTableSparkAction.java
new file mode 100644
index 000000000000..021c5e593631
--- /dev/null
+++ b/spark3/src/main/java/org/apache/iceberg/spark/actions/BaseMigrateTableSparkAction.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.Map;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.BaseMigrateTableActionResult;
+import org.apache.iceberg.actions.MigrateTable;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.spark.SparkSessionCatalog;
+import org.apache.iceberg.spark.SparkTableUtil;
+import org.apache.iceberg.spark.source.StagedSparkTable;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.TableIdentifier;
+import org.apache.spark.sql.connector.catalog.CatalogPlugin;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Some;
+import scala.collection.JavaConverters;
+
+/**
+ * Takes a Spark table in the source catalog and attempts to transform it into an Iceberg
+ * table in the same location with the same identifier. Once complete the identifier which
+ * previously referred to a non-Iceberg table will refer to the newly migrated Iceberg
+ * table.
+ */
+public class BaseMigrateTableSparkAction
+    extends BaseTableCreationSparkAction<MigrateTable, MigrateTable.Result>
+    implements MigrateTable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BaseMigrateTableSparkAction.class);
+  private static final String BACKUP_SUFFIX = "_BACKUP_";
+
+  private final StagingTableCatalog destCatalog;
+  private final Identifier destTableIdent;
+  private final Identifier backupIdent;
+
+  public BaseMigrateTableSparkAction(SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent) {
+    super(spark, sourceCatalog, sourceTableIdent);
+    this.destCatalog = checkDestinationCatalog(sourceCatalog);
+    this.destTableIdent = sourceTableIdent;
+    String backupName = sourceTableIdent.name() + BACKUP_SUFFIX;
+    this.backupIdent = Identifier.of(sourceTableIdent.namespace(), backupName);
+  }
+
+  @Override
+  protected MigrateTable self() {
+    return this;
+  }
+
+  @Override
+  protected StagingTableCatalog destCatalog() {
+    return destCatalog;
+  }
+
+  @Override
+  protected Identifier destTableIdent() {
+    return destTableIdent;
+  }
+
+  @Override
+  public MigrateTable tableProperties(Map<String, String> properties) {
+    setProperties(properties);
+    return this;
+  }
+
+  @Override
+  public MigrateTable tableProperty(String property, String value) {
+    setProperty(property, value);
+    return this;
+  }
+
+  @Override
+  public MigrateTable.Result execute() {
+    JobGroupInfo info = newJobGroupInfo("MIGRATE-TABLE", "MIGRATE-TABLE");
+    return withJobGroupInfo(info, this::doExecute);
+  }
+
+  private MigrateTable.Result doExecute() {
+    LOG.info("Starting the migration of {} to Iceberg", sourceTableIdent());
+
+    // move the source table to a new name, halting all modifications and allowing us to stage
+    // the creation of a new Iceberg table in its place
+    renameAndBackupSourceTable();
+
+    StagedSparkTable stagedTable = null;
+    Table icebergTable;
+    boolean threw = true;
+    try {
+      LOG.info("Staging a new Iceberg table {}", destTableIdent());
+      stagedTable = stageDestTable();
+      icebergTable = stagedTable.table();
+
+      LOG.info("Ensuring {} has a valid name mapping", destTableIdent());
+      ensureNameMappingPresent(icebergTable);
+
+      Some<String> backupNamespace = Some.apply(backupIdent.namespace()[0]);
+      TableIdentifier v1BackupIdent = new TableIdentifier(backupIdent.name(), backupNamespace);
+      String stagingLocation = getMetadataLocation(icebergTable);
+      LOG.info("Generating Iceberg metadata for {} in {}", destTableIdent(), stagingLocation);
+      SparkTableUtil.importSparkTable(spark(), v1BackupIdent, icebergTable, stagingLocation);
+
+      LOG.info("Committing staged changes to {}", destTableIdent());
+      stagedTable.commitStagedChanges();
+      threw = false;
+    } finally {
+      if (threw) {
+        LOG.error("Failed to perform the migration, aborting table creation and restoring the original table");
+
+        restoreSourceTable();
+
+        if (stagedTable != null) {
+          try {
+            stagedTable.abortStagedChanges();
+          } catch (Exception abortException) {
+            LOG.error("Cannot abort staged changes", abortException);
+          }
+        }
+      }
+    }
+
+    Snapshot snapshot = icebergTable.currentSnapshot();
+    long migratedDataFilesCount = Long.parseLong(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
+    LOG.info("Successfully loaded Iceberg metadata for {} files to {}", migratedDataFilesCount, destTableIdent());
+    return new BaseMigrateTableActionResult(migratedDataFilesCount);
+  }
+
+  @Override
+  protected Map<String, String> destTableProps() {
+    Map<String, String> properties = Maps.newHashMap();
+
+    // copy over relevant source table props
+    properties.putAll(JavaConverters.mapAsJavaMapConverter(v1SourceTable().properties()).asJava());
+    EXCLUDED_PROPERTIES.forEach(properties::remove);
+
+    // set default and user-provided props
+    properties.put(TableCatalog.PROP_PROVIDER, "iceberg");
+    properties.putAll(additionalProperties());
+
+    // make sure we mark this table as migrated
+    properties.put("migrated", "true");
+
+    // inherit the source table location
+    properties.putIfAbsent(LOCATION, sourceTableLocation());
+
+    return properties;
+  }
+
+  @Override
+  protected TableCatalog checkSourceCatalog(CatalogPlugin catalog) {
+    // currently the import code relies on being able to look up the table in the session catalog
+    Preconditions.checkArgument(catalog instanceof SparkSessionCatalog,
+        "Cannot migrate a table from a non-Iceberg Spark Session Catalog. Found %s of class %s as the source catalog.",
+        catalog.name(), catalog.getClass().getName());
+
+    return (TableCatalog) catalog;
+  }
+
+  private void renameAndBackupSourceTable() {
+    try {
+      LOG.info("Renaming {} as {} for backup", sourceTableIdent(), backupIdent);
+      destCatalog().renameTable(sourceTableIdent(), backupIdent);
+
+    } catch (org.apache.spark.sql.catalyst.analysis.NoSuchTableException e) {
+      throw new NoSuchTableException("Cannot find source table %s", sourceTableIdent());
+
+    } catch (org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException e) {
+      throw new AlreadyExistsException(
+          "Cannot rename %s as %s for backup. The backup table already exists.",
+          sourceTableIdent(), backupIdent);
+    }
+  }
+
+  private void restoreSourceTable() {
+    try {
+      LOG.info("Restoring {} from {}", sourceTableIdent(), backupIdent);
+      destCatalog().renameTable(backupIdent, sourceTableIdent());
+
+    } catch (org.apache.spark.sql.catalyst.analysis.NoSuchTableException e) {
+      LOG.error("Cannot restore the original table, the backup table {} cannot be found", backupIdent, e);
+
+    } catch (org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException e) {
+      LOG.error("Cannot restore the original table, a table with the original name exists. " +
+          "Use the backup table {} to restore the original table manually.", backupIdent, e);
+    }
+  }
+}
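
The action's result is derived straight from the committed snapshot's summary. A minimal sketch of that lookup against any Iceberg table (table loading is assumed to happen elsewhere), mirroring the TOTAL_DATA_FILES_PROP parsing in doExecute() above:

import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;

class SnapshotSummarySketch {
  // Parse the total data file count out of the current snapshot's summary map,
  // exactly the way the migrate action computes migratedDataFilesCount.
  static long totalDataFiles(Table table) {
    Snapshot snapshot = table.currentSnapshot();
    return Long.parseLong(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
  }
}
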
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotTableSparkAction.java b/spark3/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotTableSparkAction.java
new file mode 100644
index 000000000000..4d4f37ba1467
--- /dev/null
+++ b/spark3/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotTableSparkAction.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.Map;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.actions.BaseSnapshotTableActionResult;
+import org.apache.iceberg.actions.SnapshotTable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.iceberg.spark.SparkTableUtil;
+import org.apache.iceberg.spark.source.StagedSparkTable;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.TableIdentifier;
+import org.apache.spark.sql.connector.catalog.CatalogPlugin;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+/**
+ * Creates a new Iceberg table based on a source Spark table. The new Iceberg table will
+ * have a different data and metadata directory allowing it to exist independently of the
+ * source table.
+ */
+public class BaseSnapshotTableSparkAction
+    extends BaseTableCreationSparkAction<SnapshotTable, SnapshotTable.Result>
+    implements SnapshotTable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BaseSnapshotTableSparkAction.class);
+
+  private StagingTableCatalog destCatalog;
+  private Identifier destTableIdent;
+  private String destTableLocation = null;
+
+  BaseSnapshotTableSparkAction(SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent) {
+    super(spark, sourceCatalog, sourceTableIdent);
+  }
+
+  // used by the deprecated Spark3SnapshotAction, which configures the destination up front
+  public BaseSnapshotTableSparkAction(SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent,
+                                      CatalogPlugin destCatalog, Identifier destTableIdent) {
+    super(spark, sourceCatalog, sourceTableIdent);
+    this.destCatalog = checkDestinationCatalog(destCatalog);
+    this.destTableIdent = destTableIdent;
+  }
+
+  @Override
+  protected SnapshotTable self() {
+    return this;
+  }
+
+  @Override
+  protected StagingTableCatalog destCatalog() {
+    return destCatalog;
+  }
+
+  @Override
+  protected Identifier destTableIdent() {
+    return destTableIdent;
+  }
+
+  @Override
+  public SnapshotTable as(String ident) {
+    String ctx = "snapshot destination";
+    CatalogPlugin defaultCatalog = spark().sessionState().catalogManager().currentCatalog();
+    CatalogAndIdentifier catalogAndIdent = Spark3Util.catalogAndIdentifier(ctx, spark(), ident, defaultCatalog);
+    this.destCatalog = checkDestinationCatalog(catalogAndIdent.catalog());
+    this.destTableIdent = catalogAndIdent.identifier();
+    return this;
+  }
+
+  @Override
+  public SnapshotTable tableProperties(Map<String, String> properties) {
+    setProperties(properties);
+    return this;
+  }
+
+  @Override
+  public SnapshotTable tableProperty(String property, String value) {
+    setProperty(property, value);
+    return this;
+  }
+
+  @Override
+  public SnapshotTable.Result execute() {
+    JobGroupInfo info = newJobGroupInfo("SNAPSHOT-TABLE", "SNAPSHOT-TABLE");
+    return withJobGroupInfo(info, this::doExecute);
+  }
+
+  private SnapshotTable.Result doExecute() {
+    Preconditions.checkArgument(destCatalog() != null && destTableIdent() != null,
+        "The destination catalog and identifier cannot be null. " +
+        "Make sure to configure the action with a valid destination table identifier via the `as` method.");
+
+    LOG.info("Staging a new Iceberg table {} as a snapshot of {}", destTableIdent(), sourceTableIdent());
+    StagedSparkTable stagedTable = stageDestTable();
+    Table icebergTable = stagedTable.table();
+
+    // TODO: Check the dest table location does not overlap with the source table location
+
+    boolean threw = true;
+    try {
+      LOG.info("Ensuring {} has a valid name mapping", destTableIdent());
+      ensureNameMappingPresent(icebergTable);
+
+      TableIdentifier v1TableIdent = v1SourceTable().identifier();
+      String stagingLocation = getMetadataLocation(icebergTable);
+      LOG.info("Generating Iceberg metadata for {} in {}", destTableIdent(), stagingLocation);
+      SparkTableUtil.importSparkTable(spark(), v1TableIdent, icebergTable, stagingLocation);
+
+      LOG.info("Committing staged changes to {}", destTableIdent());
+      stagedTable.commitStagedChanges();
+      threw = false;
+    } finally {
+      if (threw) {
+        LOG.error("Error when populating the staged table with metadata, aborting changes");
+
+        try {
+          stagedTable.abortStagedChanges();
+        } catch (Exception abortException) {
+          LOG.error("Cannot abort staged changes", abortException);
+        }
+      }
+    }
+
+    Snapshot snapshot = icebergTable.currentSnapshot();
+    long importedDataFilesCount = Long.parseLong(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
+    LOG.info("Successfully loaded Iceberg metadata for {} files to {}", importedDataFilesCount, destTableIdent());
+    return new BaseSnapshotTableActionResult(importedDataFilesCount);
+  }
+
+  @Override
+  protected Map<String, String> destTableProps() {
+    Map<String, String> properties = Maps.newHashMap();
+
+    // copy over relevant source table props
+    properties.putAll(JavaConverters.mapAsJavaMapConverter(v1SourceTable().properties()).asJava());
+    EXCLUDED_PROPERTIES.forEach(properties::remove);
+
+    // remove any possible location properties from origin properties
+    properties.remove(LOCATION);
+    properties.remove(TableProperties.WRITE_METADATA_LOCATION);
+    properties.remove(TableProperties.WRITE_NEW_DATA_LOCATION);
+
+    // set default and user-provided props
+    properties.put(TableCatalog.PROP_PROVIDER, "iceberg");
+    properties.putAll(additionalProperties());
+
+    // make sure we mark this table as a snapshot table
+    properties.put(TableProperties.GC_ENABLED, "false");
+    properties.put("snapshot", "true");
+
+    // set the destination table location if provided
+    if (destTableLocation != null) {
+      properties.put(LOCATION, destTableLocation);
+    }
+
+    return properties;
+  }
+
+  @Override
+  protected TableCatalog checkSourceCatalog(CatalogPlugin catalog) {
+    // currently the import code relies on being able to look up the table in the session catalog
+    Preconditions.checkArgument(catalog.name().equalsIgnoreCase("spark_catalog"),
+        "Cannot snapshot a table that isn't in the session catalog (i.e. spark_catalog). " +
+        "Found source catalog: %s.", catalog.name());
+
+    Preconditions.checkArgument(catalog instanceof TableCatalog,
+        "Cannot snapshot as catalog %s of class %s is not a table catalog",
+        catalog.name(), catalog.getClass().getName());
+
+    return (TableCatalog) catalog;
+  }
+
+  @Override
+  public SnapshotTable tableLocation(String location) {
+    Preconditions.checkArgument(!sourceTableLocation().equals(location),
+        "The snapshot table location cannot be the same as the source table location. " +
+        "This would mix snapshot table files with original table files.");
+    this.destTableLocation = location;
+    return this;
+  }
+}
diff --git a/spark3/src/main/java/org/apache/iceberg/actions/Spark3CreateAction.java b/spark3/src/main/java/org/apache/iceberg/spark/actions/BaseTableCreationSparkAction.java
similarity index 74%
rename from spark3/src/main/java/org/apache/iceberg/actions/Spark3CreateAction.java
rename to spark3/src/main/java/org/apache/iceberg/spark/actions/BaseTableCreationSparkAction.java
index a87776d6d9ae..6eadece65cb6 100644
--- a/spark3/src/main/java/org/apache/iceberg/actions/Spark3CreateAction.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/actions/BaseTableCreationSparkAction.java
@@ -17,13 +17,12 @@
  * under the License.
  */
 
-package org.apache.iceberg.actions;
+package org.apache.iceberg.spark.actions;
 
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
-import java.util.function.Supplier;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
@@ -36,12 +35,9 @@
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.spark.JobGroupInfo;
-import org.apache.iceberg.spark.JobGroupUtils;
 import org.apache.iceberg.spark.SparkCatalog;
 import org.apache.iceberg.spark.SparkSessionCatalog;
 import org.apache.iceberg.spark.source.StagedSparkTable;
-import org.apache.spark.SparkContext;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.catalog.CatalogTable;
 import org.apache.spark.sql.catalyst.catalog.CatalogUtils;
@@ -53,15 +49,13 @@
 import org.apache.spark.sql.connector.expressions.Transform;
 import org.apache.spark.sql.types.StructType;
 
-abstract class Spark3CreateAction implements CreateAction {
+abstract class BaseTableCreationSparkAction<ThisT, R> extends BaseSparkAction<ThisT, R> {
   private static final Set<String> ALLOWED_SOURCES = ImmutableSet.of("parquet", "avro", "orc", "hive");
   protected static final String LOCATION = "location";
   protected static final String ICEBERG_METADATA_FOLDER = "metadata";
   protected static final List<String> EXCLUDED_PROPERTIES =
       ImmutableList.of("path", "transient_lastDdlTime", "serialization.format");
 
-  private final SparkSession spark;
-
   // Source Fields
   private final V1Table sourceTable;
   private final CatalogTable sourceCatalogTable;
@@ -69,21 +63,14 @@ abstract class Spark3CreateAction implements CreateAction {
   private final TableCatalog sourceCatalog;
   private final Identifier sourceTableIdent;
 
-  // Destination Fields
-  private final StagingTableCatalog destCatalog;
-  private final Identifier destTableIdent;
-
   // Optional Parameters for destination
   private final Map<String, String> additionalProperties = Maps.newHashMap();
 
-  Spark3CreateAction(SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent,
-                     CatalogPlugin destCatalog, Identifier destTableIdent) {
+  BaseTableCreationSparkAction(SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent) {
+    super(spark);
 
-    this.spark = spark;
     this.sourceCatalog = checkSourceCatalog(sourceCatalog);
     this.sourceTableIdent = sourceTableIdent;
-    this.destCatalog = checkDestinationCatalog(destCatalog);
-    this.destTableIdent = destTableIdent;
 
     try {
       this.sourceTable = (V1Table) this.sourceCatalog.loadTable(sourceTableIdent);
@@ -99,21 +86,13 @@ abstract class Spark3CreateAction implements CreateAction {
     this.sourceTableLocation = CatalogUtils.URIToString(sourceCatalogTable.storage().locationUri().get());
   }
 
-  @Override
-  public CreateAction withProperties(Map<String, String> properties) {
-    this.additionalProperties.putAll(properties);
-    return this;
-  }
+  protected abstract TableCatalog checkSourceCatalog(CatalogPlugin catalog);
 
-  @Override
-  public CreateAction withProperty(String key, String value) {
-    this.additionalProperties.put(key, value);
-    return this;
-  }
+  protected abstract StagingTableCatalog destCatalog();
 
-  protected SparkSession spark() {
-    return spark;
-  }
+  protected abstract Identifier destTableIdent();
+
+  protected abstract Map<String, String> destTableProps();
 
   protected String sourceTableLocation() {
     return sourceTableLocation;
@@ -131,12 +110,12 @@ protected Identifier sourceTableIdent() {
     return sourceTableIdent;
   }
 
-  protected StagingTableCatalog destCatalog() {
-    return destCatalog;
+  protected void setProperties(Map<String, String> properties) {
+    additionalProperties.putAll(properties);
   }
 
-  protected Identifier destTableIdent() {
-    return destTableIdent;
+  protected void setProperty(String key, String value) {
+    additionalProperties.put(key, value);
   }
 
   protected Map<String, String> additionalProperties() {
@@ -151,7 +130,7 @@ private void validateSourceTable() {
         "Cannot create an Iceberg table from a source without an explicit location");
   }
 
-  private StagingTableCatalog checkDestinationCatalog(CatalogPlugin catalog) {
+  protected StagingTableCatalog checkDestinationCatalog(CatalogPlugin catalog) {
     Preconditions.checkArgument(catalog instanceof SparkSessionCatalog || catalog instanceof SparkCatalog,
         "Cannot create Iceberg table in non-Iceberg Catalog. " +
             "Catalog '%s' was of class '%s' but '%s' or '%s' are required",
@@ -163,15 +142,14 @@ private StagingTableCatalog checkDestinationCatalog(CatalogPlugin catalog) {
 
   protected StagedSparkTable stageDestTable() {
    try {
-      Map<String, String> props = targetTableProps();
+      Map<String, String> props = destTableProps();
       StructType schema = sourceTable.schema();
       Transform[] partitioning = sourceTable.partitioning();
-      return (StagedSparkTable) destCatalog.stageCreate(destTableIdent, schema, partitioning, props);
+      return (StagedSparkTable) destCatalog().stageCreate(destTableIdent(), schema, partitioning, props);
     } catch (org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException e) {
-      throw new NoSuchNamespaceException("Cannot create a table '%s' because the namespace does not exist",
-          destTableIdent);
+      throw new NoSuchNamespaceException("Cannot create table %s as the namespace does not exist", destTableIdent());
     } catch (org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException e) {
-      throw new AlreadyExistsException("Cannot create table '%s' because it already exists", destTableIdent);
+      throw new AlreadyExistsException("Cannot create table %s as it already exists", destTableIdent());
     }
   }
@@ -187,20 +165,4 @@ protected String getMetadataLocation(Table table) {
     return table.properties().getOrDefault(TableProperties.WRITE_METADATA_LOCATION,
         table.location() + "/" + ICEBERG_METADATA_FOLDER);
   }
-
-  protected abstract Map<String, String> targetTableProps();
-
-  protected abstract TableCatalog checkSourceCatalog(CatalogPlugin catalog);
-
-  protected <T> T withJobGroupInfo(JobGroupInfo info, Supplier<T> supplier) {
-    SparkContext context = spark().sparkContext();
-    JobGroupInfo previousInfo = JobGroupUtils.getJobGroupInfo(context);
-    try {
-      JobGroupUtils.setJobGroupInfo(context, info);
-      return supplier.get();
-    } finally {
-      JobGroupUtils.setJobGroupInfo(context, previousInfo);
-    }
-  }
-
 }
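
The withJobGroupInfo helper removed here is not gone: the new actions call newJobGroupInfo(...)/withJobGroupInfo(...) inherited from BaseSparkAction. For reference, a self-contained sketch of the pattern the helper implements, reconstructed from the removed lines above:

import java.util.function.Supplier;
import org.apache.iceberg.spark.JobGroupInfo;
import org.apache.iceberg.spark.JobGroupUtils;
import org.apache.spark.SparkContext;

final class JobGroupSketch {
  // Swap in a job group for the duration of the action so its Spark jobs are
  // labeled in the UI, then restore whatever job group was set before.
  static <T> T withJobGroupInfo(SparkContext context, JobGroupInfo info, Supplier<T> supplier) {
    JobGroupInfo previousInfo = JobGroupUtils.getJobGroupInfo(context);
    try {
      JobGroupUtils.setJobGroupInfo(context, info);
      return supplier.get();
    } finally {
      JobGroupUtils.setJobGroupInfo(context, previousInfo);
    }
  }
}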