[File: build.gradle (2 additions, 0 deletions)]
@@ -537,6 +537,8 @@ project(':iceberg-delta-lake') {
implementation project(':iceberg-core')
implementation project(':iceberg-parquet')
implementation "com.fasterxml.jackson.core:jackson-databind"
+ annotationProcessor "org.immutables:value"
+ compileOnly "org.immutables:value"

compileOnly "io.delta:delta-standalone_${scalaVersion}"

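Note on the two new Gradle lines: compileOnly puts the Immutables @Value annotations on the compile classpath without leaking them into the runtime artifact, while annotationProcessor runs the Immutables code generator at compile time. A minimal sketch of the mechanics this enables (the MigrationResult name is hypothetical, not from this PR):

import org.immutables.value.Value;

// Annotating a value interface...
@Value.Immutable
interface MigrationResult {
  long snapshotDataFilesCount();
}

// ...makes the processor generate an ImmutableMigrationResult class with a builder:
// MigrationResult result =
//     ImmutableMigrationResult.builder().snapshotDataFilesCount(42L).build();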
[File: Java integration test for the Delta Lake snapshot action; filename not captured]

@@ -25,6 +25,8 @@
import io.delta.standalone.DeltaLog;
import io.delta.standalone.Operation;
import io.delta.standalone.OptimisticTransaction;
+ import io.delta.standalone.VersionLog;
+ import io.delta.standalone.actions.Action;
import io.delta.standalone.actions.AddFile;
import io.delta.standalone.actions.RemoveFile;
import io.delta.standalone.exceptions.DeltaConcurrentModificationException;
@@ -34,6 +36,7 @@
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Timestamp;
+ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -43,7 +46,6 @@
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
- import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -176,7 +178,8 @@ public void testBasicSnapshotPartitioned() throws IOException {
spark, newTableIdentifier, partitionedLocation)
.execute();

- checkSnapshotIntegrity(partitionedLocation, partitionedIdentifier, newTableIdentifier, result);
+ checkSnapshotIntegrity(
+     partitionedLocation, partitionedIdentifier, newTableIdentifier, result, 0);
checkTagContentAndOrder(partitionedLocation, newTableIdentifier, 0);
checkIcebergTableLocation(newTableIdentifier, partitionedLocation);
}
@@ -197,7 +200,7 @@ public void testBasicSnapshotUnpartitioned() throws IOException {
.execute();

checkSnapshotIntegrity(
-     unpartitionedLocation, unpartitionedIdentifier, newTableIdentifier, result);
+     unpartitionedLocation, unpartitionedIdentifier, newTableIdentifier, result, 0);
checkTagContentAndOrder(unpartitionedLocation, newTableIdentifier, 0);
checkIcebergTableLocation(newTableIdentifier, unpartitionedLocation);
}
@@ -219,7 +222,8 @@ public void testSnapshotWithNewLocation() throws IOException {
.tableLocation(newIcebergTableLocation)
.execute();

- checkSnapshotIntegrity(partitionedLocation, partitionedIdentifier, newTableIdentifier, result);
+ checkSnapshotIntegrity(
+     partitionedLocation, partitionedIdentifier, newTableIdentifier, result, 0);
checkTagContentAndOrder(partitionedLocation, newTableIdentifier, 0);
checkIcebergTableLocation(newTableIdentifier, newIcebergTableLocation);
}
@@ -251,7 +255,7 @@ public void testSnapshotWithAdditionalProperties() throws IOException {
.execute();

checkSnapshotIntegrity(
-     unpartitionedLocation, unpartitionedIdentifier, newTableIdentifier, result);
+     unpartitionedLocation, unpartitionedIdentifier, newTableIdentifier, result, 0);
checkTagContentAndOrder(unpartitionedLocation, newTableIdentifier, 0);
checkIcebergTableLocation(newTableIdentifier, unpartitionedLocation);
checkIcebergTableProperties(
@@ -285,7 +289,7 @@ public void testSnapshotTableWithExternalDataFiles() throws IOException {
spark, newTableIdentifier, externalDataFilesTableLocation)
.execute();
checkSnapshotIntegrity(
-     externalDataFilesTableLocation, externalDataFilesIdentifier, newTableIdentifier, result);
+     externalDataFilesTableLocation, externalDataFilesIdentifier, newTableIdentifier, result, 0);
checkTagContentAndOrder(externalDataFilesTableLocation, newTableIdentifier, 0);
checkIcebergTableLocation(newTableIdentifier, externalDataFilesTableLocation);
checkDataFilePathsIntegrity(newTableIdentifier, externalDataFilesTableLocation);
@@ -301,15 +305,12 @@ public void testSnapshotSupportedTypes() throws IOException {
SnapshotDeltaLakeTable.Result result =
DeltaLakeToIcebergMigrationSparkIntegration.snapshotDeltaLakeTable(
spark, newTableIdentifier, typeTestTableLocation)
- .tableProperty(TableProperties.PARQUET_VECTORIZATION_ENABLED, "false")
.execute();
- checkSnapshotIntegrity(typeTestTableLocation, typeTestIdentifier, newTableIdentifier, result);
+ checkSnapshotIntegrity(
+     typeTestTableLocation, typeTestIdentifier, newTableIdentifier, result, 0);
checkTagContentAndOrder(typeTestTableLocation, newTableIdentifier, 0);
checkIcebergTableLocation(newTableIdentifier, typeTestTableLocation);
- checkIcebergTableProperties(
-     newTableIdentifier,
-     ImmutableMap.of(TableProperties.PARQUET_VECTORIZATION_ENABLED, "false"),
-     typeTestTableLocation);
+ checkIcebergTableProperties(newTableIdentifier, ImmutableMap.of(), typeTestTableLocation);
}

@Test
@@ -342,9 +343,9 @@ public void testSnapshotVacuumTable() throws IOException {
spark, newTableIdentifier, vacuumTestTableLocation)
.execute();
checkSnapshotIntegrity(
-     vacuumTestTableLocation, vacuumTestIdentifier, newTableIdentifier, result);
- checkIcebergTableLocation(newTableIdentifier, vacuumTestTableLocation);
+     vacuumTestTableLocation, vacuumTestIdentifier, newTableIdentifier, result, 13);
+ checkTagContentAndOrder(vacuumTestTableLocation, newTableIdentifier, 13);
+ checkIcebergTableLocation(newTableIdentifier, vacuumTestTableLocation);
}

@Test
@@ -376,7 +377,7 @@ public void testSnapshotLogCleanTable() throws IOException {
spark, newTableIdentifier, logCleanTestTableLocation)
.execute();
checkSnapshotIntegrity(
-     logCleanTestTableLocation, logCleanTestIdentifier, newTableIdentifier, result);
+     logCleanTestTableLocation, logCleanTestIdentifier, newTableIdentifier, result, 10);
checkTagContentAndOrder(logCleanTestTableLocation, newTableIdentifier, 10);
checkIcebergTableLocation(newTableIdentifier, logCleanTestTableLocation);
}
@@ -385,7 +386,8 @@ private void checkSnapshotIntegrity(
String deltaTableLocation,
String deltaTableIdentifier,
String icebergTableIdentifier,
- SnapshotDeltaLakeTable.Result snapshotReport) {
+ SnapshotDeltaLakeTable.Result snapshotReport,
+ long firstConstructableVersion) {
DeltaLog deltaLog = DeltaLog.forTable(spark.sessionState().newHadoopConf(), deltaTableLocation);

List<Row> deltaTableContents =
@@ -394,8 +396,8 @@ private void checkSnapshotIntegrity(
spark.sql("SELECT * FROM " + icebergTableIdentifier).collectAsList();

Assertions.assertThat(deltaTableContents).hasSize(icebergTableContents.size());
- Assertions.assertThat(deltaLog.update().getAllFiles())
-     .hasSize((int) snapshotReport.snapshotDataFilesCount());
+ Assertions.assertThat(snapshotReport.snapshotDataFilesCount())
+     .isEqualTo(countDataFilesInDeltaLakeTable(deltaLog, firstConstructableVersion));
Assertions.assertThat(icebergTableContents)
.containsExactlyInAnyOrderElementsOf(deltaTableContents);
}
@@ -555,4 +557,28 @@ private void writeDeltaTable(
df.write().format("delta").mode(SaveMode.Append).option("path", path).saveAsTable(identifier);
}
}

+ private long countDataFilesInDeltaLakeTable(DeltaLog deltaLog, long firstConstructableVersion) {
+   long dataFilesCount = 0;
+
+   // Seed the count from the earliest snapshot that can still be reconstructed,
+   // then replay every later version and count the AddFile actions it introduces.
+   List<AddFile> initialDataFiles =
+       deltaLog.getSnapshotForVersionAsOf(firstConstructableVersion).getAllFiles();
+   dataFilesCount += initialDataFiles.size();
+
+   Iterator<VersionLog> versionLogIterator =
+       deltaLog.getChanges(
+           firstConstructableVersion + 1,
+           false // do not throw an exception when data loss is detected
+       );
+
+   while (versionLogIterator.hasNext()) {
+     VersionLog versionLog = versionLogIterator.next();
+     List<Action> addFiles =
+         versionLog.getActions().stream()
+             .filter(action -> action instanceof AddFile)
+             .collect(Collectors.toList());
+     dataFilesCount += addFiles.size();
+   }
+
+   return dataFilesCount;
+ }
}
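The new firstConstructableVersion parameter (0 for ordinary tables, 13 and 10 in the VACUUM and log-clean tests above) accounts for Delta versions that can no longer be replayed after data files or log entries are removed, so countDataFilesInDeltaLakeTable seeds its count from the earliest snapshot that is still reconstructable. A hedged sketch of how a test could discover that version instead of hard-coding it, assuming getSnapshotForVersionAsOf rejects versions outside the available range with IllegalArgumentException:

// Hypothetical helper, not part of this PR.
private long findFirstConstructableVersion(DeltaLog deltaLog) {
  long latest = deltaLog.update().getVersion();
  for (long version = 0; version <= latest; version++) {
    try {
      deltaLog.getSnapshotForVersionAsOf(version);
      return version; // earliest version that can still be rebuilt
    } catch (IllegalArgumentException e) {
      // this version was vacuumed or log-cleaned away; keep probing
    }
  }
  return latest;
}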
[File: Java implementation of the Delta Lake snapshot action; filename not captured]

@@ -46,8 +46,6 @@
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
- import org.apache.iceberg.Snapshot;
- import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.Transaction;
@@ -63,6 +61,7 @@
import org.apache.iceberg.parquet.ParquetUtil;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Type;
import org.slf4j.Logger;
@@ -147,7 +146,7 @@ public SnapshotDeltaLakeTable deltaLakeConfiguration(Configuration conf) {
}

@Override
- public Result execute() {
+ public SnapshotDeltaLakeTable.Result execute() {
Preconditions.checkArgument(
icebergCatalog != null && newTableIdentifier != null,
"Iceberg catalog and identifier cannot be null. Make sure to configure the action with a valid Iceberg catalog and identifier.");
@@ -158,6 +157,7 @@ public Result execute() {
deltaLog.tableExists(),
"Delta Lake table does not exist at the given location: %s",
deltaTableLocation);
+ ImmutableSet.Builder<String> migratedDataFilesBuilder = ImmutableSet.builder();
io.delta.standalone.Snapshot updatedSnapshot = deltaLog.update();
Schema schema = convertDeltaLakeSchema(updatedSnapshot.getMetadata().getSchema());
PartitionSpec partitionSpec = getPartitionSpecFromDeltaSnapshot(schema, updatedSnapshot);
@@ -177,29 +177,27 @@ public Result execute() {
.commit();
long constructableStartVersion =
commitInitialDeltaSnapshotToIcebergTransaction(
-     updatedSnapshot.getVersion(), icebergTransaction);
+     updatedSnapshot.getVersion(), icebergTransaction, migratedDataFilesBuilder);
Iterator<VersionLog> versionLogIterator =
deltaLog.getChanges(
constructableStartVersion + 1, false // do not throw an exception when data loss is detected
);
while (versionLogIterator.hasNext()) {
VersionLog versionLog = versionLogIterator.next();
- commitDeltaVersionLogToIcebergTransaction(versionLog, icebergTransaction);
+ commitDeltaVersionLogToIcebergTransaction(
+     versionLog, icebergTransaction, migratedDataFilesBuilder);
}

- Snapshot icebergSnapshot = icebergTransaction.table().currentSnapshot();
- long totalDataFiles =
-     icebergSnapshot != null
-         ? Long.parseLong(icebergSnapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP))
-         : 0;

icebergTransaction.commitTransaction();

+ long totalDataFiles = migratedDataFilesBuilder.build().size();
LOG.info(
"Successfully created Iceberg table {} from Delta Lake table at {}, total data file count: {}",
newTableIdentifier,
deltaTableLocation,
totalDataFiles);
- return new BaseSnapshotDeltaLakeTableActionResult(totalDataFiles);
+ return ImmutableSnapshotDeltaLakeTable.Result.builder()
+     .snapshotDataFilesCount(totalDataFiles)
+     .build();
}

private Schema convertDeltaLakeSchema(io.delta.standalone.types.StructType deltaSchema) {
@@ -241,7 +239,9 @@ private PartitionSpec getPartitionSpecFromDeltaSnapshot(
* @return the initial version of the delta lake table that is successfully committed to iceberg
*/
private long commitInitialDeltaSnapshotToIcebergTransaction(
- long latestVersion, Transaction transaction) {
+ long latestVersion,
+ Transaction transaction,
+ ImmutableSet.Builder<String> migratedDataFilesBuilder) {
long constructableStartVersion = deltaStartVersion;
while (constructableStartVersion <= latestVersion) {
try {
Expand All @@ -251,6 +251,7 @@ private long commitInitialDeltaSnapshotToIcebergTransaction(
for (AddFile addFile : initDataFiles) {
DataFile dataFile = buildDataFileFromAction(addFile, transaction.table());
filesToAdd.add(dataFile);
+ migratedDataFilesBuilder.add(dataFile.path().toString());
}

// AppendFiles case
@@ -286,7 +287,9 @@ private void commitDeltaVersionLogToIcebergTransaction(
* @param transaction the iceberg table transaction to commit to
*/
private void commitDeltaVersionLogToIcebergTransaction(
- VersionLog versionLog, Transaction transaction) {
+ VersionLog versionLog,
+ Transaction transaction,
+ ImmutableSet.Builder<String> migratedDataFilesBuilder) {
// Only need actions related to data change: AddFile and RemoveFile
List<Action> dataFileActions =
versionLog.getActions().stream()
@@ -305,6 +308,7 @@ private void commitDeltaVersionLogToIcebergTransaction(
throw new ValidationException(
"The action %s is unsupported", action.getClass().getSimpleName());
}
+ migratedDataFilesBuilder.add(dataFile.path().toString());
}

if (filesToAdd.size() > 0 && filesToRemove.size() > 0) {

This file was deleted.

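Judging by the removed call new BaseSnapshotDeltaLakeTableActionResult(totalDataFiles) above, the deleted file is presumably the hand-written Result implementation that the generated immutable now replaces; reconstructed roughly (inferred from the call site and the Result interface, not confirmed by this capture):

class BaseSnapshotDeltaLakeTableActionResult implements SnapshotDeltaLakeTable.Result {
  private final long snapshotDataFilesCount;

  BaseSnapshotDeltaLakeTableActionResult(long snapshotDataFilesCount) {
    this.snapshotDataFilesCount = snapshotDataFilesCount;
  }

  @Override
  public long snapshotDataFilesCount() {
    return snapshotDataFilesCount;
  }
}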
[File: the SnapshotDeltaLakeTable action interface]

@@ -23,8 +23,10 @@
import org.apache.iceberg.actions.Action;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
+ import org.immutables.value.Value;

/** Snapshot an existing Delta Lake table to Iceberg in place. */
+ @Value.Enclosing
public interface SnapshotDeltaLakeTable
extends Action<SnapshotDeltaLakeTable, SnapshotDeltaLakeTable.Result> {

@@ -81,6 +83,7 @@ public interface SnapshotDeltaLakeTable
SnapshotDeltaLakeTable deltaLakeConfiguration(Configuration conf);

/** The action result that contains a summary of the execution. */
+ @Value.Immutable
interface Result {

/** Returns the number of migrated data files. */
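With @Value.Enclosing on the outer interface and @Value.Immutable on Result, the Immutables processor nests the generated type under a single umbrella class, ImmutableSnapshotDeltaLakeTable, which is exactly how execute() builds its result in this PR:

SnapshotDeltaLakeTable.Result result =
    ImmutableSnapshotDeltaLakeTable.Result.builder()
        .snapshotDataFilesCount(totalDataFiles) // distinct data file paths migrated
        .build();

This removes the need to maintain the result class by hand and gives equals, hashCode, and toString implementations for free.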