diff --git a/.github/workflows/hudi-conversion-ci.yaml b/.github/workflows/hudi-conversion-ci.yaml new file mode 100644 index 000000000000..3e2b1018acec --- /dev/null +++ b/.github/workflows/hudi-conversion-ci.yaml @@ -0,0 +1,86 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +name: "Hudi Conversion CI" +on: + push: + branches: + - 'master' + - '0.**' + tags: + - 'apache-iceberg-**' + pull_request: + paths-ignore: + - '.github/ISSUE_TEMPLATE/iceberg_bug_report.yml' + - '.github/workflows/python-ci.yml' + - '.github/workflows/flink-ci.yml' + - '.github/workflows/hive-ci.yml' + - '.gitignore' + - '.asf.yml' + - 'dev/**' + - 'mr/**' + - 'hive3/**' + - 'hive3-orc-bundle/**' + - 'hive-runtime/**' + - 'flink/**' + - 'pig/**' + - 'python/**' + - 'python_legacy/**' + - 'docs/**' + - 'open-api/**' + - 'format/**' + - '.gitattributes' + - 'README.md' + - 'CONTRIBUTING.md' + - 'LICENSE' + - 'NOTICE' + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: ${{ github.event_name == 'pull_request' }} + +jobs: + hudi-conversion-scala-2-12-tests: + runs-on: ubuntu-20.04 + strategy: + matrix: + jvm: [8, 11] + env: + SPARK_LOCAL_IP: localhost + steps: + - uses: actions/checkout@v3 + - uses: actions/setup-java@v3 + with: + distribution: zulu + java-version: ${{ matrix.jvm }} + - uses: actions/cache@v3 + with: + path: | + ~/.gradle/caches + ~/.gradle/wrapper + key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle*', '**/gradle-wrapper.properties') }} + restore-keys: ${{ runner.os }}-gradle- + - run: echo -e "$(ip addr show eth0 | grep "inet\b" | awk '{print $2}' | cut -d/ -f1)\t$(hostname -f) $(hostname -s)" | sudo tee -a /etc/hosts + - run: ./gradlew -DsparkVersions=3.3 -DscalaVersion=2.12 -DhiveVersions= -DflinkVersions= :iceberg-hudi:check -Pquick=true -x javadoc + - uses: actions/upload-artifact@v3 + if: failure() + with: + name: test logs + path: | + **/build/testlogs diff --git a/build.gradle b/build.gradle index 7b14f3b73163..8de344026481 100644 --- a/build.gradle +++ b/build.gradle @@ -48,6 +48,10 @@ plugins { id 'nebula.dependency-recommender' version '11.0.0' } +String scalaVersion = System.getProperty("scalaVersion") != null ? System.getProperty("scalaVersion") : System.getProperty("defaultScalaVersion") +String sparkVersionsString = System.getProperty("sparkVersions") != null ? System.getProperty("sparkVersions") : System.getProperty("defaultSparkVersions") +List sparkVersions = sparkVersionsString != null && !sparkVersionsString.isEmpty() ? 
sparkVersionsString.split(",") : [] + try { // apply these plugins in a try-catch block so that we can handle cases without .git directory apply plugin: 'com.palantir.git-version' @@ -438,6 +442,81 @@ project(':iceberg-aws') { } } +project(':iceberg-hudi') { + + configurations { + integrationImplementation.extendsFrom testImplementation + integrationRuntime.extendsFrom testRuntimeOnly + } + + dependencies { + implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow') + api project(':iceberg-api') + implementation project(':iceberg-common') + implementation project(':iceberg-core') + implementation project(':iceberg-parquet') + implementation project(':iceberg-orc') + implementation "com.fasterxml.jackson.core:jackson-databind" + + // Hudi uses java8, may need to assess if we can use hudi in java11. + compileOnly("org.apache.hudi:hudi-common") + // Added to resolve dependency conflicts with hudi-spark-bundle + compileOnly("org.apache.hudi:hudi-client-common") + implementation("org.apache.avro:avro") { + exclude group: 'org.tukaani' // xz compression is not supported + } + + compileOnly("org.apache.hadoop:hadoop-common") { + exclude group: 'org.apache.avro', module: 'avro' + exclude group: 'org.slf4j', module: 'slf4j-log4j12' + exclude group: 'javax.servlet', module: 'servlet-api' + exclude group: 'com.google.code.gson', module: 'gson' + } + if (sparkVersions.contains("3.3") && scalaVersion == "2.12") { + integrationImplementation project(':iceberg-data') + integrationImplementation("org.apache.hudi:hudi-spark3.3-bundle_2.12") { + exclude group: 'org.apache.hudi', module: 'hudi-common' + exclude group: 'org.apache.hudi', module: 'hudi-client-common' + } + integrationImplementation project(path: ":iceberg-spark:iceberg-spark-3.3_${scalaVersion}") + integrationImplementation("org.apache.hadoop:hadoop-minicluster") { + exclude group: 'org.apache.avro', module: 'avro' + // to make sure netty libs only come from project(':iceberg-arrow') + exclude group: 'io.netty', module: 'netty-buffer' + exclude group: 'io.netty', module: 'netty-common' + } + integrationImplementation project(path: ':iceberg-hive-metastore') + integrationImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts') + integrationImplementation("org.apache.spark:spark-hive_${scalaVersion}:3.3.1") { + exclude group: 'org.apache.avro', module: 'avro' + exclude group: 'org.apache.arrow' + exclude group: 'org.apache.parquet' + // to make sure netty libs only come from project(':iceberg-arrow') + exclude group: 'io.netty', module: 'netty-buffer' + exclude group: 'io.netty', module: 'netty-common' + exclude group: 'org.roaringbitmap' + } + } + } + + if (sparkVersions.contains("3.3") && scalaVersion == "2.12") { + sourceSets { + integration { + java.srcDir "$projectDir/src/integration/java" + resources.srcDir "$projectDir/src/integration/resources" + compileClasspath += main.output + test.output + runtimeClasspath += main.output + test.output + } + } + + task integrationTest(type: Test) { + testClassesDirs = sourceSets.integration.output.classesDirs + classpath = sourceSets.integration.runtimeClasspath + } + check.dependsOn integrationTest + } +} + project(':iceberg-gcp') { dependencies { implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow') diff --git a/hudi/src/integration/java/org/apache/iceberg/hudi/HudiToIcebergMigrationSparkIntegration.java b/hudi/src/integration/java/org/apache/iceberg/hudi/HudiToIcebergMigrationSparkIntegration.java new file mode 100644 
index 000000000000..cf06fa9556cc --- /dev/null +++ b/hudi/src/integration/java/org/apache/iceberg/hudi/HudiToIcebergMigrationSparkIntegration.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.hudi; + +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.spark.Spark3Util; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.catalog.CatalogPlugin; + +public class HudiToIcebergMigrationSparkIntegration { + private HudiToIcebergMigrationSparkIntegration() {} + + static SnapshotHudiTable snapshotHudiTable( + SparkSession spark, String hudiTablePath, String newTableIdentifier) { + Preconditions.checkArgument( + spark != null, "The SparkSession cannot be null, please provide a valid SparkSession"); + Preconditions.checkArgument( + newTableIdentifier != null, + "The table identifier cannot be null, please provide a valid table identifier for the new Iceberg table"); + Preconditions.checkArgument( + hudiTablePath != null, + "The Hudi table location cannot be null, please provide a valid location of the Hudi table to be snapshotted"); + String ctx = "hudi snapshot target"; + CatalogPlugin defaultCatalog = spark.sessionState().catalogManager().currentCatalog(); + Spark3Util.CatalogAndIdentifier catalogAndIdentifier = + Spark3Util.catalogAndIdentifier(ctx, spark, newTableIdentifier, defaultCatalog); + return HudiToIcebergMigrationActionsProvider.defaultProvider() + .snapshotHudiTable(hudiTablePath) + .as(TableIdentifier.parse(catalogAndIdentifier.identifier().toString())) + .hoodieConfiguration(spark.sessionState().newHadoopConf()) + .icebergCatalog( + Spark3Util.loadIcebergCatalog(spark, catalogAndIdentifier.catalog().name())); + } +} diff --git a/hudi/src/integration/java/org/apache/iceberg/hudi/SparkHudiMigrationTestBase.java b/hudi/src/integration/java/org/apache/iceberg/hudi/SparkHudiMigrationTestBase.java new file mode 100644 index 000000000000..7fd5f9bd69f1 --- /dev/null +++ b/hudi/src/integration/java/org/apache/iceberg/hudi/SparkHudiMigrationTestBase.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.hudi; + +import java.util.Map; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.iceberg.hive.TestHiveMetastore; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.internal.SQLConf; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +@SuppressWarnings("VisibilityModifier") +public abstract class SparkHudiMigrationTestBase { + protected static TestHiveMetastore metastore = null; + protected static HiveConf hiveConf = null; + protected static SparkSession spark = null; + + @BeforeClass + public static void startMetastoreAndSpark() { + SparkHudiMigrationTestBase.metastore = new TestHiveMetastore(); + metastore.start(); + SparkHudiMigrationTestBase.hiveConf = metastore.hiveConf(); + + SparkHudiMigrationTestBase.spark = + SparkSession.builder() + .master("local[2]") + .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic") + .config( + "spark.hadoop." + HiveConf.ConfVars.METASTOREURIS.varname, + hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname)) + .config("spark.hadoop.parquet.avro.write-old-list-structure", "false") + .config("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true") + .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") + .enableHiveSupport() + .getOrCreate(); + } + + @AfterClass + public static void stopMetastoreAndSpark() throws Exception { + if (metastore != null) { + metastore.stop(); + SparkHudiMigrationTestBase.metastore = null; + } + if (spark != null) { + spark.stop(); + SparkHudiMigrationTestBase.spark = null; + } + } + + public SparkHudiMigrationTestBase( + String catalogName, String implementation, Map config) { + + spark.conf().set("spark.sql.catalog." + catalogName, implementation); + config.forEach( + (key, value) -> spark.conf().set("spark.sql.catalog." + catalogName + "." + key, value)); + } +} diff --git a/hudi/src/integration/java/org/apache/iceberg/hudi/TestSnapshotHudiTable.java b/hudi/src/integration/java/org/apache/iceberg/hudi/TestSnapshotHudiTable.java new file mode 100644 index 000000000000..02c5db45d0b9 --- /dev/null +++ b/hudi/src/integration/java/org/apache/iceberg/hudi/TestSnapshotHudiTable.java @@ -0,0 +1,492 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.hudi; + +import static org.apache.spark.sql.functions.current_date; +import static org.apache.spark.sql.functions.date_add; +import static org.apache.spark.sql.functions.expr; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import org.apache.hudi.DataSourceReadOptions; +import org.apache.hudi.DataSourceWriteOptions; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.spark.Spark3Util; +import org.apache.iceberg.spark.SparkCatalog; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.connector.catalog.CatalogPlugin; +import org.apache.spark.sql.hudi.catalog.HoodieCatalog; +import org.assertj.core.api.Assertions; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@RunWith(Parameterized.class) +public class TestSnapshotHudiTable extends SparkHudiMigrationTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(TestSnapshotHudiTable.class.getName()); + private static final String SNAPSHOT_SOURCE_PROP = "snapshot_source"; + private static final String HUDI_SOURCE_VALUE = "hudi"; + private static final String ORIGINAL_LOCATION_PROP = "original_location"; + private static final String NAMESPACE = "delta_conversion_test"; + private static final String defaultSparkCatalog = "spark_catalog"; + private static final String icebergCatalogName = "iceberg_hive"; + private String partitionedIdentifier; + private String unpartitionedIdentifier; + private String multiCommitIdentifier; + private final String partitionedTableName = "partitioned_table"; + private final String unpartitionedTableName = "unpartitioned_table"; + private final String multiCommitTableName = "multi_commit_table"; + private String partitionedLocation; + private String unpartitionedLocation; + private String newIcebergTableLocation; + private String multiCommitTableLocation; + private Dataset typeTestDataframe = typeTestDataFrame(); + private Dataset nestedDataframe = nestedDataFrame(); + + @Parameterized.Parameters(name = "Catalog Name {0} - Options {2}") + public static Object[][] parameters() { + return new Object[][] { + new Object[] { + icebergCatalogName, + SparkCatalog.class.getName(), + ImmutableMap.of( + "type", + "hive", + "default-namespace", + "default", + "parquet-enabled", + "true", + "cache-enabled", + "false" // Spark will delete tables using v1, leaving the cache out of sync + ) + } + }; + } + + @Rule public TemporaryFolder temp1 = new TemporaryFolder(); + @Rule public TemporaryFolder temp2 = new TemporaryFolder(); + @Rule public TemporaryFolder temp3 = new TemporaryFolder(); + @Rule public TemporaryFolder temp4 = new TemporaryFolder(); + + public TestSnapshotHudiTable( + String catalogName, String implementation, Map config) { + super(catalogName, implementation, config); + spark.conf().set("spark.sql.catalog." 
+ defaultSparkCatalog, HoodieCatalog.class.getName()); + } + + @Before + public void before() throws IOException { + File partitionedFolder = temp1.newFolder(); + File unpartitionedFolder = temp2.newFolder(); + File newIcebergTableFolder = temp3.newFolder(); + File multiCommitTableFolder = temp4.newFolder(); + partitionedLocation = partitionedFolder.toURI().toString(); + unpartitionedLocation = unpartitionedFolder.toURI().toString(); + newIcebergTableLocation = newIcebergTableFolder.toURI().toString(); + multiCommitTableLocation = multiCommitTableFolder.toURI().toString(); + + spark.sql(String.format("CREATE DATABASE IF NOT EXISTS %s", NAMESPACE)); + + partitionedIdentifier = destName(defaultSparkCatalog, partitionedTableName); + unpartitionedIdentifier = destName(defaultSparkCatalog, unpartitionedTableName); + multiCommitIdentifier = destName(defaultSparkCatalog, multiCommitTableName); + + spark.sql(String.format("DROP TABLE IF EXISTS %s", partitionedIdentifier)); + spark.sql(String.format("DROP TABLE IF EXISTS %s", unpartitionedIdentifier)); + spark.sql(String.format("DROP TABLE IF EXISTS %s", multiCommitIdentifier)); + } + + @Test + public void testBasicPartitionedTable() { + writeHoodieTable( + typeTestDataframe, + "decimalCol", + "intCol", + "partitionPath", + SaveMode.Overwrite, + partitionedLocation, + partitionedIdentifier); + LOG.info("Alpha test reference: hoodie table path: {}", partitionedLocation); + String newTableIdentifier = destName(icebergCatalogName, "alpha_iceberg_table"); + SnapshotHudiTable.Result result = + HudiToIcebergMigrationSparkIntegration.snapshotHudiTable( + spark, partitionedLocation, newTableIdentifier) + .execute(); + checkSnapshotIntegrity(partitionedLocation, newTableIdentifier); + checkIcebergTableLocation(newTableIdentifier, partitionedLocation); + checkIcebergTableProperties(newTableIdentifier, ImmutableMap.of(), partitionedLocation); + } + + @Test + public void testBasicUnpartitionedTable() { + writeHoodieTable( + typeTestDataframe, + "decimalCol", + "intCol", + "", + SaveMode.Overwrite, + unpartitionedLocation, + unpartitionedIdentifier); + String newTableIdentifier = destName(icebergCatalogName, "alpha_iceberg_table_2"); + SnapshotHudiTable.Result result = + HudiToIcebergMigrationSparkIntegration.snapshotHudiTable( + spark, unpartitionedLocation, newTableIdentifier) + .execute(); + checkSnapshotIntegrity(unpartitionedLocation, newTableIdentifier); + checkIcebergTableLocation(newTableIdentifier, unpartitionedLocation); + checkIcebergTableProperties(newTableIdentifier, ImmutableMap.of(), unpartitionedLocation); + } + + @Test + public void testMultiCommitTable() { + Dataset initialDataFrame = multiDataFrame(0, 2); + writeHoodieTable( + initialDataFrame, + "decimalCol", + "magic_number", + "partitionPath2", + SaveMode.Append, + multiCommitTableLocation, + multiCommitIdentifier); + writeHoodieTable( + initialDataFrame, + "decimalCol", + "magic_number", + "partitionPath2", + SaveMode.Append, + multiCommitTableLocation, + multiCommitIdentifier); + writeHoodieTable( + multiDataFrame(2, 5), + "decimalCol", + "magic_number", + "partitionPath2", + SaveMode.Append, + multiCommitTableLocation, + multiCommitIdentifier); + writeHoodieTable( + multiDataFrame(0, 1), + "decimalCol", + "magic_number", + "partitionPath2", + SaveMode.Append, + multiCommitTableLocation, + multiCommitIdentifier); + Dataset toDelete = multiDataFrame(4, 5); + writeHoodieTable( + toDelete, + "decimalCol", + "magic_number", + "partitionPath2", + SaveMode.Append, + multiCommitTableLocation, + 
multiCommitIdentifier); + writeHoodieTableOperation( + toDelete, + DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL(), + "decimalCol", + "magic_number", + "partitionPath2", + SaveMode.Append, + multiCommitTableLocation, + multiCommitIdentifier); + + String newTableIdentifier = destName(icebergCatalogName, "alpha_iceberg_table_3"); + SnapshotHudiTable.Result result = + HudiToIcebergMigrationSparkIntegration.snapshotHudiTable( + spark, multiCommitTableLocation, newTableIdentifier) + .execute(); + checkSnapshotIntegrity(multiCommitTableLocation, newTableIdentifier); + checkIcebergTableLocation(newTableIdentifier, multiCommitTableLocation); + checkIcebergTableProperties(newTableIdentifier, ImmutableMap.of(), multiCommitTableLocation); + } + + @Test + public void testSnapshotWithNewLocation() { + writeHoodieTable( + typeTestDataframe, + "decimalCol", + "intCol", + "partitionPath", + SaveMode.Overwrite, + partitionedLocation, + partitionedIdentifier); + String newTableIdentifier = destName(icebergCatalogName, "alpha_iceberg_table_4"); + SnapshotHudiTable.Result result = + HudiToIcebergMigrationSparkIntegration.snapshotHudiTable( + spark, partitionedLocation, newTableIdentifier) + .tableLocation(newIcebergTableLocation) + .execute(); + checkSnapshotIntegrity(partitionedLocation, newTableIdentifier); + checkIcebergTableLocation(newTableIdentifier, newIcebergTableLocation); + } + + @Test + public void testSnapshotWithAdditionalProperties() { + writeHoodieTable( + typeTestDataframe, + "decimalCol", + "intCol", + "partitionPath", + SaveMode.Overwrite, + partitionedLocation, + partitionedIdentifier); + String newTableIdentifier = destName(icebergCatalogName, "alpha_iceberg_table_5"); + SnapshotHudiTable.Result result = + HudiToIcebergMigrationSparkIntegration.snapshotHudiTable( + spark, partitionedLocation, newTableIdentifier) + .tableProperties(ImmutableMap.of("test", "test")) + .execute(); + checkSnapshotIntegrity(partitionedLocation, newTableIdentifier); + checkIcebergTableProperties( + newTableIdentifier, ImmutableMap.of("test", "test"), partitionedLocation); + } + + @Test + public void testSnapshotWithComplexKeyGen() { + writeHoodieTableKeyGenerator( + multiDataFrame(0, 1), + "decimalCol,dateCol", + "magic_number", + "zpartitionPath,partitionPath,partitionPath2", + SaveMode.Append, + partitionedLocation, + partitionedIdentifier); + String newTableIdentifier = destName(icebergCatalogName, "alpha_iceberg_table_6"); + SnapshotHudiTable.Result result = + HudiToIcebergMigrationSparkIntegration.snapshotHudiTable( + spark, partitionedLocation, newTableIdentifier) + .tableProperties(ImmutableMap.of("test", "test")) + .execute(); + checkSnapshotIntegrity(partitionedLocation, newTableIdentifier); + checkIcebergTableProperties( + newTableIdentifier, ImmutableMap.of("test", "test"), partitionedLocation); + } + + private void checkSnapshotIntegrity(String hudiTableLocation, String icebergTableIdentifier) { + Dataset hudiResult = + spark + .read() + .format("hudi") + .option( + DataSourceReadOptions.QUERY_TYPE().key(), + DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL()) + .load(hudiTableLocation); + Dataset icebergResult = spark.sql("SELECT * FROM " + icebergTableIdentifier); + // TODO: adjust test technique since hudi tends to return the columns in a different order (put + // the one used for partitioning last) + List hudiTableContents = hudiResult.collectAsList(); + List icebergTableContents = icebergResult.collectAsList(); + + Assertions.assertThat(hudiTableContents).hasSize(icebergTableContents.size()); 
+ Assertions.assertThat(hudiTableContents) + .containsExactlyInAnyOrderElementsOf(icebergTableContents); + } + + private void checkIcebergTableLocation(String icebergTableIdentifier, String expectedLocation) { + Table table = getIcebergTable(icebergTableIdentifier); + Assertions.assertThat(table.location()).isEqualTo(expectedLocation); + } + + private void checkIcebergTableProperties( + String icebergTableIdentifier, + Map<String, String> expectedAdditionalProperties, + String hudiTableLocation) { + Table icebergTable = getIcebergTable(icebergTableIdentifier); + ImmutableMap.Builder<String, String> expectedPropertiesBuilder = ImmutableMap.builder(); + // The snapshot action will set some fixed properties on the table + expectedPropertiesBuilder.put(SNAPSHOT_SOURCE_PROP, HUDI_SOURCE_VALUE); + expectedPropertiesBuilder.putAll(expectedAdditionalProperties); + ImmutableMap<String, String> expectedProperties = expectedPropertiesBuilder.build(); + + Assertions.assertThat(icebergTable.properties().entrySet()) + .containsAll(expectedProperties.entrySet()); + Assertions.assertThat(icebergTable.properties()) + .containsEntry(ORIGINAL_LOCATION_PROP, hudiTableLocation); + } + + private Table getIcebergTable(String icebergTableIdentifier) { + CatalogPlugin defaultCatalog = spark.sessionState().catalogManager().currentCatalog(); + Spark3Util.CatalogAndIdentifier catalogAndIdent = + Spark3Util.catalogAndIdentifier( + "test catalog", spark, icebergTableIdentifier, defaultCatalog); + return Spark3Util.loadIcebergCatalog(spark, catalogAndIdent.catalog().name()) + .loadTable(TableIdentifier.parse(catalogAndIdent.identifier().toString())); + } + + private String destName(String catalogName, String dest) { + if (catalogName.equals(defaultSparkCatalog)) { + return NAMESPACE + "." + catalogName + "_" + dest; + } + return catalogName + "." + NAMESPACE + "."
+ catalogName + "_" + dest; + } + + private Dataset typeTestDataFrame() { + return spark + .range(0, 5, 1, 5) + .withColumnRenamed("id", "longCol") + .withColumn("intCol", expr("CAST(longCol AS INT)")) + .withColumn("floatCol", expr("CAST(longCol AS FLOAT)")) + .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)")) + .withColumn("dateCol", date_add(current_date(), 1)) + .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)")) + .withColumn("stringCol", expr("CAST(dateCol AS STRING)")) + .withColumn("booleanCol", expr("longCol > 5")) + .withColumn("binaryCol", expr("CAST(longCol AS BINARY)")) + .withColumn("byteCol", expr("CAST(longCol AS BYTE)")) + .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(10, 2))")) + .withColumn("shortCol", expr("CAST(longCol AS SHORT)")) + .withColumn("mapCol", expr("MAP(stringCol, intCol)")) // Hudi requires Map key to be String + .withColumn("arrayCol", expr("ARRAY(dateCol)")) + .withColumn("structCol", expr("STRUCT(longCol AS a, longCol AS b)")) + .withColumn("partitionPath", expr("CAST(longCol AS STRING)")); + } + + private Dataset multiDataFrame(int start, int end) { + return spark + .range(start, end, 1, end - start) + .withColumn("longCol", expr("id")) + .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(10, 2))")) + .withColumn("magic_number", expr("rand(5) * 100")) + .withColumn("dateCol", date_add(current_date(), 1)) + .withColumn("dateString", expr("CAST(dateCol AS STRING)")) + .withColumn("random1", expr("CAST(rand(5) * 100 as LONG)")) + .withColumn("random2", expr("CAST(rand(51) * 100 as LONG)")) + .withColumn("random3", expr("CAST(rand(511) * 100 as LONG)")) + .withColumn("random4", expr("CAST(rand(15) * 100 as LONG)")) + .withColumn("random5", expr("CAST(rand(115) * 100 as LONG)")) + .withColumn("innerStruct1", expr("STRUCT(random1, random2)")) + .withColumn("innerStruct2", expr("STRUCT(random3, random4)")) + .withColumn("structCol1", expr("STRUCT(innerStruct1, innerStruct2)")) + .withColumn( + "innerStruct3", + expr("STRUCT(SHA1(CAST(random5 AS BINARY)), SHA1(CAST(random1 AS BINARY)))")) + .withColumn( + "structCol2", + expr( + "STRUCT(innerStruct3, STRUCT(SHA1(CAST(random2 AS BINARY)), SHA1(CAST(random3 AS BINARY))))")) + .withColumn("zpartitionPath", expr("CAST(dateCol AS STRING)")) + .withColumn("partitionPath", expr("CAST(id AS STRING)")) + .withColumn("partitionPath2", expr("CAST(random1 AS STRING)")); + } + + private Dataset nestedDataFrame() { + return spark + .range(0, 5, 1, 5) + .withColumn("longCol", expr("id")) + .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(10, 2))")) + .withColumn("magic_number", expr("rand(5) * 100")) + .withColumn("dateCol", date_add(current_date(), 1)) + .withColumn("dateString", expr("CAST(dateCol AS STRING)")) + .withColumn("random1", expr("CAST(rand(5) * 100 as LONG)")) + .withColumn("random2", expr("CAST(rand(51) * 100 as LONG)")) + .withColumn("random3", expr("CAST(rand(511) * 100 as LONG)")) + .withColumn("random4", expr("CAST(rand(15) * 100 as LONG)")) + .withColumn("random5", expr("CAST(rand(115) * 100 as LONG)")) + .withColumn("innerStruct1", expr("STRUCT(random1, random2)")) + .withColumn("innerStruct2", expr("STRUCT(random3, random4)")) + .withColumn("structCol1", expr("STRUCT(innerStruct1, innerStruct2)")) + .withColumn( + "innerStruct3", + expr("STRUCT(SHA1(CAST(random5 AS BINARY)), SHA1(CAST(random1 AS BINARY)))")) + .withColumn( + "structCol2", + expr( + "STRUCT(innerStruct3, STRUCT(SHA1(CAST(random2 AS BINARY)), SHA1(CAST(random3 AS BINARY))))")) + .withColumn("arrayCol", 
expr("ARRAY(random1, random2, random3, random4, random5)")) + .withColumn("mapCol1", expr("MAP(structCol1, structCol2)")) + .withColumn("mapCol2", expr("MAP(longCol, dateString)")) + .withColumn("mapCol3", expr("MAP(dateCol, arrayCol)")) + .withColumn("structCol3", expr("STRUCT(structCol2, mapCol3, arrayCol)")); + } + + private void writeHoodieTable( + Dataset df, + String recordKey, + String preCombineKey, + String partitionPathField, + SaveMode saveMode, + String tableLocation, + String tableIdentifier) { + df.write() + .format("hudi") + // .options(QuickstartUtils.getQuickstartWriteConfigs()) + .option(DataSourceWriteOptions.RECORDKEY_FIELD().key(), recordKey) + .option(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), preCombineKey) + .option(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), partitionPathField) + .option(HoodieWriteConfig.TBL_NAME.key(), tableIdentifier) + .mode(saveMode) + .save(tableLocation); + } + + private void writeHoodieTableKeyGenerator( + Dataset df, + String recordKey, + String preCombineKey, + String partitionPathField, + SaveMode saveMode, + String tableLocation, + String tableIdentifier) { + df.write() + .format("hudi") + // .options(QuickstartUtils.getQuickstartWriteConfigs()) + .option(DataSourceWriteOptions.RECORDKEY_FIELD().key(), recordKey) + .option(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), preCombineKey) + .option(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), partitionPathField) + .option( + DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), + "org.apache.hudi.keygen.ComplexKeyGenerator") + .option(HoodieWriteConfig.TBL_NAME.key(), tableIdentifier) + .mode(saveMode) + .save(tableLocation); + } + + private void writeHoodieTableOperation( + Dataset df, + String operationKey, + String recordKey, + String preCombineKey, + String partitionPathField, + SaveMode saveMode, + String tableLocation, + String tableIdentifier) { + df.write() + .format("hudi") + .option(DataSourceWriteOptions.OPERATION().key(), operationKey) + .option(DataSourceWriteOptions.RECORDKEY_FIELD().key(), recordKey) + .option(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), preCombineKey) + .option(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), partitionPathField) + .option(HoodieWriteConfig.TBL_NAME.key(), tableIdentifier) + .mode(saveMode) + .save(tableLocation); + } +} diff --git a/hudi/src/main/java/org/apache/iceberg/hudi/BaseSnapshotHudiTableAction.java b/hudi/src/main/java/org/apache/iceberg/hudi/BaseSnapshotHudiTableAction.java new file mode 100644 index 000000000000..c80a6600f64f --- /dev/null +++ b/hudi/src/main/java/org/apache/iceberg/hudi/BaseSnapshotHudiTableAction.java @@ -0,0 +1,457 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.hudi; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import javax.annotation.Nullable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.engine.HoodieLocalEngineContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieFileGroup; +import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.internal.schema.InternalSchema; +import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.DeleteFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.OverwriteFiles; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotSummary; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.Transaction; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.hadoop.HadoopFileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.mapping.MappingUtil; +import org.apache.iceberg.mapping.NameMapping; +import org.apache.iceberg.mapping.NameMappingParser; +import org.apache.iceberg.orc.OrcMetrics; +import org.apache.iceberg.parquet.ParquetUtil; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Type; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BaseSnapshotHudiTableAction implements SnapshotHudiTable { + + private static final Logger LOG = + LoggerFactory.getLogger(BaseSnapshotHudiTableAction.class.getName()); + private static final String SNAPSHOT_SOURCE_PROP = "snapshot_source"; + private static final String HOODIE_SOURCE_VALUE = "hudi"; + private static final String ORIGINAL_LOCATION_PROP = "original_location"; + private static final String PARQUET_SUFFIX = ".parquet"; + private static final String AVRO_SUFFIX = ".avro"; + private static final String ORC_SUFFIX = ".orc"; + private HoodieTableMetaClient hoodieTableMetaClient; + private HoodieTableConfig hoodieTableConfig; + private HoodieEngineContext hoodieEngineContext; + private 
HoodieMetadataConfig hoodieMetadataConfig; + private String hoodieTableBasePath; + private Catalog icebergCatalog; + private TableIdentifier newTableIdentifier; + private String newTableLocation; + private HadoopFileIO hoodieFileIO; + private ImmutableMap.Builder additionalPropertiesBuilder = ImmutableMap.builder(); + + public BaseSnapshotHudiTableAction(String hoodieTableBasePath) { + this.hoodieTableBasePath = hoodieTableBasePath; + this.newTableLocation = hoodieTableBasePath; + } + + @Override + public SnapshotHudiTable tableProperties(Map properties) { + additionalPropertiesBuilder.putAll(properties); + return this; + } + + @Override + public SnapshotHudiTable tableProperty(String name, String value) { + additionalPropertiesBuilder.put(name, value); + return this; + } + + @Override + public SnapshotHudiTable tableLocation(String location) { + this.newTableLocation = location; + return this; + } + + @Override + public SnapshotHudiTable as(TableIdentifier identifier) { + this.newTableIdentifier = identifier; + return this; + } + + @Override + public SnapshotHudiTable icebergCatalog(Catalog catalog) { + this.icebergCatalog = catalog; + return this; + } + + @Override + public SnapshotHudiTable hoodieConfiguration(Configuration configuration) { + this.hoodieTableMetaClient = buildTableMetaClient(configuration, hoodieTableBasePath); + this.hoodieTableConfig = hoodieTableMetaClient.getTableConfig(); + this.hoodieEngineContext = new HoodieLocalEngineContext(configuration); + this.hoodieMetadataConfig = buildMetadataConfig(configuration); + this.hoodieFileIO = new HadoopFileIO(configuration); + return this; + } + + @Override + public Result execute() { + + // Convert Hoodie table schema to Iceberg schema and extract the partition spec + InternalSchema hudiSchema = getHudiSchema(); + Schema icebergSchema = convertToIcebergSchema(hudiSchema); + PartitionSpec partitionSpec = getPartitionSpecFromHoodieMetadataData(icebergSchema); + + Transaction icebergTransaction = + icebergCatalog.newCreateTableTransaction( + newTableIdentifier, + icebergSchema, + partitionSpec, + newTableLocation, + destTableProperties()); + // Need name mapping to ensure we can read data files correctly as iceberg table has its own + // rule to assign field id + NameMapping nameMapping = MappingUtil.create(icebergTransaction.table().schema()); + icebergTransaction + .table() + .updateProperties() + .set(TableProperties.DEFAULT_NAME_MAPPING, NameMappingParser.toJson(nameMapping)) + .commit(); + + // Pre-process the timeline, we only need to process all COMPLETED commit for COW table + // Commit that has been rollbacked will not be in either REQUESTED or INFLIGHT state + HoodieTimeline timeline = + hoodieTableMetaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); + // Initialize the FileSystemView for querying table data files + HoodieTableFileSystemView hoodieTableFileSystemView = + new HoodieTableFileSystemView(hoodieTableMetaClient, timeline); + List partitionPaths = + FSUtils.getAllPartitionPaths( + hoodieEngineContext, + hoodieMetadataConfig, + hoodieTableMetaClient.getBasePathV2().toString()); + try { + for (String partitionPath : partitionPaths) { + Path fullPartitionPath = + FSUtils.getPartitionPath(hoodieTableMetaClient.getBasePathV2(), partitionPath); + hoodieTableFileSystemView.addFilesToView( + FSUtils.getAllDataFilesInPartition(hoodieTableMetaClient.getFs(), fullPartitionPath)); + } + } catch (IOException e) { + throw new RuntimeException("Failed to get all data files in partition", e); + } + 
// get all instants on the timeline + Stream completedInstants = timeline.getInstants(); + // file group id -> Map + // This pre-process aims to make a timestamp to HoodieBaseFile map for each file group + Map> allStampedDataFiles = + hoodieTableFileSystemView + .fetchAllStoredFileGroups() + .collect( + ImmutableMap.toImmutableMap( + HoodieFileGroup::getFileGroupId, + fileGroup -> + fileGroup + .getAllBaseFiles() + .collect( + ImmutableMap.toImmutableMap( + HoodieBaseFile::getCommitTime, baseFile -> baseFile)))); + + // Help tracked if a previous version of the data file has been added to the iceberg table + Map convertedDataFiles = Maps.newHashMap(); + // Replay the timeline from beginning to the end + completedInstants.forEachOrdered( + instant -> { + // commit each instant as a transaction to the iceberg table + commitHoodieInstantToIcebergTransaction( + instant, + hoodieTableFileSystemView.getAllFileGroups(), + allStampedDataFiles, + convertedDataFiles, + icebergTransaction); + }); + Snapshot icebergSnapshot = icebergTransaction.table().currentSnapshot(); + long totalDataFiles = + icebergSnapshot != null + ? Long.parseLong(icebergSnapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP)) + : 0; + icebergTransaction.commitTransaction(); + LOG.info( + "Successfully created Iceberg table {} from hudi table at {}, total data file count: {}", + newTableIdentifier, + hoodieTableBasePath, + totalDataFiles); + return new BaseSnapshotHudiTableActionResult(totalDataFiles); + } + + /** + * In COW Hoodie table, each file group is a combination of different versions of the same data + * file. + * + *

During each write, a new version of the file is copied and rewritten as the latest version + * in the file group. Therefore, when committing a data file to the Iceberg table, we need to + * make sure that the older version of the data file is deleted before adding the newer version of + * the data file. + * + *

In other words, the COW behavior can be mapped to the overwrite operation in the iceberg. + */ + public void commitHoodieInstantToIcebergTransaction( + HoodieInstant instant, + Stream fileGroups, + Map> allStampedDataFiles, + Map convertedDataFiles, + Transaction transaction) { + List filesToAdd = Lists.newArrayList(); + List filesToRemove = Lists.newArrayList(); + + fileGroups + .sequential() + .forEach( + fileGroup -> { + HoodieFileGroupId fileGroupId = fileGroup.getFileGroupId(); + LOG.info("Alpha test: get file group: {}", fileGroup); + DataFile currentDataFile = + buildDataFileFromHoodieBaseFile( + instant, + fileGroup, + allStampedDataFiles.get(fileGroupId), + transaction.table()); + + if (currentDataFile != null) { + filesToAdd.add(currentDataFile); + + DataFile previousDataFile = convertedDataFiles.get(fileGroupId); + if (previousDataFile != null) { + // need to delete the previous data file since a new version will be added + filesToRemove.add(previousDataFile); + } + + // update the converted data file map + convertedDataFiles.put(fileGroupId, currentDataFile); + } + }); + LOG.info("Alpha test: get files to add: {} at instant {}", filesToAdd, instant); + if (filesToAdd.size() > 0 && filesToRemove.size() > 0) { + // OverwriteFiles case + OverwriteFiles overwriteFiles = transaction.newOverwrite(); + filesToAdd.forEach(overwriteFiles::addFile); + filesToRemove.forEach(overwriteFiles::deleteFile); + overwriteFiles.commit(); + } else if (filesToAdd.size() > 0) { + // AppendFiles case + AppendFiles appendFiles = transaction.newAppend(); + filesToAdd.forEach(appendFiles::appendFile); + appendFiles.commit(); + } else if (filesToRemove.size() > 0) { + // DeleteFiles case + DeleteFiles deleteFiles = transaction.newDelete(); + filesToRemove.forEach(deleteFiles::deleteFile); + deleteFiles.commit(); + } + } + + @Nullable + private DataFile buildDataFileFromHoodieBaseFile( + HoodieInstant instant, + HoodieFileGroup fileGroup, + Map stampedDataFiles, + Table table) { + HoodieBaseFile baseFile = stampedDataFiles.get(instant.getTimestamp()); + if (baseFile == null) { + LOG.info( + "Alpha test: does not have base file for instant: {}, fileGroupId {}", + instant, + fileGroup.getFileGroupId()); + return null; + } + + PartitionSpec spec = table.spec(); + String path = baseFile.getPath(); + long fileSize = baseFile.getFileSize(); + String[] partitionValues = fileGroup.getPartitionPath().split("/"); + List partitionFields = spec.fields(); + Preconditions.checkState( + partitionValues.length == partitionFields.size() || partitionFields.isEmpty(), + "Invalid partition values"); + // map partition values to spec + ImmutableMap.Builder partitionValueMapBuilder = ImmutableMap.builder(); + ImmutableMap partitionValueMap; + for (int i = 0; i < partitionFields.size(); i++) { + partitionValueMapBuilder.put(partitionFields.get(i).name(), partitionValues[i]); + } + partitionValueMap = partitionValueMapBuilder.build(); + + MetricsConfig metricsConfig = MetricsConfig.forTable(table); + String nameMappingString = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING); + NameMapping nameMapping = + nameMappingString != null ? 
NameMappingParser.fromJson(nameMappingString) : null; + + InputFile file = hoodieFileIO.newInputFile(path); + FileFormat format = determineFileFormatFromPath(path); + Metrics metrics = getMetricsForFile(file, format, metricsConfig, nameMapping); + + String partition = + partitionValueMap.entrySet().stream() + .map(e -> String.format("%s=%s", e.getKey(), e.getValue())) + .collect(Collectors.joining("/")); + + return DataFiles.builder(spec) + .withPath(path) + .withFormat(format) + .withFileSizeInBytes(fileSize) + .withPartitionPath(partition) // TODO: need to handle multiple partition fields + .withMetrics(metrics) + .build(); + } + + /** + * Taken from getInternalSchema + * in HoodieWriteClient. + */ + private InternalSchema getHudiSchema() { + TableSchemaResolver schemaUtil = new TableSchemaResolver(hoodieTableMetaClient); + Option hudiSchema = schemaUtil.getTableInternalSchemaFromCommitMetadata(); + return hudiSchema.orElseGet( + () -> { + try { + return AvroInternalSchemaConverter.convert(schemaUtil.getTableAvroSchema()); + } catch (Exception e) { + throw new HoodieException("cannot find schema for current table"); + } + }); + } + + /** + * Use nested type visitor to convert the internal schema to iceberg schema. + * + *

just like what we did with spark table's schema and delta lake table's schema. + */ + private Schema convertToIcebergSchema(InternalSchema hudiSchema) { + Type converted = + HudiDataTypeVisitor.visit( + hudiSchema.getRecord(), new HudiDataTypeToType(hudiSchema.getRecord())); + return new Schema(converted.asNestedType().asStructType().fields()); + } + + private PartitionSpec getPartitionSpecFromHoodieMetadataData(Schema schema) { + Option partitionNames = hoodieTableConfig.getPartitionFields(); + if (partitionNames.isPresent() && partitionNames.get().length > 0) { + PartitionSpec.Builder builder = PartitionSpec.builderFor(schema); + for (String partitionName : partitionNames.get()) { + builder.identity(partitionName); + } + return builder.build(); + } + + return PartitionSpec.unpartitioned(); + } + + private Map destTableProperties() { + additionalPropertiesBuilder.putAll(hoodieTableConfig.propsMap()); + additionalPropertiesBuilder.putAll( + ImmutableMap.of( + SNAPSHOT_SOURCE_PROP, + HOODIE_SOURCE_VALUE, + ORIGINAL_LOCATION_PROP, + hoodieTableBasePath)); + + return additionalPropertiesBuilder.build(); + } + + private static HoodieTableMetaClient buildTableMetaClient(Configuration conf, String basePath) { + HoodieTableMetaClient metaClient = + HoodieTableMetaClient.builder() + .setConf(conf) + .setBasePath(basePath) + .setLoadActiveTimelineOnLoad(true) + .build(); + return metaClient; + } + + private FileFormat determineFileFormatFromPath(String path) { + if (path.endsWith(PARQUET_SUFFIX)) { + return FileFormat.PARQUET; + } else if (path.endsWith(AVRO_SUFFIX)) { + return FileFormat.AVRO; + } else if (path.endsWith(ORC_SUFFIX)) { + return FileFormat.ORC; + } else { + throw new ValidationException("Cannot determine file format from path %s", path); + } + } + + private Metrics getMetricsForFile( + InputFile file, FileFormat format, MetricsConfig metricsSpec, NameMapping mapping) { + switch (format) { + case AVRO: + long rowCount = Avro.rowCount(file); + return new Metrics(rowCount, null, null, null, null); + case PARQUET: + return ParquetUtil.fileMetrics(file, metricsSpec, mapping); + case ORC: + return OrcMetrics.fromInputFile(file, metricsSpec, mapping); + default: + throw new ValidationException("Cannot get metrics from file format: %s", format); + } + } + + private HoodieMetadataConfig buildMetadataConfig(Configuration conf) { + return HoodieMetadataConfig.newBuilder() + .enable( + conf.getBoolean( + HoodieMetadataConfig.ENABLE.key(), + HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS)) + .build(); + } +} diff --git a/hudi/src/main/java/org/apache/iceberg/hudi/BaseSnapshotHudiTableActionResult.java b/hudi/src/main/java/org/apache/iceberg/hudi/BaseSnapshotHudiTableActionResult.java new file mode 100644 index 000000000000..ba6c85ab97d3 --- /dev/null +++ b/hudi/src/main/java/org/apache/iceberg/hudi/BaseSnapshotHudiTableActionResult.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.hudi; + +public class BaseSnapshotHudiTableActionResult implements SnapshotHudiTable.Result { + + private final long snapshotFilesCount; + + public BaseSnapshotHudiTableActionResult(long snapshotFilesCount) { + this.snapshotFilesCount = snapshotFilesCount; + } + + @Override + public long snapshotFilesCount() { + return snapshotFilesCount; + } +} diff --git a/hudi/src/main/java/org/apache/iceberg/hudi/HudiDataTypeToType.java b/hudi/src/main/java/org/apache/iceberg/hudi/HudiDataTypeToType.java new file mode 100644 index 000000000000..370e192fead8 --- /dev/null +++ b/hudi/src/main/java/org/apache/iceberg/hudi/HudiDataTypeToType.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.hudi; + +import java.util.List; +import org.apache.hudi.internal.schema.Types; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Type; + +public class HudiDataTypeToType extends HudiDataTypeVisitor { + private final Types.RecordType root; + private int nextId = 0; + + HudiDataTypeToType() { + this.root = null; + } + + HudiDataTypeToType(Types.RecordType root) { + this.root = root; + this.nextId = root.fields().size(); + } + + private int getNextId() { + int next = nextId; + nextId += 1; + return next; + } + + @SuppressWarnings("ReferenceEquality") + @Override + public Type record(Types.RecordType record, List fieldResults) { + List fields = record.fields(); + List newFields = + Lists.newArrayListWithExpectedSize(fields.size()); + boolean isRoot = root == record; + for (int i = 0; i < fields.size(); i += 1) { + Types.Field field = fields.get(i); + Type type = fieldResults.get(i); + int id; + if (isRoot) { + id = i; + } else { + id = getNextId(); + } + + String doc = field.doc(); + if (field.isOptional()) { + newFields.add( + org.apache.iceberg.types.Types.NestedField.optional(id, field.name(), type, doc)); + } else { + newFields.add( + org.apache.iceberg.types.Types.NestedField.required(id, field.name(), type, doc)); + } + } + + return org.apache.iceberg.types.Types.StructType.of(newFields); + } + + @Override + public Type field(Types.Field field, Type typeResult) { + return typeResult; + } + + @Override + public Type map(Types.MapType map, Type keyResult, Type valueResult) { + if (map.isValueOptional()) { + return org.apache.iceberg.types.Types.MapType.ofOptional( + getNextId(), getNextId(), keyResult, valueResult); + } else { + return org.apache.iceberg.types.Types.MapType.ofRequired( + getNextId(), getNextId(), keyResult, valueResult); + } + } + + @Override + public Type array(Types.ArrayType array, Type elementResult) { + if (array.isElementOptional()) { + return org.apache.iceberg.types.Types.ListType.ofOptional(getNextId(), elementResult); + } else { + return org.apache.iceberg.types.Types.ListType.ofRequired(getNextId(), elementResult); + } + } + + @SuppressWarnings("checkstyle:CyclomaticComplexity") + @Override + public Type atomic(org.apache.hudi.internal.schema.Type atomic) { + if (atomic instanceof Types.BooleanType) { + return org.apache.iceberg.types.Types.BooleanType.get(); + } else if (atomic instanceof Types.IntType) { + return org.apache.iceberg.types.Types.IntegerType.get(); + } else if (atomic instanceof Types.LongType) { + return org.apache.iceberg.types.Types.LongType.get(); + } else if (atomic instanceof Types.FloatType) { + return org.apache.iceberg.types.Types.FloatType.get(); + } else if (atomic instanceof Types.DoubleType) { + return org.apache.iceberg.types.Types.DoubleType.get(); + } else if (atomic instanceof Types.DateType) { + return org.apache.iceberg.types.Types.DateType.get(); + } else if (atomic instanceof Types.TimestampType) { + return org.apache.iceberg.types.Types.TimestampType.withZone(); + } else if (atomic instanceof Types.StringType) { + return org.apache.iceberg.types.Types.StringType.get(); + } else if (atomic instanceof Types.BinaryType) { + return org.apache.iceberg.types.Types.BinaryType.get(); + } else if (atomic instanceof Types.UUIDType) { + return org.apache.iceberg.types.Types.UUIDType.get(); + } else if (atomic instanceof Types.DecimalType) { + return 
org.apache.iceberg.types.Types.DecimalType.of( + ((Types.DecimalType) atomic).precision(), ((Types.DecimalType) atomic).scale()); + } else if (atomic instanceof Types.FixedType) { + return org.apache.iceberg.types.Types.FixedType.ofLength( + ((Types.FixedType) atomic).getFixedSize()); + } else if (atomic instanceof Types.TimeType) { + return org.apache.iceberg.types.Types.TimeType.get(); + } + + throw new ValidationException("Not a supported type: %s", atomic.getClass().getName()); + } +} diff --git a/hudi/src/main/java/org/apache/iceberg/hudi/HudiDataTypeVisitor.java b/hudi/src/main/java/org/apache/iceberg/hudi/HudiDataTypeVisitor.java new file mode 100644 index 000000000000..caedeb5eba29 --- /dev/null +++ b/hudi/src/main/java/org/apache/iceberg/hudi/HudiDataTypeVisitor.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.hudi; + +import java.util.List; +import org.apache.hudi.internal.schema.Type; +import org.apache.hudi.internal.schema.Types; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +public abstract class HudiDataTypeVisitor { + + public static T visit(Type type, HudiDataTypeVisitor visitor) { + if (type instanceof Types.RecordType) { + List fields = ((Types.RecordType) type).fields(); + List fieldResults = Lists.newArrayListWithExpectedSize(fields.size()); + + for (Types.Field field : fields) { + fieldResults.add(visitor.field(field, visit(field.type(), visitor))); + } + + return visitor.record((Types.RecordType) type, fieldResults); + } else if (type instanceof Types.MapType) { + return visitor.map( + (Types.MapType) type, + visit(((Types.MapType) type).keyType(), visitor), + visit(((Types.MapType) type).valueType(), visitor)); + } else if (type instanceof Types.ArrayType) { + return visitor.array( + (Types.ArrayType) type, visit(((Types.ArrayType) type).elementType(), visitor)); + } + return visitor.atomic(type); + } + + public abstract T record(Types.RecordType record, List fieldResults); + + public abstract T field(Types.Field field, T typeResult); + + public abstract T array(Types.ArrayType array, T elementResult); + + public abstract T map(Types.MapType map, T keyResult, T valueResult); + + public abstract T atomic(Type atomic); +} diff --git a/hudi/src/main/java/org/apache/iceberg/hudi/HudiToIcebergMigrationActionsProvider.java b/hudi/src/main/java/org/apache/iceberg/hudi/HudiToIcebergMigrationActionsProvider.java new file mode 100644 index 000000000000..8ba58e2ed203 --- /dev/null +++ b/hudi/src/main/java/org/apache/iceberg/hudi/HudiToIcebergMigrationActionsProvider.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.hudi;
+
+/**
+ * An API that provides actions for migrating an Apache Hudi table to an Iceberg table. Query
+ * engines can use {@code defaultProvider()} to access the default action implementations, or
+ * implement this provider to supply different implementations if necessary.
+ */
+public interface HudiToIcebergMigrationActionsProvider {
+
+  /**
+   * Initiates an action to snapshot an existing Hudi table to an Iceberg table.
+   *
+   * @param sourceTableLocation the location of the Hudi table
+   * @return a {@link SnapshotHudiTable} action
+   */
+  default SnapshotHudiTable snapshotHudiTable(String sourceTableLocation) {
+    return new BaseSnapshotHudiTableAction(sourceTableLocation);
+  }
+
+  /**
+   * Gets the default implementation of {@link HudiToIcebergMigrationActionsProvider}.
+   *
+   * @return an instance with access to all default actions
+   */
+  static HudiToIcebergMigrationActionsProvider defaultProvider() {
+    return DefaultHudiToIcebergMigrationActions.defaultMigrationActions();
+  }
+
+  class DefaultHudiToIcebergMigrationActions implements HudiToIcebergMigrationActionsProvider {
+    private static final DefaultHudiToIcebergMigrationActions INSTANCE =
+        new DefaultHudiToIcebergMigrationActions();
+
+    private DefaultHudiToIcebergMigrationActions() {}
+
+    public static DefaultHudiToIcebergMigrationActions defaultMigrationActions() {
+      return INSTANCE;
+    }
+  }
+}
diff --git a/hudi/src/main/java/org/apache/iceberg/hudi/SnapshotHudiTable.java b/hudi/src/main/java/org/apache/iceberg/hudi/SnapshotHudiTable.java
new file mode 100644
index 000000000000..cf86139516d8
--- /dev/null
+++ b/hudi/src/main/java/org/apache/iceberg/hudi/SnapshotHudiTable.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.hudi;
+
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.actions.Action;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+
+public interface SnapshotHudiTable extends Action<SnapshotHudiTable, SnapshotHudiTable.Result> {
+
+  /**
+   * Sets table properties in the newly created Iceberg table. Any properties with the same key
+   * will be overwritten.
+   *
+   * @param properties a map of properties to set
+   * @return this for method chaining
+   */
+  SnapshotHudiTable tableProperties(Map<String, String> properties);
+
+  /**
+   * Sets a table property in the newly created Iceberg table. Any property with the same key will
+   * be overwritten.
+   *
+   * @param name a table property name
+   * @param value a table property value
+   * @return this for method chaining
+   */
+  SnapshotHudiTable tableProperty(String name, String value);
+
+  /**
+   * Sets the location of the newly created Iceberg table. The default location is the same as the
+   * Hudi table's location.
+   *
+   * @param location a path to the new table location
+   * @return this for method chaining
+   */
+  SnapshotHudiTable tableLocation(String location);
+
+  /**
+   * Sets the identifier of the newly created Iceberg table. This is required to be set before
+   * executing the action.
+   *
+   * @param identifier a table identifier (namespace, name)
+   * @return this for method chaining
+   */
+  SnapshotHudiTable as(TableIdentifier identifier);
+
+  /**
+   * Sets the catalog of the newly created Iceberg table. This is required to be set before
+   * executing the action.
+   *
+   * @param catalog a catalog
+   * @return this for method chaining
+   */
+  SnapshotHudiTable icebergCatalog(Catalog catalog);
+
+  /**
+   * Sets the Hadoop configuration used to access the Hudi table's timeline and file groups. This
+   * is required to be set before executing the action.
+   *
+   * @param conf a Hadoop configuration
+   * @return this for method chaining
+   */
+  SnapshotHudiTable hoodieConfiguration(Configuration conf);
+
+  /** The action result that contains a summary of the execution. */
+  interface Result {
+
+    /** Returns the number of snapshot data files. */
+    long snapshotFilesCount();
+  }
+}
diff --git a/settings.gradle b/settings.gradle
index c5ac07e080c2..5201184f42af 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -35,6 +35,7 @@ include 'nessie'
 include 'gcp'
 include 'dell'
 include 'snowflake'
+include 'hudi'
 
 project(':api').name = 'iceberg-api'
 project(':common').name = 'iceberg-common'
@@ -53,6 +54,7 @@ project(':nessie').name = 'iceberg-nessie'
 project(':gcp').name = 'iceberg-gcp'
 project(':dell').name = 'iceberg-dell'
 project(':snowflake').name = 'iceberg-snowflake'
+project(':hudi').name = 'iceberg-hudi'
 
 if (null != System.getProperty("allVersions")) {
   System.setProperty("flinkVersions", System.getProperty("knownFlinkVersions"))
diff --git a/versions.props b/versions.props
index 99dbea48a244..3739ab1748cf 100644
--- a/versions.props
+++ b/versions.props
@@ -29,6 +29,7 @@ org.scala-lang.modules:scala-collection-compat_2.13 = 2.6.0
 com.emc.ecs:object-client-bundle = 3.3.2
 org.immutables:value = 2.9.2
 net.snowflake:snowflake-jdbc = 3.13.22
+org.apache.hudi:* = 0.12.0
 
 # test deps
 org.junit.vintage:junit-vintage-engine = 5.8.2
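
Usage note (not part of the diff above): the sketch below shows how a query engine might drive the new action end to end, assuming the BaseSnapshotHudiTableAction implementation added elsewhere in this PR and the Action<SnapshotHudiTable, SnapshotHudiTable.Result> bound on the interface. The table location, identifier, catalog instance, and property values are hypothetical placeholders, not part of the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hudi.HudiToIcebergMigrationActionsProvider;
import org.apache.iceberg.hudi.SnapshotHudiTable;

public class HudiSnapshotExample {

  // `catalog` is assumed to be an already-initialized Iceberg catalog (e.g. a HiveCatalog).
  public static SnapshotHudiTable.Result snapshotToIceberg(Catalog catalog) {
    return HudiToIcebergMigrationActionsProvider.defaultProvider()
        // hypothetical location of the source Hudi table
        .snapshotHudiTable("s3://bucket/warehouse/hudi_table")
        // identifier of the Iceberg table to create; required before execute()
        .as(TableIdentifier.of("db", "hudi_table_snapshot"))
        // catalog that will own the new Iceberg table; required before execute()
        .icebergCatalog(catalog)
        // Hadoop configuration for reading the Hudi timeline; required before execute()
        .hoodieConfiguration(new Configuration())
        // optional, purely illustrative table property
        .tableProperty("migration.source", "hudi")
        .execute();
  }
}

Per the SnapshotHudiTable Javadoc, as(), icebergCatalog(), and hoodieConfiguration() must be set before execute(); the location and property setters are optional.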