Delta: Support Snapshot Delta Lake Table to Iceberg Table #6449
Merged
Changes from all commits (57 commits):
- 73e38e5 Cannot pass tests for unknown reason (JonasJ-ap)
- 5544f45 fix test config issue and formatting (JonasJ-ap)
- b8b6119 transfer icebergCatalog to parent class (JonasJ-ap)
- 274560c fix formatting (JonasJ-ap)
- 39e3541 implement direct schema transformation and stop using spark context t… (JonasJ-ap)
- 92f962c refactor and simplify the implementation (JonasJ-ap)
- 033c997 no need to make spark utils public (JonasJ-ap)
- 681a32f simplify the implementation in Spark Action (JonasJ-ap)
- 77bdb27 add support for scala 2.13 (JonasJ-ap)
- 3dd540a add format check for data files (JonasJ-ap)
- 27ece93 fix naming issue (JonasJ-ap)
- a9faabf make delta type visitor abstract (JonasJ-ap)
- 3982711 fix typo and nit problems (JonasJ-ap)
- 9a7c443 migrate from iceberg-core to delta-lake (JonasJ-ap)
- 173534e move get Metrics for File to iceberg-delta-lake (JonasJ-ap)
- bdd1ccf fix comment (JonasJ-ap)
- 85abac2 fix wrong import (JonasJ-ap)
- 32e1af8 Migrate delta to iceberg round 1 (#29) (JonasJ-ap)
- ac1141d Migrate delta to iceberg util refactor (#30) (JonasJ-ap)
- 8e9b3fc Migrate delta to iceberg refactor 1.5 (#31) (JonasJ-ap)
- 12b60ca Merge branch 'master' into migrate_delta_to_iceberg (JonasJ-ap)
- 8a8adef use transaction, refactor structure, add optional newTableLocation, a… (JonasJ-ap)
- 6fbf740 fix the potential path error due to ambiguous return value of AddFile… (JonasJ-ap)
- 69671b9 refactor getFullPath with unit tests, use newCreateTableTransaction, … (JonasJ-ap)
- e3138a6 allow user to specify a custom location for migrated table, fix load … (JonasJ-ap)
- 2e8dfd0 Fix nit problems and optimize some implementation (#38) (JonasJ-ap)
- f4589e8 optimize the constructor to be more clean (JonasJ-ap)
- 59c96cb move everthing to iceberg-delta-lake, build demo integration test (#35) (JonasJ-ap)
- afd783b optimize api structure, refactor the integration test, add more tests… (JonasJ-ap)
- 5b95925 refactor the interfaces, add new tests to integration tests, add new … (JonasJ-ap)
- f43c325 fix error messages and add default implementation for actionProvider … (JonasJ-ap)
- b2a8bfe refactor the default implementation and javadoc (#43) (JonasJ-ap)
- 450a08c fix error when migrating table with nested fields, add CI, upgrade te… (JonasJ-ap)
- 300d39b remove unused helper class in test (JonasJ-ap)
- a285c4a add null check for stopMetastoreAndSpark, remove unnecessary try-catch (JonasJ-ap)
- 5760a83 use assertions.assertThatThrownBy to test precondition checks (JonasJ-ap)
- e41c787 use assertThat to fix assert.True in TestDeltaLakeTypeToType (JonasJ-ap)
- 7a16809 use AssertionJ in TestSnapshotDeltaLakeTable (JonasJ-ap)
- 7072612 fix format and nit issue (JonasJ-ap)
- c2293c9 remove unnecessary fields and class and let integrationTest collected… (JonasJ-ap)
- f38d7b1 remove unnecessary try catch (JonasJ-ap)
- 10163f8 fix wrong modifier of a private method (JonasJ-ap)
- 99dbba8 Merge remote-tracking branch 'origin/master' into migrate_delta_to_ic… (JonasJ-ap)
- a7c3de1 simplify the test base (#46) (JonasJ-ap)
- 6c4ab2c save getLength Call for AddFile and when RemoveFile contains size (#47) (JonasJ-ap)
- dadd76a add null check for table.currentSnapshot() when querying the total nu… (JonasJ-ap)
- 1cd36b9 Refactor iceberg-delta's integration test (#48) (JonasJ-ap)
- 4463f30 Adapt for delta.logRetentionDuration (#49) (JonasJ-ap)
- d3ccc86 fix comment and format issue (JonasJ-ap)
- 1affcb3 remove support for avro, orc since it can allow use to get rid of a d… (JonasJ-ap)
- 098a3a2 using resolvingFileIO instead (JonasJ-ap)
- f0d1536 Merge remote-tracking branch 'origin/master' into migrate_delta_to_ic… (JonasJ-ap)
- a98461a rollback to hadoopFileIO (JonasJ-ap)
- fe6da17 add test for array of structs (JonasJ-ap)
- 8e9a3e2 use Do not Support instead of cannot determine, remove support for av… (JonasJ-ap)
- 24405e0 nit fix (JonasJ-ap)
- c5a6186 error message nit fix (JonasJ-ap)
New file (115 additions): the "Delta Conversion CI" workflow.

```yaml
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

name: "Delta Conversion CI"
on:
  push:
    branches:
    - 'master'
    - '0.**'
    tags:
    - 'apache-iceberg-**'
  pull_request:
    paths-ignore:
    - '.github/ISSUE_TEMPLATE/iceberg_bug_report.yml'
    - '.github/workflows/python-ci.yml'
    - '.github/workflows/flink-ci.yml'
    - '.github/workflows/hive-ci.yml'
    - '.gitignore'
    - '.asf.yml'
    - 'dev/**'
    - 'mr/**'
    - 'hive3/**'
    - 'hive3-orc-bundle/**'
    - 'hive-runtime/**'
    - 'flink/**'
    - 'pig/**'
    - 'python/**'
    - 'python_legacy/**'
    - 'docs/**'
    - 'open-api/**'
    - 'format/**'
    - '.gitattributes'
    - 'README.md'
    - 'CONTRIBUTING.md'
    - 'LICENSE'
    - 'NOTICE'

concurrency:
  group: ${{ github.workflow }}-${{ github.ref }}
  cancel-in-progress: ${{ github.event_name == 'pull_request' }}

jobs:
  delta-conversion-scala-2-12-tests:
    runs-on: ubuntu-20.04
    strategy:
      matrix:
        jvm: [8, 11]
    env:
      SPARK_LOCAL_IP: localhost
    steps:
    - uses: actions/checkout@v3
    - uses: actions/setup-java@v3
      with:
        distribution: zulu
        java-version: ${{ matrix.jvm }}
    - uses: actions/cache@v3
      with:
        path: |
          ~/.gradle/caches
          ~/.gradle/wrapper
        key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle*', '**/gradle-wrapper.properties') }}
        restore-keys: ${{ runner.os }}-gradle-
    - run: echo -e "$(ip addr show eth0 | grep "inet\b" | awk '{print $2}' | cut -d/ -f1)\t$(hostname -f) $(hostname -s)" | sudo tee -a /etc/hosts
    - run: ./gradlew -DsparkVersions=3.3 -DscalaVersion=2.12 -DhiveVersions= -DflinkVersions= :iceberg-delta-lake:check -Pquick=true -x javadoc
    - uses: actions/upload-artifact@v3
      if: failure()
      with:
        name: test logs
        path: |
          **/build/testlogs

  delta-conversion-scala-2-13-tests:
    runs-on: ubuntu-20.04
    strategy:
      matrix:
        jvm: [8, 11]
    env:
      SPARK_LOCAL_IP: localhost
    steps:
    - uses: actions/checkout@v3
    - uses: actions/setup-java@v3
      with:
        distribution: zulu
        java-version: ${{ matrix.jvm }}
    - uses: actions/cache@v3
      with:
        path: |
          ~/.gradle/caches
          ~/.gradle/wrapper
        key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle*', '**/gradle-wrapper.properties') }}
        restore-keys: ${{ runner.os }}-gradle-
    - run: echo -e "$(ip addr show eth0 | grep "inet\b" | awk '{print $2}' | cut -d/ -f1)\t$(hostname -f) $(hostname -s)" | sudo tee -a /etc/hosts
    - run: ./gradlew -DsparkVersions=3.3 -DscalaVersion=2.13 -DhiveVersions= -DflinkVersions= :iceberg-delta-lake:check -Pquick=true -x javadoc
    - uses: actions/upload-artifact@v3
      if: failure()
      with:
        name: test logs
        path: |
          **/build/testlogs
```
...ntegration/java/org/apache/iceberg/delta/DeltaLakeToIcebergMigrationSparkIntegration.java (63 additions, 0 deletions):
```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg.delta;

import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.catalog.CatalogPlugin;

/** An example class showing how to use the delta lake migration actions in a SparkContext. */
class DeltaLakeToIcebergMigrationSparkIntegration {

  private DeltaLakeToIcebergMigrationSparkIntegration() {}

  /**
   * Example of how to use a {@link SparkSession}, a table identifier and a delta table location to
   * construct an action for snapshotting the delta table to an iceberg table.
   *
   * @param spark a SparkSession with iceberg catalog configured.
   * @param newTableIdentifier either a 2-part or 3-part identifier; if it has 2 parts, the
   *     default spark catalog will be used
   * @param deltaTableLocation the location of the delta table
   * @return an instance of snapshot delta lake table action.
   */
  static SnapshotDeltaLakeTable snapshotDeltaLakeTable(
      SparkSession spark, String newTableIdentifier, String deltaTableLocation) {
    Preconditions.checkArgument(
        spark != null, "The SparkSession cannot be null, please provide a valid SparkSession");
    Preconditions.checkArgument(
        newTableIdentifier != null,
        "The table identifier cannot be null, please provide a valid table identifier for the new iceberg table");
    Preconditions.checkArgument(
        deltaTableLocation != null,
        "The delta lake table location cannot be null, please provide a valid location of the delta lake table to be snapshotted");

    String ctx = "delta lake snapshot target";
    CatalogPlugin defaultCatalog = spark.sessionState().catalogManager().currentCatalog();
    Spark3Util.CatalogAndIdentifier catalogAndIdent =
        Spark3Util.catalogAndIdentifier(ctx, spark, newTableIdentifier, defaultCatalog);
    return DeltaLakeToIcebergMigrationActionsProvider.defaultActions()
        .snapshotDeltaLakeTable(deltaTableLocation)
        .as(TableIdentifier.parse(catalogAndIdent.identifier().toString()))
        .deltaLakeConfiguration(spark.sessionState().newHadoopConf())
        .icebergCatalog(Spark3Util.loadIcebergCatalog(spark, catalogAndIdent.catalog().name()));
  }
}
```
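For context, a caller in the same package might use this helper roughly as follows. This is a sketch, not part of the PR: the catalog name (`my_catalog`), warehouse path, table identifier, and Delta table location are illustrative, and it assumes the `execute()` and `Result#snapshotDataFilesCount()` members of the new `SnapshotDeltaLakeTable` action API.

```java
package org.apache.iceberg.delta;

import org.apache.spark.sql.SparkSession;

// Hypothetical usage sketch: all configuration values below are example
// placeholders, not taken from the PR.
public class SnapshotExample {
  public static void main(String[] args) {
    // A SparkSession with an Iceberg catalog named "my_catalog" configured.
    SparkSession spark =
        SparkSession.builder()
            .master("local[2]")
            .config("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog")
            .config("spark.sql.catalog.my_catalog.type", "hadoop")
            .config("spark.sql.catalog.my_catalog.warehouse", "/tmp/iceberg-warehouse")
            .getOrCreate();

    // A 3-part identifier resolves against "my_catalog"; a 2-part identifier
    // would resolve against the current Spark catalog instead.
    SnapshotDeltaLakeTable.Result result =
        DeltaLakeToIcebergMigrationSparkIntegration.snapshotDeltaLakeTable(
                spark, "my_catalog.db.migrated_table", "/tmp/delta/events")
            .execute();

    System.out.println("Snapshotted data files: " + result.snapshotDataFilesCount());
    spark.stop();
  }
}
```

The helper is package-private, so this sketch lives in `org.apache.iceberg.delta`; an external caller would instead go through `DeltaLakeToIcebergMigrationActionsProvider` directly.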
delta-lake/src/integration/java/org/apache/iceberg/delta/SparkDeltaLakeSnapshotTestBase.java (73 additions, 0 deletions):
| @@ -0,0 +1,73 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.iceberg.delta; | ||
|
|
||
| import java.util.Map; | ||
| import org.apache.hadoop.hive.conf.HiveConf; | ||
| import org.apache.iceberg.hive.TestHiveMetastore; | ||
| import org.apache.spark.sql.SparkSession; | ||
| import org.apache.spark.sql.internal.SQLConf; | ||
| import org.junit.AfterClass; | ||
| import org.junit.BeforeClass; | ||
|
|
||
| @SuppressWarnings("VisibilityModifier") | ||
| public abstract class SparkDeltaLakeSnapshotTestBase { | ||
| protected static TestHiveMetastore metastore = null; | ||
| protected static HiveConf hiveConf = null; | ||
| protected static SparkSession spark = null; | ||
|
|
||
| @BeforeClass | ||
| public static void startMetastoreAndSpark() { | ||
| SparkDeltaLakeSnapshotTestBase.metastore = new TestHiveMetastore(); | ||
| metastore.start(); | ||
| SparkDeltaLakeSnapshotTestBase.hiveConf = metastore.hiveConf(); | ||
|
|
||
| SparkDeltaLakeSnapshotTestBase.spark = | ||
| SparkSession.builder() | ||
| .master("local[2]") | ||
| .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic") | ||
| .config( | ||
| "spark.hadoop." + HiveConf.ConfVars.METASTOREURIS.varname, | ||
| hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname)) | ||
| .config("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true") | ||
| .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") | ||
| .enableHiveSupport() | ||
| .getOrCreate(); | ||
| } | ||
|
|
||
| @AfterClass | ||
| public static void stopMetastoreAndSpark() throws Exception { | ||
JonasJ-ap marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if (metastore != null) { | ||
| metastore.stop(); | ||
| SparkDeltaLakeSnapshotTestBase.metastore = null; | ||
| } | ||
| if (spark != null) { | ||
| spark.stop(); | ||
| SparkDeltaLakeSnapshotTestBase.spark = null; | ||
| } | ||
| } | ||
|
|
||
| public SparkDeltaLakeSnapshotTestBase( | ||
| String catalogName, String implementation, Map<String, String> config) { | ||
|
|
||
| spark.conf().set("spark.sql.catalog." + catalogName, implementation); | ||
| config.forEach( | ||
| (key, value) -> spark.conf().set("spark.sql.catalog." + catalogName + "." + key, value)); | ||
| } | ||
| } | ||
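A concrete integration test would extend this base and pass catalog settings through the constructor. The subclass name, catalog name, implementation class, and properties below are illustrative assumptions for the sketch, not taken from the PR:

```java
package org.apache.iceberg.delta;

import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.Test;

// Hypothetical subclass showing the shape of a test built on the base class.
public class ExampleSnapshotIntegrationTest extends SparkDeltaLakeSnapshotTestBase {

  public ExampleSnapshotIntegrationTest() {
    // Registers "test_catalog" as an Iceberg Hive catalog on the shared session.
    super(
        "test_catalog",
        "org.apache.iceberg.spark.SparkCatalog",
        ImmutableMap.of("type", "hive", "default-namespace", "default"));
  }

  @Test
  public void testSnapshotDeltaTable() {
    // Create and populate a Delta table with the shared SparkSession, then
    // snapshot it to Iceberg and assert on the resulting table.
    spark.sql("CREATE TABLE delta_db.events (id BIGINT, data STRING) USING delta");
    spark.sql("INSERT INTO delta_db.events VALUES (1, 'a'), (2, 'b')");
    // ... invoke the snapshot action here and verify the Iceberg table contents
  }
}
```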