BaseMigrateTableActionResult.java (new file)
@@ -0,0 +1,34 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iceberg.actions;

public class BaseMigrateTableActionResult implements MigrateTable.Result {

  private final long migratedDataFilesCount;

  public BaseMigrateTableActionResult(long migratedDataFilesCount) {
    this.migratedDataFilesCount = migratedDataFilesCount;
  }

  @Override
  public long migratedDataFilesCount() {
    return migratedDataFilesCount;
  }
}
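
For context, a minimal sketch of how this result type reaches callers through the new MigrateTable action. SparkActions.get() is the Spark 3 entry point introduced alongside these actions; the db.sample identifier and the surrounding class are purely illustrative.

import org.apache.iceberg.actions.MigrateTable;
import org.apache.iceberg.spark.actions.SparkActions;

public class MigrateResultExample {
  public static void main(String[] args) {
    // execute() returns a MigrateTable.Result; BaseMigrateTableActionResult
    // above is the concrete implementation that carries the file count.
    MigrateTable.Result result = SparkActions.get()
        .migrateTable("db.sample") // hypothetical source table identifier
        .execute();
    System.out.printf("Migrated %d data files%n", result.migratedDataFilesCount());
  }
}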
BaseSnapshotTableActionResult.java (new file)
@@ -0,0 +1,34 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iceberg.actions;

public class BaseSnapshotTableActionResult implements SnapshotTable.Result {

  private final long importedDataFilesCount;

  public BaseSnapshotTableActionResult(long importedDataFilesCount) {
    this.importedDataFilesCount = importedDataFilesCount;
  }

  @Override
  public long importedDataFilesCount() {
    return importedDataFilesCount;
  }
}

Member (review comment on importedDataFilesCount()):
Not for this PR, but now that I think about it, we should probably also let the user know how many metadata files were created as well.

Contributor Author (reply):
Good idea! Now that we have the result interface, we can evolve it. Could you create an issue for this, @RussellSpitzer?

@flyrain, would you be interested in picking it up?
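
Purely as illustration of the thread above, and not part of this PR: one possible shape for surfacing a metadata file count later. Every name here is hypothetical.

// Hypothetical evolution sketched from the review discussion; a real change
// would likely extend SnapshotTable.Result itself and add a matching field
// to BaseSnapshotTableActionResult.
public interface SnapshotTableResultV2 extends SnapshotTable.Result {

  // hypothetical addition: manifests, manifest lists, and table metadata
  // files written while creating the snapshot table
  long createdMetadataFilesCount();
}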
CreateAction.java
@@ -21,6 +21,7 @@
 
 import java.util.Map;
 
+@Deprecated
 public interface CreateAction extends Action<CreateAction, Long> {
 
   /**
SnapshotAction.java
@@ -19,6 +19,7 @@
 
 package org.apache.iceberg.actions;
 
+@Deprecated
 public interface SnapshotAction extends CreateAction {
   SnapshotAction withLocation(String location);
 }
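
Since SnapshotAction is now deprecated, a hedged sketch of the replacement call path through the new SnapshotTable action; the identifiers and location are made up, and tableLocation(...) is assumed to be the counterpart of withLocation(...).

import org.apache.iceberg.actions.SnapshotTable;
import org.apache.iceberg.spark.actions.SparkActions;

public class SnapshotExample {
  public static void main(String[] args) {
    // Snapshot a Spark table into an independent Iceberg table that
    // references, but does not modify, the source data files.
    SnapshotTable.Result result = SparkActions.get()
        .snapshotTable("db.source")            // hypothetical source identifier
        .as("db.source_snapshot")              // hypothetical destination identifier
        .tableLocation("s3://bucket/snapshot") // counterpart of withLocation(...)
        .execute();
    System.out.printf("Imported %d data files%n", result.importedDataFilesCount());
  }
}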
146 changes: 16 additions & 130 deletions spark3/src/main/java/org/apache/iceberg/actions/Spark3MigrateAction.java
@@ -20,157 +20,43 @@
 package org.apache.iceberg.actions;
 
 import java.util.Map;
-import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.SnapshotSummary;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.exceptions.AlreadyExistsException;
-import org.apache.iceberg.exceptions.NoSuchTableException;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.spark.JobGroupInfo;
-import org.apache.iceberg.spark.SparkSessionCatalog;
-import org.apache.iceberg.spark.SparkTableUtil;
-import org.apache.iceberg.spark.source.StagedSparkTable;
+import org.apache.iceberg.spark.actions.BaseMigrateTableSparkAction;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.catalyst.TableIdentifier;
 import org.apache.spark.sql.connector.catalog.CatalogPlugin;
 import org.apache.spark.sql.connector.catalog.Identifier;
-import org.apache.spark.sql.connector.catalog.TableCatalog;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Some;
-import scala.collection.JavaConverters;
 
 /**
  * Takes a Spark table in the sourceCatalog and attempts to transform it into an Iceberg
  * Table in the same location with the same identifier. Once complete the identifier which
  * previously referred to a non-iceberg table will refer to the newly migrated iceberg
  * table.
+ *
+ * @deprecated since 0.12.0, will be removed in 0.13.0; use {@link MigrateTable} instead.
  */
-public class Spark3MigrateAction extends Spark3CreateAction {
-  private static final Logger LOG = LoggerFactory.getLogger(Spark3MigrateAction.class);
-  private static final String BACKUP_SUFFIX = "_BACKUP_";
+@Deprecated
+public class Spark3MigrateAction implements CreateAction {
 
-  private final Identifier backupIdent;
+  private final MigrateTable delegate;
 
   public Spark3MigrateAction(SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableName) {
-    super(spark, sourceCatalog, sourceTableName, sourceCatalog, sourceTableName);
-    String backupName = sourceTableIdent().name() + BACKUP_SUFFIX;
-    this.backupIdent = Identifier.of(sourceTableIdent().namespace(), backupName);
-  }
-
-  private Long doExecute() {
-    LOG.info("Starting the migration of {} to Iceberg", sourceTableIdent());
-
-    // Move source table to a new name, halting all modifications and allowing us to stage
-    // the creation of a new Iceberg table in its place
-    renameAndBackupSourceTable();
-
-    StagedSparkTable stagedTable = null;
-    Table icebergTable;
-    boolean threw = true;
-    try {
-      LOG.info("Staging a new Iceberg table {}", destTableIdent());
-      stagedTable = stageDestTable();
-      icebergTable = stagedTable.table();
-
-      LOG.info("Ensuring {} has a valid name mapping", destTableIdent());
-      ensureNameMappingPresent(icebergTable);
-
-      String stagingLocation = getMetadataLocation(icebergTable);
-      Some<String> backupNamespace = Some.apply(backupIdent.namespace()[0]);
-      TableIdentifier v1BackupIdentifier = new TableIdentifier(backupIdent.name(), backupNamespace);
-      LOG.info("Generating Iceberg metadata for {} in {}", destTableIdent(), stagingLocation);
-      SparkTableUtil.importSparkTable(spark(), v1BackupIdentifier, icebergTable, stagingLocation);
-
-      LOG.info("Committing staged changes to {}", destTableIdent());
-      stagedTable.commitStagedChanges();
-      threw = false;
-    } finally {
-      if (threw) {
-        LOG.error("Error when attempting perform migration changes, aborting table creation and restoring backup.");
-
-        restoreSourceTable();
-
-        if (stagedTable != null) {
-          try {
-            stagedTable.abortStagedChanges();
-          } catch (Exception abortException) {
-            LOG.error("Cannot abort staged changes", abortException);
-          }
-        }
-      }
-    }
-
-    Snapshot snapshot = icebergTable.currentSnapshot();
-    long numMigratedFiles = Long.parseLong(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
-    LOG.info("Successfully loaded Iceberg metadata for {} files", numMigratedFiles);
-    return numMigratedFiles;
+    this.delegate = new BaseMigrateTableSparkAction(spark, sourceCatalog, sourceTableName);
   }
 
   @Override
-  public Long execute() {
-    JobGroupInfo info = new JobGroupInfo("MIGRATE", "MIGRATE", false);
-    return withJobGroupInfo(info, this::doExecute);
+  public CreateAction withProperties(Map<String, String> properties) {
+    delegate.tableProperties(properties);
+    return this;
   }
 
   @Override
-  protected Map<String, String> targetTableProps() {
-    Map<String, String> properties = Maps.newHashMap();
-
-    // copy over relevant source table props
-    properties.putAll(JavaConverters.mapAsJavaMapConverter(v1SourceTable().properties()).asJava());
-    EXCLUDED_PROPERTIES.forEach(properties::remove);
-
-    // set default and user-provided props
-    properties.put(TableCatalog.PROP_PROVIDER, "iceberg");
-    properties.putAll(additionalProperties());
-
-    // make sure we mark this table as migrated
-    properties.put("migrated", "true");
-
-    // inherit the source table location
-    properties.putIfAbsent(LOCATION, sourceTableLocation());
-
-    return properties;
+  public CreateAction withProperty(String key, String value) {
+    delegate.tableProperty(key, value);
+    return this;
  }
 
   @Override
-  protected TableCatalog checkSourceCatalog(CatalogPlugin catalog) {
-    // Currently the Import code relies on being able to look up the table in the session code
-    Preconditions.checkArgument(catalog instanceof SparkSessionCatalog,
-        "Cannot migrate a table from a non-Iceberg Spark Session Catalog. Found %s of class %s as the source catalog.",
-        catalog.name(), catalog.getClass().getName());
-
-    return (TableCatalog) catalog;
-  }
-
-  private void renameAndBackupSourceTable() {
-    try {
-      LOG.info("Renaming {} as {} for backup", sourceTableIdent(), backupIdent);
-      destCatalog().renameTable(sourceTableIdent(), backupIdent);
-
-    } catch (org.apache.spark.sql.catalyst.analysis.NoSuchTableException e) {
-      throw new NoSuchTableException("Cannot find source table %s", sourceTableIdent());
-
-    } catch (org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException e) {
-      throw new AlreadyExistsException(
-          "Cannot rename %s as %s for backup. The backup table already exists.",
-          sourceTableIdent(), backupIdent);
-    }
-  }
-
-  private void restoreSourceTable() {
-    try {
-      LOG.info("Restoring {} from {}", sourceTableIdent(), backupIdent);
-      destCatalog().renameTable(backupIdent, sourceTableIdent());
-
-    } catch (org.apache.spark.sql.catalyst.analysis.NoSuchTableException e) {
-      LOG.error("Cannot restore the original table, the backup table {} cannot be found", backupIdent, e);
-
-    } catch (org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException e) {
-      LOG.error("Cannot restore the original table, a table with the original name exists. " +
-          "Use the backup table {} to restore the original table manually.", backupIdent, e);
-    }
+  public Long execute() {
+    MigrateTable.Result result = delegate.execute();
+    return result.migratedDataFilesCount();
   }
 }
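
To make the delegation above concrete, a sketch of the old and new call paths side by side; the identifier and property values are hypothetical, and the deprecated path is shown only because this PR keeps it compiling.

import org.apache.iceberg.actions.MigrateTable;
import org.apache.iceberg.actions.Spark3MigrateAction;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.catalog.CatalogPlugin;
import org.apache.spark.sql.connector.catalog.Identifier;

public class MigrateCallMapping {

  // Deprecated path: unchanged signature, now forwarding to the new action.
  static Long oldStyle(SparkSession spark, CatalogPlugin catalog, Identifier ident) {
    return new Spark3MigrateAction(spark, catalog, ident)
        .withProperty("migrated-by", "etl") // forwarded to tableProperty(...)
        .execute();                         // unwraps migratedDataFilesCount()
  }

  // New path: the same work, but execute() returns a typed result object.
  static long newStyle() {
    MigrateTable.Result result = SparkActions.get()
        .migrateTable("db.events") // hypothetical identifier
        .tableProperty("migrated-by", "etl")
        .execute();
    return result.migratedDataFilesCount();
  }
}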