diff --git a/.github/workflows/bot.yml b/.github/workflows/bot.yml
index 49bdda2f25264..9d198641389de 100644
--- a/.github/workflows/bot.yml
+++ b/.github/workflows/bot.yml
@@ -22,7 +22,7 @@ on:
       - master
       - 'release-*'
 env:
-  MVN_ARGS: -e -ntp -B -V -Pwarn-log -Dorg.slf4j.simpleLogger.log.org.apache.maven.plugins.shade=warn -Dorg.slf4j.simpleLogger.log.org.apache.maven.plugins.dependency=warn
+  MVN_ARGS: -e -ntp -B -V -Dgpg.skip -Djacoco.skip -Pwarn-log -Dorg.slf4j.simpleLogger.log.org.apache.maven.plugins.shade=warn -Dorg.slf4j.simpleLogger.log.org.apache.maven.plugins.dependency=warn
   SPARK_COMMON_MODULES: hudi-spark-datasource/hudi-spark,hudi-spark-datasource/hudi-spark-common

 jobs:
@@ -51,22 +51,18 @@ jobs:
     strategy:
       matrix:
         include:
-          - scalaProfile: "scala-2.11"
+          - scalaProfile: "scala-2.11 -Dscala.binary.version=2.11"
             sparkProfile: "spark2.4"
             sparkModules: "hudi-spark-datasource/hudi-spark2"

           - scalaProfile: "scala-2.12"
-            sparkProfile: "spark2.4"
-            sparkModules: "hudi-spark-datasource/hudi-spark2"
+            sparkProfile: "spark3.0"
+            sparkModules: "hudi-spark-datasource/hudi-spark3.0.x"

           - scalaProfile: "scala-2.12"
             sparkProfile: "spark3.1"
             sparkModules: "hudi-spark-datasource/hudi-spark3.1.x"

-          - scalaProfile: "scala-2.12"
-            sparkProfile: "spark3.0"
-            sparkModules: "hudi-spark-datasource/hudi-spark3.0.x"
-
           - scalaProfile: "scala-2.12"
             sparkProfile: "spark3.2"
             sparkModules: "hudi-spark-datasource/hudi-spark3.2.x"
@@ -88,7 +84,7 @@ jobs:
           SCALA_PROFILE: ${{ matrix.scalaProfile }}
           SPARK_PROFILE: ${{ matrix.sparkProfile }}
         run:
-          mvn clean install -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -pl hudi-examples/hudi-examples-spark,hudi-spark-datasource/hudi-spark -am -DskipTests=true $MVN_ARGS
+          mvn clean install -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -DskipTests=true $MVN_ARGS
       - name: Quickstart Test
         env:
           SCALA_PROFILE: ${{ matrix.scalaProfile }}
@@ -100,7 +96,7 @@ jobs:
           SCALA_PROFILE: ${{ matrix.scalaProfile }}
           SPARK_PROFILE: ${{ matrix.sparkProfile }}
           SPARK_MODULES: ${{ matrix.sparkModules }}
-        if: ${{ !endsWith(env.SPARK_PROFILE, '2.4') }} # skip test spark 2.4 as it's covered by Azure CI
+        if: ${{ !endsWith(env.SPARK_PROFILE, '3.2') }} # skip test spark 3.2 as it's covered by Azure CI
         run:
           mvn test -Punit-tests -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -pl "hudi-common,$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS
       - name: FT - Spark
@@ -108,7 +104,7 @@ jobs:
           SCALA_PROFILE: ${{ matrix.scalaProfile }}
           SPARK_PROFILE: ${{ matrix.sparkProfile }}
           SPARK_MODULES: ${{ matrix.sparkModules }}
-        if: ${{ !endsWith(env.SPARK_PROFILE, '2.4') }} # skip test spark 2.4 as it's covered by Azure CI
+        if: ${{ !endsWith(env.SPARK_PROFILE, '3.2') }} # skip test spark 3.2 as it's covered by Azure CI
         run:
           mvn test -Pfunctional-tests -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -pl "$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS

@@ -142,6 +138,14 @@ jobs:
           FLINK_PROFILE: ${{ matrix.flinkProfile }}
         run:
           mvn test -Punit-tests -D"$SCALA_PROFILE" -D"$FLINK_PROFILE" -pl hudi-examples/hudi-examples-flink $MVN_ARGS
+      - name: Integration Test
+        env:
+          SCALA_PROFILE: 'scala-2.12'
+          FLINK_PROFILE: ${{ matrix.flinkProfile }}
+        if: ${{ endsWith(env.FLINK_PROFILE, '1.17') }}
+        run: |
+          mvn clean install -Pintegration-tests -D"$SCALA_PROFILE" -D"$FLINK_PROFILE" -pl hudi-flink-datasource/hudi-flink -am -Davro.version=1.10.0 -DskipTests=true $MVN_ARGS
+          mvn verify -Pintegration-tests -D"$SCALA_PROFILE" -D"$FLINK_PROFILE" -pl hudi-flink-datasource/hudi-flink $MVN_ARGS

   validate-bundles:
     runs-on: ubuntu-latest
@@ -269,3 +273,44 @@ jobs:
         if: ${{ endsWith(env.SPARK_PROFILE, '3.3') }} # Only Spark 3.3 supports Java 17 as of now
         run: |
           ./packaging/bundle-validation/ci_run.sh $HUDI_VERSION openjdk17 $STAGING_REPO_NUM
+
+  integration-tests:
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        include:
+          - sparkProfile: 'spark2.4'
+            sparkArchive: 'spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz'
+    steps:
+      - uses: actions/checkout@v2
+      - name: Set up JDK 8
+        uses: actions/setup-java@v2
+        with:
+          java-version: '8'
+          distribution: 'adopt'
+          architecture: x64
+      - name: Build Project
+        env:
+          SPARK_PROFILE: ${{ matrix.sparkProfile }}
+          SCALA_PROFILE: '-Dscala-2.11 -Dscala.binary.version=2.11'
+        run:
+          mvn clean install $SCALA_PROFILE -D"$SPARK_PROFILE" -Pintegration-tests -DskipTests=true $MVN_ARGS
+      - name: 'UT integ-test'
+        env:
+          SPARK_PROFILE: ${{ matrix.sparkProfile }}
+          SCALA_PROFILE: '-Dscala-2.11 -Dscala.binary.version=2.11'
+        run:
+          mvn test $SCALA_PROFILE -D"$SPARK_PROFILE" -Pintegration-tests -DskipUTs=false -DskipITs=true -pl hudi-integ-test $MVN_ARGS
+      - name: 'IT'
+        env:
+          SPARK_PROFILE: ${{ matrix.sparkProfile }}
+          SPARK_ARCHIVE: ${{ matrix.sparkArchive }}
+          SCALA_PROFILE: '-Dscala-2.11 -Dscala.binary.version=2.11'
+        run: |
+          echo "Downloading $SPARK_ARCHIVE"
+          curl https://archive.apache.org/dist/spark/$SPARK_ARCHIVE --create-dirs -o $GITHUB_WORKSPACE/$SPARK_ARCHIVE
+          tar -xvf $GITHUB_WORKSPACE/$SPARK_ARCHIVE -C $GITHUB_WORKSPACE/
+          mkdir /tmp/spark-events/
+          SPARK_ARCHIVE_BASENAME=$(basename $SPARK_ARCHIVE)
+          export SPARK_HOME=$GITHUB_WORKSPACE/${SPARK_ARCHIVE_BASENAME%.*}
+          mvn verify $SCALA_PROFILE -D"$SPARK_PROFILE" -Pintegration-tests -pl !hudi-flink-datasource/hudi-flink $MVN_ARGS
diff --git a/azure-pipelines-20230430.yml b/azure-pipelines-20230430.yml
index 7d391d4a4c35c..d7963cf1e47f0 100644
--- a/azure-pipelines-20230430.yml
+++ b/azure-pipelines-20230430.yml
@@ -46,8 +46,9 @@ parameters:
     default:
       - 'hudi-spark-datasource'
       - 'hudi-spark-datasource/hudi-spark'
-      - 'hudi-spark-datasource/hudi-spark2'
-      - 'hudi-spark-datasource/hudi-spark2-common'
+      - 'hudi-spark-datasource/hudi-spark3.2.x'
+      - 'hudi-spark-datasource/hudi-spark3.2plus-common'
+      - 'hudi-spark-datasource/hudi-spark3-common'
       - 'hudi-spark-datasource/hudi-spark-common'
   - name: job4UTModules
     type: object
@@ -68,8 +69,9 @@ parameters:
       - '!hudi-flink-datasource/hudi-flink1.17.x'
       - '!hudi-spark-datasource'
       - '!hudi-spark-datasource/hudi-spark'
-      - '!hudi-spark-datasource/hudi-spark2'
-      - '!hudi-spark-datasource/hudi-spark2-common'
+      - '!hudi-spark-datasource/hudi-spark3.2.x'
+      - '!hudi-spark-datasource/hudi-spark3.2plus-common'
+      - '!hudi-spark-datasource/hudi-spark3-common'
       - '!hudi-spark-datasource/hudi-spark-common'
   - name: job4FTModules
     type: object
@@ -90,13 +92,10 @@ parameters:
       - '!hudi-flink-datasource/hudi-flink1.17.x'

 variables:
-  BUILD_PROFILES: '-Dscala-2.11 -Dspark2.4 -Dflink1.17'
+  BUILD_PROFILES: '-Dscala-2.12 -Dspark3.2 -Dflink1.17'
   PLUGIN_OPTS: '-Dcheckstyle.skip=true -Drat.skip=true -Djacoco.skip=true -ntp -B -V -Pwarn-log -Dorg.slf4j.simpleLogger.log.org.apache.maven.plugins.shade=warn -Dorg.slf4j.simpleLogger.log.org.apache.maven.plugins.dependency=warn'
   MVN_OPTS_INSTALL: '-Phudi-platform-service -DskipTests $(BUILD_PROFILES) $(PLUGIN_OPTS)'
   MVN_OPTS_TEST: '-fae -Pwarn-log $(BUILD_PROFILES) $(PLUGIN_OPTS)'
-  SPARK_VERSION: '2.4.4'
-  HADOOP_VERSION: '2.7'
-  SPARK_ARCHIVE: spark-$(SPARK_VERSION)-bin-hadoop$(HADOOP_VERSION)
   JOB1_MODULES: ${{ join(',',parameters.job1Modules) }}
   JOB2_MODULES: ${{ join(',',parameters.job2Modules) }}
   JOB3_MODULES: ${{
join(',',parameters.job3UTModules) }} @@ -220,39 +219,3 @@ stages: - script: | grep "testcase" */target/surefire-reports/*.xml */*/target/surefire-reports/*.xml | awk -F'"' ' { print $6,$4,$2 } ' | sort -nr | head -n 100 displayName: Top 100 long-running testcases - - job: IT - displayName: IT modules - timeoutInMinutes: '150' - steps: - - task: Maven@4 - displayName: maven install - inputs: - mavenPomFile: 'pom.xml' - goals: 'clean install' - options: $(MVN_OPTS_INSTALL) -Pintegration-tests - publishJUnitResults: false - jdkVersionOption: '1.8' - - task: Maven@4 - displayName: UT integ-test - inputs: - mavenPomFile: 'pom.xml' - goals: 'test' - options: $(MVN_OPTS_TEST) -Pintegration-tests -DskipUTs=false -DskipITs=true -pl hudi-integ-test - publishJUnitResults: false - jdkVersionOption: '1.8' - mavenOptions: '-Xmx4g' - - task: AzureCLI@2 - displayName: Prepare for IT - inputs: - azureSubscription: apachehudici-service-connection - scriptType: bash - scriptLocation: inlineScript - inlineScript: | - echo 'Downloading $(SPARK_ARCHIVE)' - az storage blob download -c ci-caches -n $(SPARK_ARCHIVE).tgz -f $(Pipeline.Workspace)/$(SPARK_ARCHIVE).tgz --account-name apachehudici - tar -xvf $(Pipeline.Workspace)/$(SPARK_ARCHIVE).tgz -C $(Pipeline.Workspace)/ - mkdir /tmp/spark-events/ - - script: | - export SPARK_HOME=$(Pipeline.Workspace)/$(SPARK_ARCHIVE) - mvn $(MVN_OPTS_TEST) -Pintegration-tests verify - displayName: IT diff --git a/hudi-cli/pom.xml b/hudi-cli/pom.xml index 633497494e14c..8de4b51e9d3f2 100644 --- a/hudi-cli/pom.xml +++ b/hudi-cli/pom.xml @@ -246,6 +246,16 @@ org.apache.spark spark-core_${scala.binary.version} + + + org.apache.hadoop + hadoop-client-api + + + org.apache.hadoop + hadoop-client-runtime + + org.apache.spark diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java index 70d2ebfec034a..33dc03e004e8f 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java @@ -39,6 +39,8 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; + +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -198,10 +200,10 @@ public void testWriteReadHFileWithMetaFields(boolean populateMetaFields, boolean } } + @Disabled("Disable the test with evolved schema for HFile since it's not supported") + @ParameterizedTest @Override - @Test - public void testWriteReadWithEvolvedSchema() throws Exception { - // Disable the test with evolved schema for HFile since it's not supported + public void testWriteReadWithEvolvedSchema(String evolvedSchemaPath) throws Exception { // TODO(HUDI-3683): fix the schema evolution for HFile } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterBase.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterBase.java index fde1315a34d5c..86859ea7ca16e 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterBase.java +++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterBase.java @@ -20,8 +20,8 @@ package org.apache.hudi.io.storage; import org.apache.hudi.common.bloom.BloomFilter; -import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieAvroIndexedRecord; +import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.avro.Schema; @@ -34,6 +34,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.File; import java.io.IOException; @@ -143,20 +145,19 @@ public void testWriteReadComplexRecord() throws Exception { verifyComplexRecords(createReader(conf).getRecordIterator()); } - @Test - public void testWriteReadWithEvolvedSchema() throws Exception { + @ParameterizedTest + @ValueSource(strings = { + "/exampleEvolvedSchema.avsc", + "/exampleEvolvedSchemaChangeOrder.avsc", + "/exampleEvolvedSchemaColumnRequire.avsc", + "/exampleEvolvedSchemaColumnType.avsc", + "/exampleEvolvedSchemaDeleteColumn.avsc" + }) + public void testWriteReadWithEvolvedSchema(String evolvedSchemaPath) throws Exception { writeFileWithSimpleSchema(); - Configuration conf = new Configuration(); HoodieAvroFileReader hoodieReader = createReader(conf); - String[] schemaList = new String[] { - "/exampleEvolvedSchema.avsc", "/exampleEvolvedSchemaChangeOrder.avsc", - "/exampleEvolvedSchemaColumnRequire.avsc", "/exampleEvolvedSchemaColumnType.avsc", - "/exampleEvolvedSchemaDeleteColumn.avsc"}; - - for (String evolvedSchemaPath : schemaList) { - verifyReaderWithSchema(evolvedSchemaPath, hoodieReader); - } + verifyReaderWithSchema(evolvedSchemaPath, hoodieReader); } @Test @@ -182,7 +183,7 @@ protected void writeFileWithSimpleSchema() throws Exception { writer.close(); } - protected void writeFileWithSchemaWithMeta() throws Exception { + private void writeFileWithSchemaWithMeta() throws Exception { Schema avroSchema = getSchemaFromResource(TestHoodieReaderWriterBase.class, "/exampleSchemaWithMetaFields.avsc"); HoodieAvroFileWriter writer = createWriter(avroSchema, true); for (int i = 0; i < NUM_RECORDS; i++) { @@ -209,7 +210,7 @@ protected void verifySimpleRecords(Iterator> iterato } } - protected void verifyComplexRecords(Iterator> iterator) { + private void verifyComplexRecords(Iterator> iterator) { int index = 0; while (iterator.hasNext()) { GenericRecord record = (GenericRecord) iterator.next().getData(); @@ -259,13 +260,15 @@ private void verifyRecord(String schemaPath, GenericRecord record, int index) { String numStr = String.format("%02d", index); assertEquals("key" + numStr, record.get("_row_key").toString()); assertEquals(Integer.toString(index), record.get("time").toString()); - if ("/exampleEvolvedSchemaColumnType.avsc".equals(schemaPath)) { + if (schemaPath.equals("/exampleEvolvedSchemaColumnType.avsc")) { assertEquals(Integer.toString(index), record.get("number").toString()); - } else if ("/exampleEvolvedSchemaDeleteColumn.avsc".equals(schemaPath)) { - assertNull(record.get("number")); + assertNull(record.getSchema().getField("added_field")); + } else if (schemaPath.equals("/exampleEvolvedSchemaDeleteColumn.avsc")) { + assertNull(record.getSchema().getField("number")); + assertNull(record.getSchema().getField("added_field")); } else { assertEquals(index, record.get("number")); + assertNull(record.get("added_field")); } - 
assertNull(record.get("added_field")); } } diff --git a/hudi-client/hudi-spark-client/pom.xml b/hudi-client/hudi-spark-client/pom.xml index 16aeed7b4a2a9..4ed64ee4c56a1 100644 --- a/hudi-client/hudi-spark-client/pom.xml +++ b/hudi-client/hudi-spark-client/pom.xml @@ -59,6 +59,16 @@ org.apache.spark spark-core_${scala.binary.version} + + + org.apache.hadoop + hadoop-client-api + + + org.apache.hadoop + hadoop-client-runtime + + org.apache.spark diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestSparkHoodieHBaseIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestSparkHoodieHBaseIndex.java index 6a34bff433250..178097f2241ad 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestSparkHoodieHBaseIndex.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestSparkHoodieHBaseIndex.java @@ -114,6 +114,7 @@ public class TestSparkHoodieHBaseIndex extends SparkClientFunctionalTestHarness @BeforeAll public static void init() throws Exception { // Initialize HbaseMiniCluster + System.setProperty("zookeeper.4lw.commands.whitelist", "*"); hbaseConfig = HBaseConfiguration.create(); hbaseConfig.set(ZOOKEEPER_ZNODE_PARENT, "/hudi-hbase-test"); diff --git a/hudi-common/pom.xml b/hudi-common/pom.xml index bcf3b0fee89b3..ae63a48343275 100644 --- a/hudi-common/pom.xml +++ b/hudi-common/pom.xml @@ -15,7 +15,8 @@ See the License for the specific language governing permissions and limitations under the License. --> - + hudi org.apache.hudi @@ -174,23 +175,11 @@ provided - - org.apache.hadoop - hadoop-common - tests - test - org.apache.hadoop hadoop-hdfs provided - - org.apache.hadoop - hadoop-hdfs - tests - test - org.apache.hudi @@ -255,12 +244,12 @@ - - - commons-io - commons-io - ${commons.io.version} - + + + commons-io + commons-io + ${commons.io.version} + diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/minicluster/ZookeeperTestService.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/minicluster/ZookeeperTestService.java index e98b2d8cda340..bed846393ccfd 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/minicluster/ZookeeperTestService.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/minicluster/ZookeeperTestService.java @@ -163,6 +163,7 @@ private static void setupTestEnv() { // resulting in test failure (client timeout on first session). 
// set env and directly in order to handle static init/gc issues System.setProperty("zookeeper.preAllocSize", "100"); + System.setProperty("zookeeper.4lw.commands.whitelist", "*"); FileTxnLog.setPreallocSize(100 * 1024); } diff --git a/hudi-examples/bin/hudi-delta-streamer b/hudi-examples/bin/hudi-delta-streamer index 352d2a515ec8a..475a962216b8a 100755 --- a/hudi-examples/bin/hudi-delta-streamer +++ b/hudi-examples/bin/hudi-delta-streamer @@ -19,8 +19,8 @@ EXAMPLES_DIR="$(dirname $(dirname "${BASH_SOURCE[0]}"))" PROJECT_DIR="$(dirname ${EXAMPLES_DIR})" -JAR_FILE=`ls ${PROJECT_DIR}/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_*.jar | grep -v sources | grep -v tests.jar` -EXAMPLES_JARS=`ls ${PROJECT_DIR}/hudi-examples/target/hudi-examples-*.jar | grep -v sources | grep -v tests.jar` +JAR_FILE=`ls ${PROJECT_DIR}/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_*.jar | grep -v sources | grep -v javadoc | grep -v tests.jar` +EXAMPLES_JARS=`ls ${PROJECT_DIR}/hudi-examples/target/hudi-examples-*.jar | grep -v sources | grep -v javadoc | grep -v tests.jar` if [ -z "${SPARK_MASTER}" ]; then SPARK_MASTER="yarn-cluster" diff --git a/hudi-hadoop-mr/pom.xml b/hudi-hadoop-mr/pom.xml index f41700b2adbae..eb2a8c70436a3 100644 --- a/hudi-hadoop-mr/pom.xml +++ b/hudi-hadoop-mr/pom.xml @@ -15,7 +15,8 @@ See the License for the specific language governing permissions and limitations under the License. --> - + hudi org.apache.hudi @@ -101,18 +102,6 @@ test-jar test - - org.apache.hadoop - hadoop-common - tests - test - - - org.apache.hadoop - hadoop-hdfs - tests - test - diff --git a/hudi-integ-test/pom.xml b/hudi-integ-test/pom.xml index 5cf429d4077e4..f79d26f65ece4 100644 --- a/hudi-integ-test/pom.xml +++ b/hudi-integ-test/pom.xml @@ -49,6 +49,12 @@ docker-java 3.1.2 test + + + io.netty + * + + @@ -68,30 +74,22 @@ org.apache.spark - spark-sql_${scala.binary.version} + spark-core_${scala.binary.version} - org.mortbay.jetty - * - - - javax.servlet.jsp - * - - - javax.servlet - * - - - org.eclipse.jetty - * + org.apache.hadoop + hadoop-client-api - org.apache.curator - * + org.apache.hadoop + hadoop-client-runtime + + org.apache.spark + spark-sql_${scala.binary.version} + org.apache.spark @@ -100,6 +98,14 @@ test + + + org.apache.parquet + parquet-avro + ${parquet.version} + test + + org.apache.hudi @@ -129,6 +135,14 @@ ${project.version} provided + + org.apache.hadoop + hadoop-client-api + + + org.apache.hadoop + hadoop-client-runtime + org.mortbay.jetty * @@ -233,6 +247,14 @@ test-jar test + + org.apache.hadoop + hadoop-client-api + + + org.apache.hadoop + hadoop-client-runtime + org.mortbay.jetty * @@ -296,50 +318,6 @@ test - - - - org.apache.hadoop - hadoop-common - tests - test - - - org.mortbay.jetty - * - - - javax.servlet.jsp - * - - - javax.servlet - * - - - - - - org.apache.hadoop - hadoop-hdfs - tests - test - - - javax.servlet - * - - - netty - io.netty - - - netty-all - io.netty - - - - ${hive.groupid} @@ -389,13 +367,6 @@ trino-jdbc - - org.scalatest - scalatest_${scala.binary.version} - ${scalatest.version} - test - - diff --git a/hudi-spark-datasource/hudi-spark/run_hoodie_app.sh b/hudi-spark-datasource/hudi-spark/run_hoodie_app.sh index ba5eb6ed56521..461276b40bcad 100755 --- a/hudi-spark-datasource/hudi-spark/run_hoodie_app.sh +++ b/hudi-spark-datasource/hudi-spark/run_hoodie_app.sh @@ -23,7 +23,7 @@ function error_exit { DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" #Ensure we pick the right jar even for hive11 builds -HUDI_JAR=`ls -c 
$DIR/../../packaging/hudi-spark-bundle/target/hudi-spark*-bundle*.jar | grep -v sources | head -1` +HUDI_JAR=`ls -c $DIR/../../packaging/hudi-spark-bundle/target/hudi-spark*-bundle*.jar | grep -v sources | grep -v javadoc | head -1` if [ -z "$HADOOP_CONF_DIR" ]; then echo "setting hadoop conf dir" diff --git a/hudi-spark-datasource/hudi-spark/run_hoodie_generate_app.sh b/hudi-spark-datasource/hudi-spark/run_hoodie_generate_app.sh index 15c6c0d48cc2e..9dcb712ed0b0e 100755 --- a/hudi-spark-datasource/hudi-spark/run_hoodie_generate_app.sh +++ b/hudi-spark-datasource/hudi-spark/run_hoodie_generate_app.sh @@ -23,7 +23,7 @@ function error_exit { DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" #Ensure we pick the right jar even for hive11 builds -HUDI_JAR=`ls -c $DIR/../../packaging/hudi-spark-bundle/target/hudi-spark*-bundle*.jar | grep -v sources | head -1` +HUDI_JAR=`ls -c $DIR/../../packaging/hudi-spark-bundle/target/hudi-spark*-bundle*.jar | grep -v sources | grep -v javadoc | head -1` if [ -z "$HADOOP_CONF_DIR" ]; then echo "setting hadoop conf dir" diff --git a/hudi-spark-datasource/hudi-spark/run_hoodie_streaming_app.sh b/hudi-spark-datasource/hudi-spark/run_hoodie_streaming_app.sh index 0501ff8f43bde..3fa96e2dfc60b 100755 --- a/hudi-spark-datasource/hudi-spark/run_hoodie_streaming_app.sh +++ b/hudi-spark-datasource/hudi-spark/run_hoodie_streaming_app.sh @@ -23,7 +23,7 @@ function error_exit { DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" #Ensure we pick the right jar even for hive11 builds -HUDI_JAR=`ls -c $DIR/../../packaging/hudi-spark-bundle/target/hudi-spark*-bundle*.jar | grep -v sources | head -1` +HUDI_JAR=`ls -c $DIR/../../packaging/hudi-spark-bundle/target/hudi-spark*-bundle*.jar | grep -v sources | grep -v javadoc | head -1` if [ -z "$HADOOP_CONF_DIR" ]; then echo "setting hadoop conf dir" diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala index eaf1839d5dc7a..a62ef840ab941 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala @@ -238,7 +238,6 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with ("rider,driver", 14167), ("rider,driver,tip_history", 14167)) else if (HoodieSparkUtils.isSpark2) - // TODO re-enable tests (these tests are very unstable currently) Array( ("rider", 14160), ("rider,driver", 14160), diff --git a/hudi-spark-datasource/hudi-spark3-common/pom.xml b/hudi-spark-datasource/hudi-spark3-common/pom.xml index 97420c21332dd..e921e15998ed9 100644 --- a/hudi-spark-datasource/hudi-spark3-common/pom.xml +++ b/hudi-spark-datasource/hudi-spark3-common/pom.xml @@ -233,6 +233,18 @@ test-jar test + + org.apache.spark + spark-core_${scala.binary.version} + ${spark3.version} + tests + test + + + org.apache.parquet + parquet-avro + test + diff --git a/hudi-spark-datasource/hudi-spark3.2.x/src/test/java/org/apache/hudi/spark3/internal/TestHoodieBulkInsertDataInternalWriter.java b/hudi-spark-datasource/hudi-spark3-common/src/test/java/org/apache/hudi/spark3/internal/TestHoodieBulkInsertDataInternalWriter.java similarity index 94% rename from hudi-spark-datasource/hudi-spark3.2.x/src/test/java/org/apache/hudi/spark3/internal/TestHoodieBulkInsertDataInternalWriter.java 
rename to hudi-spark-datasource/hudi-spark3-common/src/test/java/org/apache/hudi/spark3/internal/TestHoodieBulkInsertDataInternalWriter.java index 96b06937504f1..206d4931b15e1 100644 --- a/hudi-spark-datasource/hudi-spark3.2.x/src/test/java/org/apache/hudi/spark3/internal/TestHoodieBulkInsertDataInternalWriter.java +++ b/hudi-spark-datasource/hudi-spark3-common/src/test/java/org/apache/hudi/spark3/internal/TestHoodieBulkInsertDataInternalWriter.java @@ -7,13 +7,14 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ package org.apache.hudi.spark3.internal; diff --git a/hudi-spark-datasource/hudi-spark3.2.x/src/test/java/org/apache/hudi/spark3/internal/TestHoodieDataSourceInternalBatchWrite.java b/hudi-spark-datasource/hudi-spark3-common/src/test/java/org/apache/hudi/spark3/internal/TestHoodieDataSourceInternalBatchWrite.java similarity index 95% rename from hudi-spark-datasource/hudi-spark3.2.x/src/test/java/org/apache/hudi/spark3/internal/TestHoodieDataSourceInternalBatchWrite.java rename to hudi-spark-datasource/hudi-spark3-common/src/test/java/org/apache/hudi/spark3/internal/TestHoodieDataSourceInternalBatchWrite.java index 176b67bbe98f4..31d606de4a1ef 100644 --- a/hudi-spark-datasource/hudi-spark3.2.x/src/test/java/org/apache/hudi/spark3/internal/TestHoodieDataSourceInternalBatchWrite.java +++ b/hudi-spark-datasource/hudi-spark3-common/src/test/java/org/apache/hudi/spark3/internal/TestHoodieDataSourceInternalBatchWrite.java @@ -7,13 +7,14 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
*/ package org.apache.hudi.spark3.internal; @@ -70,7 +71,7 @@ private static Stream bulkInsertTypeParams() { @ParameterizedTest @MethodSource("bulkInsertTypeParams") public void testDataSourceWriter(boolean populateMetaFields) throws Exception { - testDataSourceWriterInternal(Collections.EMPTY_MAP, Collections.EMPTY_MAP, populateMetaFields); + testDataSourceWriterInternal(Collections.emptyMap(), Collections.emptyMap(), populateMetaFields); } private void testDataSourceWriterInternal(Map extraMetadata, Map expectedExtraMetadata, boolean populateMetaFields) throws Exception { @@ -150,7 +151,7 @@ public void testDataSourceWriterEmptyExtraCommitMetadata() throws Exception { extraMeta.put("keyB", "valB"); extraMeta.put("commit_extra_c", "valC"); // none of the keys has commit metadata key prefix. - testDataSourceWriterInternal(extraMeta, Collections.EMPTY_MAP, true); + testDataSourceWriterInternal(extraMeta, Collections.emptyMap(), true); } @ParameterizedTest @@ -166,7 +167,7 @@ public void testMultipleDataSourceWrites(boolean populateMetaFields) throws Exce String instantTime = "00" + i; // init writer HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite = - new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, Collections.EMPTY_MAP, populateMetaFields, false); + new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, Collections.emptyMap(), populateMetaFields, false); List commitMessages = new ArrayList<>(); Dataset totalInputRows = null; DataWriter writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(partitionCounter++, RANDOM.nextLong()); @@ -213,7 +214,7 @@ public void testLargeWrites(boolean populateMetaFields) throws Exception { String instantTime = "00" + i; // init writer HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite = - new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, Collections.EMPTY_MAP, populateMetaFields, false); + new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, Collections.emptyMap(), populateMetaFields, false); List commitMessages = new ArrayList<>(); Dataset totalInputRows = null; DataWriter writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(partitionCounter++, RANDOM.nextLong()); @@ -261,7 +262,7 @@ public void testAbort(boolean populateMetaFields) throws Exception { String instantTime0 = "00" + 0; // init writer HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite = - new HoodieDataSourceInternalBatchWrite(instantTime0, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, Collections.EMPTY_MAP, populateMetaFields, false); + new HoodieDataSourceInternalBatchWrite(instantTime0, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, Collections.emptyMap(), populateMetaFields, false); DataWriter writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(0, RANDOM.nextLong()); List partitionPaths = Arrays.asList(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS); @@ -299,7 +300,7 @@ public void testAbort(boolean populateMetaFields) throws Exception { // 2nd batch. 
abort in the end String instantTime1 = "00" + 1; dataSourceInternalBatchWrite = - new HoodieDataSourceInternalBatchWrite(instantTime1, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, Collections.EMPTY_MAP, populateMetaFields, false); + new HoodieDataSourceInternalBatchWrite(instantTime1, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, Collections.emptyMap(), populateMetaFields, false); writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(1, RANDOM.nextLong()); for (int j = 0; j < batches; j++) { diff --git a/hudi-spark-datasource/hudi-spark3.2.x/src/test/java/org/apache/hudi/spark3/internal/TestReflectUtil.java b/hudi-spark-datasource/hudi-spark3-common/src/test/java/org/apache/hudi/spark3/internal/TestReflectUtil.java similarity index 92% rename from hudi-spark-datasource/hudi-spark3.2.x/src/test/java/org/apache/hudi/spark3/internal/TestReflectUtil.java rename to hudi-spark-datasource/hudi-spark3-common/src/test/java/org/apache/hudi/spark3/internal/TestReflectUtil.java index 0d1867047847b..075e4242cb006 100644 --- a/hudi-spark-datasource/hudi-spark3.2.x/src/test/java/org/apache/hudi/spark3/internal/TestReflectUtil.java +++ b/hudi-spark-datasource/hudi-spark3-common/src/test/java/org/apache/hudi/spark3/internal/TestReflectUtil.java @@ -23,10 +23,14 @@ import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation; import org.apache.spark.sql.catalyst.plans.logical.InsertIntoStatement; - import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import java.util.Collections; + +import static scala.collection.JavaConverters.asScalaBuffer; + + /** * Unit tests {@link ReflectUtil}. */ @@ -42,7 +46,7 @@ public void testDataSourceWriterExtraCommitMetadata() throws Exception { InsertIntoStatement newStatment = ReflectUtil.createInsertInto( statement.table(), statement.partitionSpec(), - scala.collection.immutable.List.empty(), + asScalaBuffer(Collections.emptyList()).toSeq(), statement.query(), statement.overwrite(), statement.ifPartitionNotExists()); diff --git a/hudi-spark-datasource/hudi-spark3.0.x/pom.xml b/hudi-spark-datasource/hudi-spark3.0.x/pom.xml index 432b37a2d1ecd..5d141ba0b5fd2 100644 --- a/hudi-spark-datasource/hudi-spark3.0.x/pom.xml +++ b/hudi-spark-datasource/hudi-spark3.0.x/pom.xml @@ -152,18 +152,8 @@ org.apache.spark - spark-sql_2.12 + spark-sql_${scala.binary.version} ${spark30.version} - - - slf4j-log4j12 - org.slf4j - - - log4j - log4j - - true @@ -273,17 +263,6 @@ - - - org.junit.jupiter - junit-jupiter-api - test - - - org.junit.jupiter - junit-jupiter-params - test - diff --git a/hudi-spark-datasource/hudi-spark3.1.x/pom.xml b/hudi-spark-datasource/hudi-spark3.1.x/pom.xml index 4cf72458be612..6d4c8cb7e428f 100644 --- a/hudi-spark-datasource/hudi-spark3.1.x/pom.xml +++ b/hudi-spark-datasource/hudi-spark3.1.x/pom.xml @@ -152,7 +152,7 @@ org.apache.spark - spark-sql_2.12 + spark-sql_${scala.binary.version} ${spark31.version} true diff --git a/hudi-spark-datasource/hudi-spark3.2.x/pom.xml b/hudi-spark-datasource/hudi-spark3.2.x/pom.xml index c6a1072f04f95..03d0bc73134be 100644 --- a/hudi-spark-datasource/hudi-spark3.2.x/pom.xml +++ b/hudi-spark-datasource/hudi-spark3.2.x/pom.xml @@ -296,12 +296,6 @@ tests test-jar test - - - org.apache.spark - * - - @@ -320,12 +314,11 @@ tests test-jar test - - - org.apache.spark - * - - + + + org.apache.parquet + parquet-avro + test diff --git a/hudi-spark-datasource/hudi-spark3.2plus-common/pom.xml 
b/hudi-spark-datasource/hudi-spark3.2plus-common/pom.xml index e49389349fa2a..f18afb84f6a28 100644 --- a/hudi-spark-datasource/hudi-spark3.2plus-common/pom.xml +++ b/hudi-spark-datasource/hudi-spark3.2plus-common/pom.xml @@ -160,7 +160,7 @@ org.apache.spark - spark-sql_2.12 + spark-sql_${scala.binary.version} ${spark3.version} provided true @@ -218,6 +218,18 @@ test-jar test + + org.apache.spark + spark-core_${scala.binary.version} + ${spark3.version} + tests + test + + + org.apache.parquet + parquet-avro + test + diff --git a/hudi-spark-datasource/hudi-spark3.3.x/pom.xml b/hudi-spark-datasource/hudi-spark3.3.x/pom.xml index 809bf2765bf63..d3a442d25073a 100644 --- a/hudi-spark-datasource/hudi-spark3.3.x/pom.xml +++ b/hudi-spark-datasource/hudi-spark3.3.x/pom.xml @@ -300,6 +300,18 @@ test-jar test + + org.apache.spark + spark-core_${scala.binary.version} + ${spark3.version} + tests + test + + + org.apache.parquet + parquet-avro + test + diff --git a/hudi-sync/hudi-hive-sync/pom.xml b/hudi-sync/hudi-hive-sync/pom.xml index 74265bb50ddaf..56de4ffd416db 100644 --- a/hudi-sync/hudi-hive-sync/pom.xml +++ b/hudi-sync/hudi-hive-sync/pom.xml @@ -15,7 +15,8 @@ See the License for the specific language governing permissions and limitations under the License. --> - + hudi org.apache.hudi @@ -86,18 +87,6 @@ org.apache.hadoop hadoop-auth - - org.apache.hadoop - hadoop-common - tests - test - - - org.apache.hadoop - hadoop-hdfs - tests - test - @@ -156,6 +145,16 @@ org.apache.spark spark-core_${scala.binary.version} test + + + org.apache.hadoop + hadoop-client-api + + + org.apache.hadoop + hadoop-client-runtime + + diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java index ad61d6e0d30a0..de9861c9d9840 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java @@ -188,7 +188,9 @@ public static void shutdown() throws IOException { if (zkServer != null) { zkServer.shutdown(true); } - fileSystem.close(); + if (fileSystem != null) { + fileSystem.close(); + } } public static void createCOWTable(String instantTime, int numberOfPartitions, boolean useSchemaFromCommitMetadata, diff --git a/hudi-tests-common/pom.xml b/hudi-tests-common/pom.xml index 01fb4982d0105..201826f7567c1 100644 --- a/hudi-tests-common/pom.xml +++ b/hudi-tests-common/pom.xml @@ -186,10 +186,6 @@ javax.servlet * - - io.netty - * - diff --git a/hudi-timeline-service/pom.xml b/hudi-timeline-service/pom.xml index bdc16f44608d7..b38089e7e4f3e 100644 --- a/hudi-timeline-service/pom.xml +++ b/hudi-timeline-service/pom.xml @@ -15,7 +15,8 @@ See the License for the specific language governing permissions and limitations under the License. 
--> - + hudi org.apache.hudi @@ -130,47 +131,6 @@ - - org.apache.hadoop - hadoop-hdfs - tests - test - - - - org.mortbay.jetty - * - - - javax.servlet.jsp - * - - - javax.servlet - * - - - - - org.apache.hadoop - hadoop-common - tests - test - - - org.mortbay.jetty - * - - - javax.servlet.jsp - * - - - javax.servlet - * - - - org.apache.hadoop hadoop-client diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml index 533f951c67712..6341181c9e456 100644 --- a/hudi-utilities/pom.xml +++ b/hudi-utilities/pom.xml @@ -232,6 +232,14 @@ org.apache.spark spark-core_${scala.binary.version} + + org.apache.hadoop + hadoop-client-api + + + org.apache.hadoop + hadoop-client-runtime + javax.servlet * @@ -267,6 +275,16 @@ org.apache.spark spark-streaming-kafka-0-10_${scala.binary.version} ${spark.version} + + + org.apache.hadoop + hadoop-client-api + + + org.apache.hadoop + hadoop-client-runtime + + org.apache.spark diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHiveIncrementalPuller.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHiveIncrementalPuller.java index 67dfe74fe39c3..8c0b3eb244230 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHiveIncrementalPuller.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHiveIncrementalPuller.java @@ -29,6 +29,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledIf; import org.junit.jupiter.api.io.TempDir; import java.io.File; @@ -166,6 +167,7 @@ public void testPullerWithoutSourceInSql() throws IOException, URISyntaxExceptio } @Test + @EnabledIf(value = "org.apache.hudi.HoodieSparkUtils#isSpark2", disabledReason = "Disable due to hive not support avro 1.10.2.") public void testPuller() throws IOException, URISyntaxException { createTables(); HiveIncrementalPuller.Config cfg = getHivePullerConfig("select name from testdb.test1 where `_hoodie_commit_time` > '%s'"); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java index 2a2c3b2ea24d7..f6b70356ec213 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java @@ -158,6 +158,7 @@ import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_OFFSET_COLUMN; import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_PARTITION_COLUMN; import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_TIMESTAMP_COLUMN; +import static org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecordsByPartitions; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; @@ -1767,7 +1768,7 @@ private static void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic } } HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); - testUtils.sendMessages(topicName, Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", numRecords, HoodieTestDataGenerator.TRIP_SCHEMA))); + testUtils.sendMessages(topicName, jsonifyRecordsByPartitions(dataGenerator.generateInsertsAsPerSchema("000", 
numRecords, HoodieTestDataGenerator.TRIP_SCHEMA), numPartitions)); } private void testParquetDFSSource(boolean useSchemaProvider, List transformerClassNames) throws Exception { @@ -1946,8 +1947,8 @@ public void testJsonKafkaDFSSource() throws Exception { @Test public void testJsonKafkaDFSSourceWithOffsets() throws Exception { topicName = "topic" + testNum; - int numRecords = 15; - int numPartitions = 3; + int numRecords = 30; + int numPartitions = 2; int recsPerPartition = numRecords / numPartitions; long beforeTime = Instant.now().toEpochMilli(); prepareJsonKafkaDFSFiles(numRecords, true, topicName, numPartitions); @@ -2315,7 +2316,7 @@ public void testCsvDFSSourceNoHeaderWithoutSchemaProviderAndWithTransformer() th testCsvDFSSource(false, '\t', false, Collections.singletonList(TripsWithDistanceTransformer.class.getName())); }, "Should error out when doing the transformation."); LOG.debug("Expected error during transformation", e); - assertTrue(e.getMessage().contains("cannot resolve '`begin_lat`' given input columns:")); + assertTrue(e.getMessage().contains("cannot resolve 'begin_lat' given input columns:")); } @Test diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java index f75620bbf11de..9cc61292213f3 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java @@ -159,8 +159,8 @@ public void testAppendKafkaOffsetsSourceFormatAdapter() throws IOException { UtilHelpers.createSchemaProvider(FilebasedSchemaProvider.class.getName(), props, jsc()), props, jsc(), new ArrayList<>()); props.put("hoodie.deltastreamer.source.kafka.value.deserializer.class", ByteArrayDeserializer.class.getName()); - int numPartitions = 3; - int numMessages = 15; + int numPartitions = 2; + int numMessages = 30; testUtils.createTopic(topic,numPartitions); sendMessagesToKafka(topic, numMessages, numPartitions); AvroKafkaSource avroKafkaSource = new AvroKafkaSource(props, jsc(), spark(), schemaProvider, metrics); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java index 016f251b927d5..072e20c4c4741 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java @@ -63,6 +63,7 @@ import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_PARTITION_COLUMN; import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_TIMESTAMP_COLUMN; import static org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecords; +import static org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecordsByPartitions; import static org.junit.jupiter.api.Assertions.assertEquals; /** @@ -200,7 +201,7 @@ public void testJsonKafkaSourceWithConfigurableUpperCap() { @Override void sendMessagesToKafka(String topic, int count, int numPartitions) { HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); - testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", count, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA))); + testUtils.sendMessages(topic, 
jsonifyRecordsByPartitions(dataGenerator.generateInsertsAsPerSchema("000", count, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA), numPartitions)); } void sendJsonSafeMessagesToKafka(String topic, int count, int numPartitions) { @@ -308,30 +309,35 @@ public boolean upsertAndCommit(String baseTableInstantTime, Option commitedInsta @Test public void testAppendKafkaOffset() { final String topic = TEST_TOPIC_PREFIX + "testKafkaOffsetAppend"; - int numPartitions = 3; - int numMessages = 15; + int numPartitions = 2; + int numMessages = 30; testUtils.createTopic(topic, numPartitions); sendMessagesToKafka(topic, numMessages, numPartitions); TypedProperties props = createPropsForKafkaSource(topic, null, "earliest"); Source jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics); SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource); - Dataset c = kafkaSource.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE).getBatch().get(); - assertEquals(numMessages, c.count()); - List columns = Arrays.stream(c.columns()).collect(Collectors.toList()); + Dataset dfNoOffsetInfo = kafkaSource.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE).getBatch().get().cache(); + assertEquals(numMessages, dfNoOffsetInfo.count()); + List columns = Arrays.stream(dfNoOffsetInfo.columns()).collect(Collectors.toList()); props.put(HoodieDeltaStreamerConfig.KAFKA_APPEND_OFFSETS.key(), "true"); jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics); kafkaSource = new SourceFormatAdapter(jsonSource); - Dataset d = kafkaSource.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE).getBatch().get(); - assertEquals(numMessages, d.count()); + Dataset dfWithOffsetInfo = kafkaSource.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE).getBatch().get().cache(); + assertEquals(numMessages, dfWithOffsetInfo.count()); for (int i = 0; i < numPartitions; i++) { - assertEquals(numMessages / numPartitions, d.filter("_hoodie_kafka_source_partition=" + i).collectAsList().size()); + assertEquals(numMessages / numPartitions, dfWithOffsetInfo.filter("_hoodie_kafka_source_partition=" + i).count()); } - assertEquals(0, d.drop(KAFKA_SOURCE_OFFSET_COLUMN, KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN).except(c).count()); - List withKafkaOffsetColumns = Arrays.stream(d.columns()).collect(Collectors.toList()); + assertEquals(0, dfWithOffsetInfo + .drop(KAFKA_SOURCE_OFFSET_COLUMN, KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN) + .except(dfNoOffsetInfo).count()); + List withKafkaOffsetColumns = Arrays.stream(dfWithOffsetInfo.columns()).collect(Collectors.toList()); assertEquals(3, withKafkaOffsetColumns.size() - columns.size()); List appendList = Arrays.asList(KAFKA_SOURCE_OFFSET_COLUMN, KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN); assertEquals(appendList, withKafkaOffsetColumns.subList(withKafkaOffsetColumns.size() - 3, withKafkaOffsetColumns.size())); + + dfNoOffsetInfo.unpersist(); + dfWithOffsetInfo.unpersist(); } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java index e7b5a0ab5eeab..e3d2ec5a60287 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java @@ -151,7 +151,7 @@ public void testGetNextOffsetRangesFromMultiplePartitions() 
{ public void testGetNextOffsetRangesFromGroup() { HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); testUtils.createTopic(testTopicName, 2); - testUtils.sendMessages(testTopicName, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000))); + testUtils.sendMessages(testTopicName, Helpers.jsonifyRecordsByPartitions(dataGenerator.generateInserts("000", 1000), 2)); KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("group", "string")); String lastCheckpointString = testTopicName + ",0:250,1:249"; kafkaOffsetGen.commitOffsetToKafka(lastCheckpointString); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java index 697757f1b4314..f23a0419bd432 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java @@ -56,12 +56,12 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hive.service.server.HiveServer2; import org.apache.orc.OrcFile; import org.apache.orc.TypeDescription; import org.apache.orc.Writer; -import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.parquet.avro.AvroParquetWriter; import org.apache.parquet.hadoop.ParquetFileWriter.Mode; import org.apache.parquet.hadoop.ParquetWriter; @@ -85,6 +85,8 @@ import java.util.List; import java.util.Properties; +import scala.Tuple2; + import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_PASS; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_URL; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USER; @@ -435,6 +437,16 @@ public static String[] jsonifyRecords(List records) { return records.stream().map(Helpers::toJsonString).toArray(String[]::new); } + public static Tuple2[] jsonifyRecordsByPartitions(List records, int partitions) { + Tuple2[] data = new Tuple2[records.size()]; + for (int i = 0; i < records.size(); i++) { + int key = i % partitions; + String value = Helpers.toJsonString(records.get(i)); + data[i] = new Tuple2<>(Long.toString(key), value); + } + return data; + } + private static void addAvroRecord( VectorizedRowBatch batch, GenericRecord record, diff --git a/pom.xml b/pom.xml index b14a8ac7ae1b2..123f767e953f7 100644 --- a/pom.xml +++ b/pom.xml @@ -93,15 +93,15 @@ 1.8 4.0.2 - 2.6.7 - 2.6.7.3 - 2.6.7.1 - 2.7.4 2.10.0 + ${fasterxml.spark3.version} + ${fasterxml.spark3.version} + ${fasterxml.spark3.version} + ${fasterxml.spark3.version} 2.0.0 - 2.4.1 + 2.8.0 2.8.1 - ${pulsar.spark.scala11.version} + ${pulsar.spark.scala12.version} 2.4.5 3.1.1.4 5.3.4 @@ -129,7 +129,7 @@ 0.16 0.8.0 4.4.1 - ${spark2.version} + ${spark3.version} 2.4.4 3.3.1 @@ -155,28 +155,29 @@ flink-connector-kafka flink-hadoop-compatibility_2.12 5.17.2 - 3.1.3 3.0.2 + 3.1.3 3.2.3 3.3.1 - hudi-spark2 + hudi-spark3.2.x - hudi-spark2-common - + hudi-spark3-common + hudi-spark3.2plus-common 1.8.2 2.9.1 2.11.0 2.11.12 2.12.10 - ${scala11.version} + ${scala12.version} 2.8.1 - 2.11 + 2.12 0.13 3.3.1 - 3.0.1 + 3.0.1 3.1.0 + ${scalatest.spark3.version} log4j2-surefire.properties 0.12.0 4.6.7 @@ -207,7 +208,7 @@ true 2.7.1 3.4.2 - 4.7 + 
4.8 1.12.22 3.21.7 3.21.5 @@ -1663,6 +1664,10 @@ org.apache.logging.log4j * + + org.junit.jupiter + * + @@ -1960,27 +1965,6 @@ - - org.jacoco - jacoco-maven-plugin - - - - prepare-agent - - - - post-integration-tests - test - - report - - - ${project.reporting.outputDirectory}/jacoco-it - - - - @@ -2093,12 +2077,18 @@ - scala-2.11 + ${scala11.version} + 2.11 ${pulsar.spark.scala11.version} + + + scala-2.11 + + scala-2.12 @@ -2140,7 +2130,7 @@ - + spark2 @@ -2149,15 +2139,31 @@ hudi-spark-datasource/hudi-spark2-common - true + ${spark2.version} + 2.4 + ${scala11.version} + 2.11 + ${scalatest.spark_pre31.version} + hudi-spark2 + hudi-spark2-common + + 2.0.0 + 1.10.1 + 1.6.0 1.8.2 + 4.7 + 2.6.7 + 2.6.7.3 + 2.6.7.1 + 2.7.4 + ${pulsar.spark.scala11.version} + false + true + true - true spark2 - - !disabled @@ -2169,9 +2175,27 @@ hudi-spark-datasource/hudi-spark2-common + ${spark2.version} 2.4 - true + ${scala11.version} + 2.11 + ${scalatest.spark_pre31.version} + hudi-spark2 + hudi-spark2-common + + 2.0.0 + 1.10.1 + 1.6.0 1.8.2 + 4.7 + 2.6.7 + 2.6.7.3 + 2.6.7.1 + 2.7.4 + ${pulsar.spark.scala11.version} + false + true + false @@ -2204,11 +2228,11 @@ 2.12.15 ${scala12.version} 2.12 + ${scalatest.spark3.version} hudi-spark3.3.x hudi-spark3-common hudi-spark3.2plus-common - ${scalatest.spark3.version} ${kafka.spark3.version} hudi-spark3-common hudi-spark3.2plus-common - ${scalatest.spark3.version} ${kafka.spark3.version} hudi-spark3-common hudi-spark3.2plus-common - ${scalatest.spark3.version} ${kafka.spark3.version}