diff --git a/delta-lake/src/integration/java/org/apache/iceberg/delta/TestSnapshotDeltaLakeTable.java b/delta-lake/src/integration/java/org/apache/iceberg/delta/TestSnapshotDeltaLakeTable.java index 24b76b0057dc..72352e71358a 100644 --- a/delta-lake/src/integration/java/org/apache/iceberg/delta/TestSnapshotDeltaLakeTable.java +++ b/delta-lake/src/integration/java/org/apache/iceberg/delta/TestSnapshotDeltaLakeTable.java @@ -84,6 +84,8 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase { private String newIcebergTableLocation; private String externalDataFilesTableLocation; private String typeTestTableLocation; + private Dataset typeTestDataFrame; + private Dataset nestedDataFrame; @Parameterized.Parameters(name = "Catalog Name {0} - Options {2}") public static Object[][] parameters() { @@ -143,15 +145,12 @@ public void before() throws IOException { spark.sql(String.format("DROP TABLE IF EXISTS %s", typeTestIdentifier)); // generate the dataframe - Dataset nestedDataFrame = nestedDataFrame(); - Dataset typeTestDataFrame = typeTestDataFrame(); + nestedDataFrame = nestedDataFrame(); + typeTestDataFrame = typeTestDataFrame(); // write to delta tables writeDeltaTable(nestedDataFrame, partitionedIdentifier, partitionedLocation, "id"); writeDeltaTable(nestedDataFrame, unpartitionedIdentifier, unpartitionedLocation, null); - writeDeltaTable( - nestedDataFrame, externalDataFilesIdentifier, externalDataFilesTableLocation, null); - writeDeltaTable(typeTestDataFrame, typeTestIdentifier, typeTestTableLocation, "stringCol"); // Delete a record from the table spark.sql("DELETE FROM " + partitionedIdentifier + " WHERE id=3"); @@ -273,6 +272,8 @@ public void testSnapshotWithAdditionalProperties() { @Test public void testSnapshotTableWithExternalDataFiles() { + writeDeltaTable( + nestedDataFrame, externalDataFilesIdentifier, externalDataFilesTableLocation, null); // Add parquet files to default.external_data_files_table. The newly added parquet files // are not at the same location as the table. addExternalDatafiles(externalDataFilesTableLocation, unpartitionedLocation); @@ -290,6 +291,7 @@ public void testSnapshotTableWithExternalDataFiles() { @Test public void testSnapshotSupportedTypes() { + writeDeltaTable(typeTestDataFrame, typeTestIdentifier, typeTestTableLocation, "stringCol"); String newTableIdentifier = destName(icebergCatalogName, snapshotTypeTestTableName); SnapshotDeltaLakeTable.Result result = DeltaLakeToIcebergMigrationSparkIntegration.snapshotDeltaLakeTable( diff --git a/delta-lake/src/main/java/org/apache/iceberg/delta/BaseSnapshotDeltaLakeTableAction.java b/delta-lake/src/main/java/org/apache/iceberg/delta/BaseSnapshotDeltaLakeTableAction.java index 786a357ed658..29cfc45a9b21 100644 --- a/delta-lake/src/main/java/org/apache/iceberg/delta/BaseSnapshotDeltaLakeTableAction.java +++ b/delta-lake/src/main/java/org/apache/iceberg/delta/BaseSnapshotDeltaLakeTableAction.java @@ -61,7 +61,6 @@ import org.apache.iceberg.parquet.ParquetUtil; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Type; import org.slf4j.Logger; @@ -90,6 +89,7 @@ class BaseSnapshotDeltaLakeTableAction implements SnapshotDeltaLakeTable { private TableIdentifier newTableIdentifier; private String newTableLocation; private HadoopFileIO deltaLakeFileIO; + private long deltaStartVersion; /** * Snapshot a delta lake table to be an iceberg table. The action will read the delta lake table's @@ -139,6 +139,8 @@ public SnapshotDeltaLakeTable icebergCatalog(Catalog catalog) { public SnapshotDeltaLakeTable deltaLakeConfiguration(Configuration conf) { this.deltaLog = DeltaLog.forTable(conf, deltaTableLocation); this.deltaLakeFileIO = new HadoopFileIO(conf); + // get the earliest version available in the delta lake table + this.deltaStartVersion = deltaLog.getVersionAtOrAfterTimestamp(0L); return this; } @@ -150,6 +152,10 @@ public Result execute() { Preconditions.checkArgument( deltaLog != null && deltaLakeFileIO != null, "Make sure to configure the action with a valid deltaLakeConfiguration"); + Preconditions.checkArgument( + deltaLog.tableExists(), + "Delta lake table does not exist at the given location: %s", + deltaTableLocation); io.delta.standalone.Snapshot updatedSnapshot = deltaLog.update(); Schema schema = convertDeltaLakeSchema(updatedSnapshot.getMetadata().getSchema()); PartitionSpec partitionSpec = getPartitionSpecFromDeltaSnapshot(schema); @@ -169,8 +175,8 @@ public Result execute() { .commit(); Iterator versionLogIterator = deltaLog.getChanges( - 0, // retrieve actions starting from the initial version - false); // not throw exception when data loss detected + deltaStartVersion, false // not throw exception when data loss detected + ); while (versionLogIterator.hasNext()) { VersionLog versionLog = versionLogIterator.next(); commitDeltaVersionLogToIcebergTransaction(versionLog, icebergTransaction); @@ -227,17 +233,25 @@ private PartitionSpec getPartitionSpecFromDeltaSnapshot(Schema schema) { */ private void commitDeltaVersionLogToIcebergTransaction( VersionLog versionLog, Transaction transaction) { - List actions = versionLog.getActions(); - - // Create a map of Delta Lake Action (AddFile, RemoveFile, etc.) --> List - Map> deltaLakeActionMap = - actions.stream() - .filter(action -> action instanceof AddFile || action instanceof RemoveFile) - .collect(Collectors.groupingBy(a -> a.getClass().getSimpleName())); + List dataFileActions; + if (versionLog.getVersion() == deltaStartVersion) { + // The first version log is a special case, since it contains the initial table state. + // we need to get all dataFiles from the corresponding delta snapshot to construct the table. + dataFileActions = + deltaLog.getSnapshotForVersionAsOf(deltaStartVersion).getAllFiles().stream() + .map(addFile -> (Action) addFile) + .collect(Collectors.toList()); + } else { + // Only need actions related to data change: AddFile and RemoveFile + dataFileActions = + versionLog.getActions().stream() + .filter(action -> action instanceof AddFile || action instanceof RemoveFile) + .collect(Collectors.toList()); + } List filesToAdd = Lists.newArrayList(); List filesToRemove = Lists.newArrayList(); - for (Action action : Iterables.concat(deltaLakeActionMap.values())) { + for (Action action : dataFileActions) { DataFile dataFile = buildDataFileFromAction(action, transaction.table()); if (action instanceof AddFile) { filesToAdd.add(dataFile); diff --git a/delta-lake/src/test/java/org/apache/iceberg/delta/TestBaseSnapshotDeltaLakeTableAction.java b/delta-lake/src/test/java/org/apache/iceberg/delta/TestBaseSnapshotDeltaLakeTableAction.java index 941bc4a990b3..41261d4ab975 100644 --- a/delta-lake/src/test/java/org/apache/iceberg/delta/TestBaseSnapshotDeltaLakeTableAction.java +++ b/delta-lake/src/test/java/org/apache/iceberg/delta/TestBaseSnapshotDeltaLakeTableAction.java @@ -87,6 +87,20 @@ public void testRequiredDeltaLakeConfiguration() { .hasMessage("Make sure to configure the action with a valid deltaLakeConfiguration"); } + @Test + public void testDeltaTableNotExist() { + SnapshotDeltaLakeTable testAction = + new BaseSnapshotDeltaLakeTableAction(sourceTableLocation) + .as(TableIdentifier.of("test", "test")) + .deltaLakeConfiguration(testHadoopConf) + .icebergCatalog(testCatalog) + .tableLocation(newTableLocation); + Assertions.assertThatThrownBy(testAction::execute) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Delta lake table does not exist at the given location: %s", sourceTableLocation); + } + private static class TestCatalog extends BaseMetastoreCatalog { TestCatalog() {}