diff --git a/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java b/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java index e5b5766f918d..5aec66003d67 100644 --- a/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java +++ b/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java @@ -35,6 +35,12 @@ default MigrateTable migrateTable(String tableIdent) { this.getClass().getName() + " does not implement migrateTable"); } + /** Instantiates an action to migrate a Delta Lake table to Iceberg. */ + default MigrateDeltaLakeTable migrateDeltaLakeTable(String tableIdent, String deltaTableLocation) { + throw new UnsupportedOperationException( + this.getClass().getName() + " does not implement migrateDeltaLakeTable"); + } + /** Instantiates an action to delete orphan files. */ default DeleteOrphanFiles deleteOrphanFiles(Table table) { throw new UnsupportedOperationException( diff --git a/api/src/main/java/org/apache/iceberg/actions/MigrateDeltaLakeTable.java b/api/src/main/java/org/apache/iceberg/actions/MigrateDeltaLakeTable.java new file mode 100644 index 000000000000..b3bb3146ea1f --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/actions/MigrateDeltaLakeTable.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.actions; + +import java.util.Map; + +/** Migrates a Delta Lake table to Iceberg in place. */ +public interface MigrateDeltaLakeTable + extends Action<MigrateDeltaLakeTable, MigrateDeltaLakeTable.Result> { + + MigrateDeltaLakeTable tableProperties(Map<String, String> properties); + + interface Result { + + /** Returns the number of imported data files. */ + long importedDataFilesCount(); + } +} diff --git a/build.gradle b/build.gradle index 662eddf61d8e..4f9176c5bac0 100644 --- a/build.gradle +++ b/build.gradle @@ -98,6 +98,11 @@ allprojects { repositories { mavenCentral() mavenLocal() + // TODO: remove once Delta Lake 2.1.0 is officially released + maven { + name = 'staging-repo' + url = 'https://oss.sonatype.org/content/repositories/iodelta-1087/' + } } } diff --git a/core/src/main/java/org/apache/iceberg/actions/BaseMigrateDeltaLakeTableActionResult.java b/core/src/main/java/org/apache/iceberg/actions/BaseMigrateDeltaLakeTableActionResult.java new file mode 100644 index 000000000000..39dd1b94f3a4 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/actions/BaseMigrateDeltaLakeTableActionResult.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.actions; + +public class BaseMigrateDeltaLakeTableActionResult implements MigrateDeltaLakeTable.Result { + + private final long importedDataFilesCount; + + public BaseMigrateDeltaLakeTableActionResult(long importedDataFilesCount) { + this.importedDataFilesCount = importedDataFilesCount; + } + + @Override + public long importedDataFilesCount() { + return importedDataFilesCount; + } +} diff --git a/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java b/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java index aa1885a31e8c..c598d0501939 100644 --- a/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java +++ b/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java @@ -161,7 +161,7 @@ private static Metrics getAvroMetrics(Path path, Configuration conf) { } } - private static Metrics getParquetMetrics( + public static Metrics getParquetMetrics( Path path, Configuration conf, MetricsConfig metricsSpec, NameMapping mapping) { try { InputFile file = HadoopInputFile.fromPath(path, conf); diff --git a/spark/v3.3/build.gradle b/spark/v3.3/build.gradle index 9d886447775e..7883f238c872 100644 --- a/spark/v3.3/build.gradle +++ b/spark/v3.3/build.gradle @@ -63,6 +63,8 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") { implementation("org.apache.parquet:parquet-column") implementation("org.apache.parquet:parquet-hadoop") + compileOnly ("io.delta:delta-standalone_${scalaVersion}") + implementation("org.apache.orc:orc-core::nohive") { exclude group: 'org.apache.hadoop' exclude group: 'commons-lang' @@ -77,6 +79,9 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") { exclude group: 'com.google.code.findbugs', module: 'jsr305' } + // Needed to write Delta Lake tables for testing + testImplementation "io.delta:delta-core_${scalaVersion}" + testImplementation("org.apache.hadoop:hadoop-minicluster") { exclude group: 'org.apache.avro', module: 'avro' // to make sure io.netty.buffer only comes from project(':iceberg-arrow') diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSchemaUtil.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSchemaUtil.java index 6075aba7ac5f..dab1f58976d3 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSchemaUtil.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSchemaUtil.java @@ -338,7 +338,7 @@ private static PartitionSpec identitySpec(Schema schema, Collection colu return identitySpec(schema, names); } - private static PartitionSpec identitySpec(Schema schema, List partitionNames) { + public static PartitionSpec identitySpec(Schema schema, List partitionNames) { if (partitionNames == null || partitionNames.isEmpty()) { return null; } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTypeToType.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTypeToType.java index 17499736fbeb..c865dceb616b 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTypeToType.java +++ 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTypeToType.java @@ -42,7 +42,7 @@ import org.apache.spark.sql.types.TimestampType; import org.apache.spark.sql.types.VarcharType; -class SparkTypeToType extends SparkTypeVisitor { +public class SparkTypeToType extends SparkTypeVisitor { private final StructType root; private int nextId = 0; @@ -50,7 +50,7 @@ class SparkTypeToType extends SparkTypeVisitor { this.root = null; } - SparkTypeToType(StructType root) { + public SparkTypeToType(StructType root) { this.root = root; // the root struct's fields use the first ids this.nextId = root.fields().length; diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTypeVisitor.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTypeVisitor.java index 1ef694263fa4..d276816d1853 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTypeVisitor.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTypeVisitor.java @@ -27,8 +27,8 @@ import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.types.UserDefinedType; -class SparkTypeVisitor { - static T visit(DataType type, SparkTypeVisitor visitor) { +public class SparkTypeVisitor { + public static T visit(DataType type, SparkTypeVisitor visitor) { if (type instanceof StructType) { StructField[] fields = ((StructType) type).fields(); List fieldResults = Lists.newArrayListWithExpectedSize(fields.length); diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateDeltaLakeTableSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateDeltaLakeTableSparkAction.java new file mode 100644 index 000000000000..8af26e012307 --- /dev/null +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateDeltaLakeTableSparkAction.java @@ -0,0 +1,383 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark.actions; + +import io.delta.standalone.DeltaLog; +import io.delta.standalone.VersionLog; +import io.delta.standalone.actions.Action; +import io.delta.standalone.actions.AddFile; +import io.delta.standalone.actions.RemoveFile; +import java.io.File; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.DeleteFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.OverwriteFiles; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotSummary; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.actions.BaseMigrateDeltaLakeTableActionResult; +import org.apache.iceberg.actions.MigrateDeltaLakeTable; +import org.apache.iceberg.data.TableMigrationUtil; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.mapping.NameMapping; +import org.apache.iceberg.mapping.NameMappingParser; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.source.StagedSparkTable; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; +import org.apache.spark.sql.connector.catalog.CatalogPlugin; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.StagingTableCatalog; +import org.apache.spark.sql.connector.expressions.LogicalExpressions; +import org.apache.spark.sql.connector.expressions.Transform; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.jetbrains.annotations.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.collection.JavaConverters; + +/** + * Takes the location of a Delta Lake table and attempts to create an Iceberg table with the given + * identifier in the same location. The existing Parquet data files are not rewritten; Iceberg + * metadata is generated to reference them, so once the action completes the new identifier refers + * to the migrated Iceberg table.
+ */ +public class MigrateDeltaLakeTableSparkAction implements MigrateDeltaLakeTable { + + private static final Logger LOG = LoggerFactory.getLogger(MigrateDeltaLakeTableSparkAction.class); + + private final Map additionalProperties = Maps.newHashMap(); + private final SparkSession spark; + private final DeltaLog deltaLog; + private final StagingTableCatalog destCatalog; + private final String deltaTableLocation; + private final Identifier newIdentifier; + + MigrateDeltaLakeTableSparkAction( + SparkSession spark, + CatalogPlugin destCatalog, + String deltaTableLocation, + Identifier newIdentifier) { + this.spark = spark; + this.destCatalog = checkDestinationCatalog(destCatalog); + this.newIdentifier = newIdentifier; + this.deltaTableLocation = deltaTableLocation; + this.deltaLog = + DeltaLog.forTable(spark.sessionState().newHadoopConf(), this.deltaTableLocation); + } + + @Override + public Result execute() { + // Get a DeltaLog instance and retrieve the partitions (if applicable) of the table + io.delta.standalone.Snapshot updatedSnapshot = deltaLog.update(); + + StructType structType = getStructTypeFromDeltaSnapshot(); + + StagedSparkTable stagedTable = + stageDestTable( + updatedSnapshot, + deltaTableLocation, + destCatalog, + newIdentifier, + structType, + additionalProperties); + PartitionSpec partitionSpec = getPartitionSpecFromDeltaSnapshot(structType); + + Table icebergTable = stagedTable.table(); + copyFromDeltaLakeToIceberg(icebergTable, partitionSpec); + + stagedTable.commitStagedChanges(); + Snapshot snapshot = icebergTable.currentSnapshot(); + long totalDataFiles = + Long.parseLong(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP)); + LOG.info( + "Successfully loaded Iceberg metadata for {} files to {}", + totalDataFiles, + deltaTableLocation); + return new BaseMigrateDeltaLakeTableActionResult(totalDataFiles); + } + + private void copyFromDeltaLakeToIceberg(Table table, PartitionSpec spec) { + // Get all changes starting from version 0 + Iterator it = deltaLog.getChanges(0, false); + + while (it.hasNext()) { + VersionLog versionLog = it.next(); + List actions = versionLog.getActions(); + + // We first need to iterate through to see what kind of transaction this was. There are 3 + // cases: + // 1. AppendFile - when there are only AddFile instances (an INSERT on the table) + // 2. DeleteFiles - when there are only RemoveFile instances (a DELETE where all the records + // of file(s) were removed + // 3. OverwriteFiles - when there are a mix of AddFile and RemoveFile (a DELETE/UPDATE) + + // Create a map of Delta Lake Action (AddFile, RemoveFile, etc.) 
--> List of Actions of that type + Map<String, List<Action>> deltaLakeActionsMap = + actions.stream() + .filter(action -> action instanceof AddFile || action instanceof RemoveFile) + .collect(Collectors.groupingBy(a -> a.getClass().getSimpleName())); + // Scan the map so that we know what type of transaction this will be in Iceberg + IcebergTransactionType icebergTransactionType = + getIcebergTransactionTypeFromDeltaActions(deltaLakeActionsMap); + if (icebergTransactionType == null) { + return; + } + + List<DataFile> filesToAdd = Lists.newArrayList(); + List<DataFile> filesToRemove = Lists.newArrayList(); + for (Action action : Iterables.concat(deltaLakeActionsMap.values())) { + DataFile dataFile = buildDataFileForAction(action, table, spec); + if (action instanceof AddFile) { + filesToAdd.add(dataFile); + } else { + // We would have thrown an exception above if it wasn't a RemoveFile + filesToRemove.add(dataFile); + } + } + + switch (icebergTransactionType) { + case APPEND_FILES: + AppendFiles appendFiles = table.newAppend(); + filesToAdd.forEach(appendFiles::appendFile); + appendFiles.commit(); + break; + case DELETE_FILES: + DeleteFiles deleteFiles = table.newDelete(); + filesToRemove.forEach(deleteFiles::deleteFile); + deleteFiles.commit(); + break; + case OVERWRITE_FILES: + OverwriteFiles overwriteFiles = table.newOverwrite(); + filesToAdd.forEach(overwriteFiles::addFile); + filesToRemove.forEach(overwriteFiles::deleteFile); + overwriteFiles.commit(); + break; + } + } + } + + private DataFile buildDataFileForAction(Action action, Table table, PartitionSpec spec) { + String path; + long size; + Map<String, String> partitionValues; + + if (action instanceof AddFile) { + AddFile addFile = (AddFile) action; + path = addFile.getPath(); + size = addFile.getSize(); + partitionValues = addFile.getPartitionValues(); + } else if (action instanceof RemoveFile) { + RemoveFile removeFile = (RemoveFile) action; + path = removeFile.getPath(); + size = + removeFile + .getSize() + .orElseThrow( + () -> + new RuntimeException( + String.format("File %s removed without specifying a size", path))); + partitionValues = + Optional.ofNullable(removeFile.getPartitionValues()) + .orElseThrow( + () -> + new RuntimeException( + String.format( + "File %s removed without specifying partition values", path))); + } else { + throw new IllegalStateException( + String.format( + "Unexpected action type for Delta Lake: %s", action.getClass().getSimpleName())); + } + + String fullFilePath = deltaLog.getPath().toString() + File.separator + path; + Metrics metrics = getMetricsForFile(table, fullFilePath); + + String partition = + spec.fields().stream() + .map(PartitionField::name) + .map(name -> String.format("%s=%s", name, partitionValues.get(name))) + .collect(Collectors.joining("/")); + + return DataFiles.builder(spec) + .withPath(fullFilePath) + .withFormat(FileFormat.PARQUET) + .withFileSizeInBytes(size) + .withMetrics(metrics) + .withPartitionPath(partition) + .withRecordCount(metrics.recordCount()) + .build(); + } + + private Metrics getMetricsForFile(Table table, String fullFilePath) { + MetricsConfig metricsConfig = MetricsConfig.forTable(table); + String nameMappingString = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING); + NameMapping nameMapping = + nameMappingString != null ?
NameMappingParser.fromJson(nameMappingString) : null; + return TableMigrationUtil.getParquetMetrics( + new Path(fullFilePath), spark.sessionState().newHadoopConf(), metricsConfig, nameMapping); + } + + @Nullable + private IcebergTransactionType getIcebergTransactionTypeFromDeltaActions( + Map> actionsMap) { + IcebergTransactionType icebergTransactionType; + if (actionsMap.containsKey(AddFile.class.getSimpleName()) + && !actionsMap.containsKey(RemoveFile.class.getSimpleName())) { + icebergTransactionType = IcebergTransactionType.APPEND_FILES; + } else if (actionsMap.containsKey(RemoveFile.class.getSimpleName()) + && !actionsMap.containsKey(AddFile.class.getSimpleName())) { + icebergTransactionType = IcebergTransactionType.DELETE_FILES; + } else if (actionsMap.containsKey(AddFile.class.getSimpleName()) + && actionsMap.containsKey(RemoveFile.class.getSimpleName())) { + icebergTransactionType = IcebergTransactionType.OVERWRITE_FILES; + } else { + // Some other type of transaction, we can ignore + return null; + } + return icebergTransactionType; + } + + private PartitionSpec getPartitionSpecFromDeltaSnapshot(StructType structType) { + Schema schema = SparkSchemaUtil.convert(structType); + PartitionSpec spec = + SparkSchemaUtil.identitySpec( + schema, deltaLog.snapshot().getMetadata().getPartitionColumns()); + return spec == null ? PartitionSpec.unpartitioned() : spec; + } + + private StructType getStructTypeFromDeltaSnapshot() { + io.delta.standalone.types.StructField[] fields = + Optional.ofNullable(deltaLog.snapshot().getMetadata().getSchema()) + .map(io.delta.standalone.types.StructType::getFields) + .orElseThrow(() -> new RuntimeException("Cannot determine table schema!")); + + // Convert from Delta StructFields to Spark StructFields + return new StructType( + Arrays.stream(fields) + .map( + s -> + new StructField( + s.getName(), + DataType.fromJson(s.getDataType().toJson()), + s.isNullable(), + Metadata.fromJson(s.getMetadata().toString()))) + .toArray(StructField[]::new)); + } + + @Override + public MigrateDeltaLakeTable tableProperties(Map properties) { + additionalProperties.putAll(properties); + return this; + } + + private static StagedSparkTable stageDestTable( + io.delta.standalone.Snapshot deltaSnapshot, + String tableLocation, + StagingTableCatalog destinationCatalog, + Identifier destIdentifier, + StructType structType, + Map additionalProperties) { + try { + Map props = + destTableProperties(deltaSnapshot, tableLocation, additionalProperties); + io.delta.standalone.types.StructType schema = deltaSnapshot.getMetadata().getSchema(); + if (schema == null) { + throw new IllegalStateException("Could not find schema in existing Delta Lake table."); + } + + Transform[] partitioning = getPartitioning(deltaSnapshot); + + return (StagedSparkTable) + destinationCatalog.stageCreate(destIdentifier, structType, partitioning, props); + } catch (org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException e) { + throw new NoSuchNamespaceException( + "Cannot create table %s as the namespace does not exist", destIdentifier); + } catch (TableAlreadyExistsException e) { + throw new AlreadyExistsException( + "Cannot create table %s as it already exists", destIdentifier); + } + } + + private static Transform[] getPartitioning(io.delta.standalone.Snapshot deltaSnapshot) { + return deltaSnapshot.getMetadata().getPartitionColumns().stream() + .map( + name -> + LogicalExpressions.identity( + LogicalExpressions.reference( + JavaConverters.asScalaBuffer(Collections.singletonList(name))))) + 
.toArray(Transform[]::new); + } + + private static Map<String, String> destTableProperties( + io.delta.standalone.Snapshot deltaSnapshot, + String tableLocation, + Map<String, String> additionalProperties) { + Map<String, String> properties = Maps.newHashMap(); + + properties.putAll(deltaSnapshot.getMetadata().getConfiguration()); + properties.putAll( + ImmutableMap.of( + "provider", + "iceberg", + "migrated", + "true", + "table_type", + "iceberg", + "location", + tableLocation)); + properties.putAll(additionalProperties); + + return properties; + } + + private StagingTableCatalog checkDestinationCatalog(CatalogPlugin catalog) { + if (!(catalog instanceof StagingTableCatalog)) { + throw new IllegalArgumentException( + String.format("Cannot migrate to catalog %s: not a StagingTableCatalog", catalog.name())); + } + return (StagingTableCatalog) catalog; + } + + private enum IcebergTransactionType { + APPEND_FILES, + DELETE_FILES, + OVERWRITE_FILES + } +} diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java index 8c886adf510e..619f8eacfcca 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java @@ -20,6 +20,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.actions.ActionsProvider; +import org.apache.iceberg.actions.MigrateDeltaLakeTable; import org.apache.iceberg.spark.Spark3Util; import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier; import org.apache.spark.sql.SparkSession; @@ -67,6 +68,17 @@ public MigrateTableSparkAction migrateTable(String tableIdent) { spark, catalogAndIdent.catalog(), catalogAndIdent.identifier()); } + @Override + public MigrateDeltaLakeTable migrateDeltaLakeTable( + String newTableIdentifier, String deltaTableLocation) { + String ctx = "migrate delta target"; + CatalogPlugin defaultCatalog = spark.sessionState().catalogManager().currentCatalog(); + CatalogAndIdentifier catalogAndIdent = + Spark3Util.catalogAndIdentifier(ctx, spark, newTableIdentifier, defaultCatalog); + return new MigrateDeltaLakeTableSparkAction( + spark, catalogAndIdent.catalog(), deltaTableLocation, catalogAndIdent.identifier()); + } + @Override public RewriteDataFilesSparkAction rewriteDataFiles(Table table) { return new RewriteDataFilesSparkAction(spark, table); diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java index 27fcb50817b1..9c3de50f67ae 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java @@ -71,6 +71,8 @@ public static void startMetastoreAndSpark() { .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic") .config("spark.hadoop." + METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname)) .config("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true") + // Needed for Delta Lake tests + .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") .enableHiveSupport() .getOrCreate(); diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestMigrateDeltaLakeTable.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestMigrateDeltaLakeTable.java new file mode 100644 index 000000000000..d6020bc0ac51 --- /dev/null +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestMigrateDeltaLakeTable.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.actions; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.stream.IntStream; +import org.apache.iceberg.actions.MigrateDeltaLakeTable; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.SparkCatalogTestBase; +import org.apache.iceberg.spark.SparkSessionCatalog; +import org.apache.iceberg.spark.source.SimpleRecord; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.connector.catalog.CatalogExtension; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.delta.catalog.DeltaCatalog; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runners.Parameterized; + +public class TestMigrateDeltaLakeTable extends SparkCatalogTestBase { + private static final String NAMESPACE = "default"; + + private static final String ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; + private String partitionedIdentifier; + private String unpartitionedIdentifier; + + @Parameterized.Parameters(name = "Catalog Name {0} - Options {2}") + public static Object[][] parameters() { + return new Object[][] { + new Object[] { + "delta", + DeltaCatalog.class.getName(), + ImmutableMap.of( + "type", "hive", + "default-namespace", "default", + "parquet-enabled", "true", + "cache-enabled", + "false" // Spark will delete tables using v1, leaving the cache out of sync + ) + } + }; + } + + @Rule public TemporaryFolder temp = new TemporaryFolder(); + @Rule public TemporaryFolder other = new TemporaryFolder(); + + private final String partitionedTableName = "partitioned_table"; + private final String unpartitionedTableName = "unpartitioned_table"; + + private final String defaultSparkCatalog = "spark_catalog"; + private String partitionedLocation; + private String unpartitionedLocation; + private final String type; + private TableCatalog catalog; + + private String catalogName; + + public TestMigrateDeltaLakeTable( + String catalogName, String implementation, Map config) { + super(catalogName, implementation, config); + spark + .conf() + .set("spark.sql.catalog." 
+ defaultSparkCatalog, SparkSessionCatalog.class.getName()); + this.catalog = (TableCatalog) spark.sessionState().catalogManager().catalog(catalogName); + this.type = config.get("type"); + this.catalogName = catalogName; + } + + @Before + public void before() { + try { + File partitionedFolder = temp.newFolder(); + File unpartitionedFolder = other.newFolder(); + partitionedLocation = partitionedFolder.toURI().toString(); + unpartitionedLocation = unpartitionedFolder.toURI().toString(); + } catch (IOException e) { + throw new RuntimeException(e); + } + + partitionedIdentifier = destName(partitionedTableName); + unpartitionedIdentifier = destName(unpartitionedTableName); + + CatalogExtension delta = + (CatalogExtension) spark.sessionState().catalogManager().catalog("delta"); + // This needs to be set, otherwise Delta operations fail as the catalog is designed to override + // the default catalog (spark_catalog). + delta.setDelegateCatalog(spark.sessionState().catalogManager().currentCatalog()); + + spark.sql(String.format("DROP TABLE IF EXISTS %s", partitionedIdentifier)); + spark.sql(String.format("DROP TABLE IF EXISTS %s", unpartitionedIdentifier)); + + // Create a partitioned and unpartitioned table, doing a few inserts on each + IntStream.range(0, 3) + .forEach( + i -> { + List record = + Lists.newArrayList(new SimpleRecord(i, ALPHABET.substring(i, i + 1))); + + Dataset df = spark.createDataFrame(record, SimpleRecord.class); + + df.write() + .format("delta") + .mode(i == 0 ? SaveMode.Overwrite : SaveMode.Append) + .partitionBy("id") + .option("path", partitionedLocation) + .saveAsTable(partitionedIdentifier); + + df.write() + .format("delta") + .mode(i == 0 ? SaveMode.Overwrite : SaveMode.Append) + .option("path", unpartitionedLocation) + .saveAsTable(unpartitionedIdentifier); + }); + + // Delete a record from the table + spark.sql("DELETE FROM " + partitionedIdentifier + " WHERE id=0"); + spark.sql("DELETE FROM " + unpartitionedIdentifier + " WHERE id=0"); + + // Update a record + spark.sql("UPDATE " + partitionedIdentifier + " SET id=3 WHERE id=1"); + spark.sql("UPDATE " + unpartitionedIdentifier + " SET id=3 WHERE id=1"); + } + + @After + public void after() throws IOException { + // Drop the hive table. 
+ spark.sql(String.format("DROP TABLE IF EXISTS %s", destName(partitionedTableName))); + spark.sql(String.format("DROP TABLE IF EXISTS %s", destName(unpartitionedTableName))); + } + + @Test + public void testMigratePartitioned() { + // This will test the scenario that the user switches the configuration and sets the default + // catalog to be Iceberg + // AFTER they had made it Delta and written a delta table there + spark.sessionState().catalogManager().setCurrentCatalog(defaultSparkCatalog); + + catalogName = defaultSparkCatalog; + String newTableIdentifier = destName("iceberg_table"); + MigrateDeltaLakeTable.Result result = + SparkActions.get().migrateDeltaLakeTable(newTableIdentifier, partitionedLocation).execute(); + + // Compare the results + List oldResults = spark.sql("SELECT * FROM " + partitionedIdentifier).collectAsList(); + List newResults = spark.sql("SELECT * FROM " + newTableIdentifier).collectAsList(); + + Assert.assertEquals(oldResults.size(), newResults.size()); + Assert.assertTrue(newResults.containsAll(oldResults)); + Assert.assertTrue(oldResults.containsAll(newResults)); + } + + @Test + public void testMigrateUnpartitioned() { + // This will test the scenario that the user switches the configuration and sets the default + // catalog to be Iceberg + // AFTER they had made it Delta and written a delta table there + spark.sessionState().catalogManager().setCurrentCatalog(defaultSparkCatalog); + + catalogName = defaultSparkCatalog; + String newTableIdentifier = destName("iceberg_table_unpartitioned"); + MigrateDeltaLakeTable.Result result = + SparkActions.get() + .migrateDeltaLakeTable(newTableIdentifier, unpartitionedLocation) + .execute(); + + // Compare the results + List oldResults = spark.sql("SELECT * FROM " + unpartitionedIdentifier).collectAsList(); + List newResults = spark.sql("SELECT * FROM " + newTableIdentifier).collectAsList(); + + Assert.assertEquals(oldResults.size(), newResults.size()); + Assert.assertTrue(newResults.containsAll(oldResults)); + Assert.assertTrue(oldResults.containsAll(newResults)); + } + + private String destName(String dest) { + if (catalogName.equals("spark_catalog")) { + return NAMESPACE + "." + catalogName + "_" + type + "_" + dest; + } else { + return catalogName + "." + NAMESPACE + "." + catalogName + "_" + type + "_" + dest; + } + } +} diff --git a/versions.props b/versions.props index 3b9bb937eac4..b991fd8d1ae9 100644 --- a/versions.props +++ b/versions.props @@ -30,6 +30,7 @@ com.google.cloud:libraries-bom = 24.1.0 org.scala-lang.modules:scala-collection-compat_2.12 = 2.6.0 org.scala-lang.modules:scala-collection-compat_2.13 = 2.6.0 com.emc.ecs:object-client-bundle = 3.3.2 +io.delta:delta-standalone_2.12 = 0.5.0 # test deps org.junit.vintage:junit-vintage-engine = 5.8.2 @@ -44,3 +45,5 @@ org.springframework:* = 5.3.9 org.springframework.boot:* = 2.5.4 org.mock-server:mockserver-netty = 5.13.2 org.mock-server:mockserver-client-java = 5.13.2 +# TODO: update when Delta Lake 2.1.0 is officially out +io.delta:delta-core_2.12 = 2.1.0rc1
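For reviewers who want to try the change, below is a minimal usage sketch of the action this patch adds, based on the SparkActions and MigrateDeltaLakeTable APIs in the diff. The catalog, namespace, table identifier, Delta table location, and the extra table property key are placeholder values, and an active Spark session configured with an Iceberg catalog is assumed.

```java
import org.apache.iceberg.actions.MigrateDeltaLakeTable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.actions.SparkActions;

public class MigrateDeltaLakeExample {
  public static void main(String[] args) {
    // Placeholder identifier for the new Iceberg table and location of the existing Delta table.
    String newTableIdentifier = "my_catalog.db.migrated_table";
    String deltaTableLocation = "s3://my-bucket/path/to/delta-table";

    // Stage an Iceberg table at the Delta table's location, import its data files, and commit.
    MigrateDeltaLakeTable.Result result =
        SparkActions.get()
            .migrateDeltaLakeTable(newTableIdentifier, deltaTableLocation)
            // Optional extra table properties; the key below is purely illustrative.
            .tableProperties(ImmutableMap.of("migration_source", "delta"))
            .execute();

    System.out.printf("Imported %d data files%n", result.importedDataFilesCount());
  }
}
```

The same call is exercised end-to-end in TestMigrateDeltaLakeTable above, which writes partitioned and unpartitioned Delta tables, runs the action, and compares the query results of the original and migrated tables.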