
Conversation

@bschell (Contributor) commented Jun 23, 2020

Modifies the use of Spark APIs for compatibility with both Spark 2 and Spark 3.

What is the purpose of the pull request

Updates Spark APIs to allow compatibility with both Spark 2 and Spark 3.

Verify this pull request

Existing tests verify the functionality for Spark 2.
Manual tests for Spark 3 have been performed on EMR clusters.
You can verify by building Hudi successfully with Spark v3.0.0.

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

@bschell changed the title from "[HUDI-1040] Update apis for spark3" to "[HUDI-1040] Update apis for spark3 compatibility" on Jun 23, 2020
@vinothchandar self-assigned this on Jun 23, 2020
Member

How expensive would this be in practice? Any thoughts?

Member

Wondering if we can use SparkContext.version or something to figure it out upfront.

Contributor Author

I think checking the Spark version would be a good idea to prevent the failed call every time; let me look into it. I don't believe the reflection overhead is significant as a whole, especially if we can figure out the Spark version upfront. We do something similar here: 9f12843

Member

I was concerned about the exception thrown each time. In the case you mentioned, it's obtaining the description once, whereas this would be on the fast path. Anyway, let's hope the version check is feasible.

Contributor

I think org.apache.spark.SPARK_VERSION could help us out here. I've implemented it in this commit: sbernauer@a4f1866. But I noticed that it had a huge performance impact (I assume due to the reflection).

Contributor

I would suggest changing the method to something like this:

import org.apache.spark.SPARK_VERSION
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

  private def deserializeRow(encoder: ExpressionEncoder[Row], internalRow: InternalRow): Row = {
    // TODO remove reflection once Spark 2.x support is dropped
    if (SPARK_VERSION.startsWith("2.")) {
      // Spark 2.x exposes ExpressionEncoder.fromRow(InternalRow): Row
      val spark2Method = encoder.getClass.getMethod("fromRow", classOf[InternalRow])
      spark2Method.invoke(encoder, internalRow).asInstanceOf[Row]
    } else {
      // Spark 3.x: obtain a deserializer via createDeserializer() and apply it to the row
      val deserializer = encoder.getClass.getMethod("createDeserializer").invoke(encoder)
      val spark3Method = deserializer.getClass.getMethod("apply", classOf[InternalRow])
      spark3Method.invoke(deserializer, internalRow).asInstanceOf[Row]
    }
  }
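
For context, a hedged sketch of how such a helper might be invoked on the row-mapping path this PR touches (the call site below is illustrative only, not the exact code in the PR):

  // Hypothetical call site: convert each InternalRow back to a Row before the Avro conversion.
  val rowRdd = df.queryExecution.toRdd.map(internalRow => deserializeRow(encoder, internalRow))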

@bschell (Contributor Author) Jul 14, 2020

Thanks for the suggestion! I will try making the same change as the commit you linked, but I am going to see if I can test the performance impact first.

If you have any more details on how it affected performance please let me know.

Contributor

The commit with all the patches (including better reflection) is here: sbernauer@0dd7172

Member

@nsivabalan are you able to take a swing at this for 0.6.0? This would be good to have.

Member

@sbernauer's check is better, I think.

On the reflection itself, @bschell, would it help if we cached the method objects instead of recreating them every time? Have you attempted anything like that?
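
As an illustration of the caching idea, a minimal Scala sketch (the object and method names are hypothetical and not part of this PR; only the Spark 2 path is shown, and the Spark 3 createDeserializer() lookup could be cached the same way):

import java.lang.reflect.Method
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

// Hypothetical helper: resolve the reflected method once per JVM instead of once per row.
object CachedRowDeserializer {
  private lazy val fromRowMethod: Method =
    classOf[ExpressionEncoder[Row]].getMethod("fromRow", classOf[InternalRow])

  def deserializeSpark2(encoder: ExpressionEncoder[Row], internalRow: InternalRow): Row =
    fromRowMethod.invoke(encoder, internalRow).asInstanceOf[Row]
}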

@vinothchandar (Member)

@bschell is this tested and ready to go? Would like to get it into the RC if possible.

@bschell (Contributor Author) commented Aug 11, 2020

@vinothchandar While this works, the reflection does hurt performance since this is a frequently used path. I was looking into better options to work around the performance hit.

@nsivabalan (Contributor) commented Aug 11, 2020

@bschell @vinothchandar: I gave this a shot. I don't have permission to push to your branch to update this PR.
Diff 1: adds support for Spark 3 but does not upgrade the Spark version in pom.xml: #1950
Diff 2: also upgrades the Spark version to 3.0.0: #1951
Diff 2 results in a compilation failure because one of the classes that Hudi uses (SparkHadoopUtil) is no longer accessible.

@bschell (Contributor Author) commented Aug 12, 2020

@nsivabalan Looks like a recent commit made this class private: apache/spark@ce2cdc3

However, I think we can work around this by porting the code for those methods over to Hudi (it's not too long).
For example:

spark3.txt
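
For illustration, a hedged Scala sketch of what porting such a helper could look like, assuming the path-globbing utilities are among the methods in question (the actual ported code is in the attached spark3.txt; the object name here is hypothetical):

import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical home for the ported helpers; mirrors Spark's SparkHadoopUtil.globPath.
object HoodiePathUtils {
  def globPath(fs: FileSystem, pattern: Path): Seq[Path] = {
    // globStatus returns null when nothing matches, so guard with Option.
    Option(fs.globStatus(pattern)).map { statuses =>
      statuses.map(_.getPath.makeQualified(fs.getUri, fs.getWorkingDirectory)).toSeq
    }.getOrElse(Seq.empty[Path])
  }

  def globPathIfNecessary(fs: FileSystem, pattern: Path): Seq[Path] = {
    // Only expand the pattern when it actually contains glob characters.
    if (pattern.toString.exists("{}[]*?\\".contains(_))) globPath(fs, pattern) else Seq(pattern)
  }
}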

@nsivabalan (Contributor)

@bschell: Cool, I will let you drive the patch then. I gave it a shot thinking we could get it into 0.6.0, but we couldn't make it.

@vinothchandar (Member)

@bschell this is a heavily requested feature. Are you still working on this? :)

@vinothchandar (Member) left a comment

@bschell would something like the snippet below, where we just fetch the methods upfront, help? (I think we need to lazily initialize the methods instead of serializing them from the driver for this to actually work.)

def createRdd(df: DataFrame, avroSchema: Schema, structName: String, recordNamespace: String)
  : RDD[GenericRecord] = {
    // Use the Avro schema to derive the StructType which has the correct nullability information
    val dataType = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
    val encoder = RowEncoder.apply(dataType).resolveAndBind()

    // Look up the reflected methods once, upfront, instead of once per row.
    // Note: these Method objects are captured by the closure below, so they would need to be
    // lazily initialized on the executors (rather than serialized from the driver) to actually work.
    val spark2Method = if (SPARK_VERSION.startsWith("2.")) {
      encoder.getClass.getMethods.filter(method => method.getName.equals("fromRow")).last
    } else {
      null
    }
    val (deserializer, applyMethod) = if (spark2Method == null) {
      // Spark 3.x: fetch the deserializer and its apply method only when actually running on Spark 3
      val createDeserializer =
        encoder.getClass.getMethods.filter(method => method.getName.equals("createDeserializer")).last
      val d = createDeserializer.invoke(encoder)
      (d, d.getClass.getMethods.filter(method => method.getName.equals("apply")).last)
    } else {
      (null, null)
    }

    df.queryExecution
      .toRdd.map[Row](internalRow => {
        if (spark2Method != null) {
          spark2Method.invoke(encoder, internalRow).asInstanceOf[Row]
        } else {
          applyMethod.invoke(deserializer, internalRow).asInstanceOf[Row]
        }
      }).mapPartitions { records =>
        if (records.isEmpty) Iterator.empty
        else {
          val convertor = AvroConversionHelper.createConverterToAvro(dataType, structName, recordNamespace)
          records.map { x => convertor(x).asInstanceOf[GenericRecord] }
        }
      }
  }

@vinothchandar (Member)

May not work, actually. A quick benchmark:

class A {
  def doSomething(arg: Int): Unit = {
  }
}

@Test def testReflectionPerformance(): Unit = {
  // warm up the JVM and ignore the result
  val a = new A()
  (1 to 100000000).foreach(_ => a.doSomething(1))

  val normalStartMs = System.currentTimeMillis()
  (1 to 100000000).foreach(_ => a.doSomething(1))
  println(s"Normal elapsed : ${System.currentTimeMillis() - normalStartMs}")

  val reflectStartMs = System.currentTimeMillis()
  val method = a.getClass.getMethods.filter(method => method.getName.equals("doSomething")).last
  (1 to 100000000).foreach(_ => method.invoke(a, 1.asInstanceOf[Object]))
  println(s"Reflect elapsed : ${System.currentTimeMillis() - reflectStartMs}")
}

gives the following results:

Normal elapsed : 30
Reflect elapsed : 312

Let's abandon the reflection approach altogether. Either we munge the APIs somehow, or we just make a separate module for Spark 3, e.g. hudi-spark3 with its own bundling. This may not be a bad choice per se. What do you think?

@vinothchandar (Member)

@bschell here's a path forward.

  • We can create an abstraction for deserializing, e.g. RowDeserializer, which implements row deserialization differently for Spark 2 and Spark 3 (a rough sketch follows this list).
  • The Spark 2 implementation lives in hudi-spark, and we pick the deserializer implementation based on the Spark version within hudi-spark itself using reflection. Note that only the instance is created this way, i.e. reflection is used solely for initializing the deserializer object.
  • Let's create a new module, hudi-spark3, which contains just the one class that implements the deserialization using Spark 3 APIs; we will override the Spark version to 3 for that module alone, i.e. this class alone gets compiled against Spark 3 (this needs to be confirmed).
  • We include hudi-spark3 in the Spark bundle. Spark is a provided dependency anyway, so as long as we invoke the Spark 3 deserializer only with the Spark 3 jars around, things should work well.
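
A rough Scala sketch of what such an abstraction might look like (class, package, and method names are illustrative only, not the final design in this PR):

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

// Version-agnostic contract that hudi-spark codes against.
trait SparkRowDeserializer extends Serializable {
  def deserializeRow(encoder: ExpressionEncoder[Row], internalRow: InternalRow): Row
}

// Lives in hudi-spark and compiles against Spark 2.
class Spark2RowDeserializer extends SparkRowDeserializer {
  override def deserializeRow(encoder: ExpressionEncoder[Row], internalRow: InternalRow): Row =
    encoder.fromRow(internalRow)
}

// Would live in the proposed hudi-spark3 module and be compiled against Spark 3 only:
//
// class Spark3RowDeserializer extends SparkRowDeserializer {
//   override def deserializeRow(encoder: ExpressionEncoder[Row], internalRow: InternalRow): Row =
//     encoder.createDeserializer().apply(internalRow)
// }

// Reflection is used exactly once, to instantiate the implementation matching the runtime Spark version.
object SparkRowDeserializer {
  def create(): SparkRowDeserializer = {
    val className = if (org.apache.spark.SPARK_VERSION.startsWith("2.")) {
      "org.apache.hudi.Spark2RowDeserializer" // hypothetical fully-qualified names
    } else {
      "org.apache.hudi.spark3.Spark3RowDeserializer"
    }
    Class.forName(className).getDeclaredConstructor().newInstance().asInstanceOf[SparkRowDeserializer]
  }
}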

@bschell (Contributor Author) commented Sep 3, 2020

@vinothchandar Thank you for the suggestions! I agree with this approach and will try to update this PR to match.

Commit: Add the new separate spark3 module to handle the reflection required to support both spark2 & spark3 together.
@bschell (Contributor Author) commented Sep 18, 2020

Seems like Travis doesn't have the hudi_spark_2.12 bundle available, only the 2.11. Any suggestions?

@vinothchandar (Member)

@bschell our Spark install may be 2.11 on these images. As for the hudi_spark_2.12 bundle, if we run the integ tests with 2.12, I think it would happen automatically?
@bvaradar should we switch to 2.12 and re-push the images?

@bvaradar (Contributor) commented Sep 21, 2020

@bschell: For running integration tests with Hudi packages built with Scala 2.12, we just need to change scripts/run_travis_tests.sh. The Docker container should automatically load those jars for running integration tests.

index 63fb959c..b77b4f64 100755
--- a/scripts/run_travis_tests.sh
+++ b/scripts/run_travis_tests.sh
@@ -35,7 +35,7 @@ elif [ "$mode" = "integration" ]; then
   export SPARK_HOME=$PWD/spark-${sparkVersion}-bin-hadoop${hadoopVersion}
   mkdir /tmp/spark-events/
   echo "Running Integration Tests"
-  mvn verify -Pintegration-tests -B
+  mvn verify -Pintegration-tests -Dscala-2.12 -B
 else
   echo "Unknown mode $mode"
   exit 1

@bvaradar (Contributor)

@bschell: Did you try the above change I mentioned? Let me know.

@bschell (Contributor Author) commented Sep 28, 2020

Looks like only integ tests are failing, but I'm not sure of the error. Will take a closer look.

@bschell (Contributor Author) commented Sep 30, 2020

In our testing we've found some larger-scale issues with this approach, as well as conflicts with the Hudi 0.6.0 refactors, mainly around Spark DataSource API interface changes. We need to re-evaluate this.

@vinothchandar (Member)

@bschell if you could expand on them, we can hash out a solution.

@bschell (Contributor Author) commented Sep 30, 2020

@vinothchandar We are compiling the issues right now. I will update here once we are finished, later today or tomorrow.

@aniejo commented Oct 7, 2020

@vinothchandar Has Hudi been tested for compatibility with Spark v3.0.0? I did a mvn build, but pyspark throws a dependency error: org.apache.hudi#hudi-spark-bundle_2.12;0.6.1: not found

@vinothchandar (Member)

@aniejo there are some known issues since some Spark APIs have changed in 3.

@bschell any updates for us? This is being requested heavily, and we'd love to get this done sooner if possible. Are we blocked on something?

@aniejo commented Oct 12, 2020

@vinothchandar @bschell I would appreciate any guidance regarding this error. I am using COW Hudi options on Spark 3.0:
pyspark --packages org.apache.hudi:hudi-spark-bundle_2.12:0.6.1-SNAPSHOT,org.apache.spark:spark-avro_2.12:3.0.1

df.write.format("hudi") \
    .options(**hudi_options) \
    .mode("overwrite") \
    .save(basePath)

On writing the df as Hudi parquet, it throws the below error:

to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 3, 6049cd42243a, executor driver): java.lang.NoSuchMethodError: scala.Predef$.refArrayOps([Ljava/lang/Object;)Lscala/collection/mutable/ArrayOps;
at org.apache.hudi.AvroConversionHelper$.createConverterToAvro(AvroConversionHelper.scala:344)
at org.apache.hudi.AvroConversionUtils$$anonfun$2.apply(AvroConversionUtils.scala:50)
at org.apache.hudi.AvroConversionUtils$$anonfun$2.apply(AvroConversionUtils.scala:47)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2139)
at org.apache.spark.rdd.RDD.$anonfun$take$1(RDD.scala:1423)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
at org.apache.spark.rdd.RDD.take(RDD.scala:1396)
at org.apache.spark.rdd.RDD.$anonfun$isEmpty$1(RDD.scala:1531)
at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1531)
at org.apache.spark.api.java.JavaRDDLike.isEmpty(JavaRDDLike.scala:544)
at org.apache.spark.api.java.JavaRDDLike.isEmpty$(JavaRDDLike.scala:544)
at org.apache.spark.api.java.AbstractJavaRDDLike.isEmpty(JavaRDDLike.scala:45)
at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:162)
at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:125)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:963)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:963)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:415)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:399)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:288)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)

@giaosudau (Contributor) commented Oct 14, 2020

Got a Hive class error:

Exception in thread "main" java.lang.NoSuchMethodError: org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(Lorg/apache/hadoop/hive/conf/HiveConf;Lorg/apache/hadoop/hive/metastore/HiveMetaHookLoader;Ljava/util/concurrent/ConcurrentHashMap;Ljava/lang/String;Z)Lorg/apache/hadoop/hive/metastore/IMetaStoreClient;
	at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:3600)
	at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3652)
	at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3632)
	at org.apache.hadoop.hive.ql.metadata.Hive.getAllFunctions(Hive.java:3894)
	at org.apache.hadoop.hive.ql.metadata.Hive.reloadFunctions(Hive.java:248)
	at org.apache.hadoop.hive.ql.metadata.Hive.registerAllFunctionsOnce(Hive.java:231)
	at org.apache.hadoop.hive.ql.metadata.Hive.<init>(Hive.java:388)
	at org.apache.hadoop.hive.ql.metadata.Hive.create(Hive.java:332)
	at org.apache.hadoop.hive.ql.metadata.Hive.getInternal(Hive.java:312)
	at org.apache.hadoop.hive.ql.metadata.Hive.get(Hive.java:288)
	at org.apache.spark.sql.hive.client.HiveClientImpl.client(HiveClientImpl.scala:260)
	at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:286)
	at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:227)
	at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:226)
	at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:276)
	at org.apache.spark.sql.hive.client.HiveClientImpl.databaseExists(HiveClientImpl.scala:389)
	at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$databaseExists$1(HiveExternalCatalog.scala:221)
	at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
	at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:99)
	at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:221)
	at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:137)
	at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:127)
	at org.apache.spark.sql.internal.SharedState.globalTempViewManager$lzycompute(SharedState.scala:157)
	at org.apache.spark.sql.internal.SharedState.globalTempViewManager(SharedState.scala:155)
	at org.apache.spark.sql.hive.HiveSessionStateBuilder.$anonfun$catalog$2(HiveSessionStateBuilder.scala:59)
	at org.apache.spark.sql.catalyst.catalog.SessionCatalog.globalTempViewManager$lzycompute(SessionCatalog.scala:93)
	at org.apache.spark.sql.catalyst.catalog.SessionCatalog.globalTempViewManager(SessionCatalog.scala:93)
	at org.apache.spark.sql.catalyst.catalog.SessionCatalog.createDatabase(SessionCatalog.scala:206)
	at org.apache.spark.sql.execution.command.CreateDatabaseCommand.run(ddl.scala:81)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
	at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:606)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601)
	at bb.gtd.de.CanalJsonToOds$.initSchema(CanalJsonToOds.scala:81)
	at bb.gtd.de.CanalJsonToOds$.delayedEndpoint$bb$gtd$de$CanalJsonToOds$1(CanalJsonToOds.scala:42)
	at bb.gtd.de.CanalJsonToOds$delayedInit$body.apply(CanalJsonToOds.scala:11)
	at scala.Function0.apply$mcV$sp(Function0.scala:39)
	at scala.Function0.apply$mcV$sp$(Function0.scala:39)
	at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
	at scala.App.$anonfun$main$1$adapted(App.scala:80)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at scala.App.main(App.scala:80)
	at scala.App.main$(App.scala:78)
	at bb.gtd.de.CanalJsonToOds$.main(CanalJsonToOds.scala:11)
	at bb.gtd.de.CanalJsonToOds.main(CanalJsonToOds.scala)
20/10/14 15:18:12 INFO SparkContext: Invoking stop() from shutdown hook
20/10/14 15:18:12 INFO AbstractConnector: Stopped Spark@64b31700{HTTP/1.1,[http/1.1]}{0.0.0.0:4041}
20/10/14 15:18:12 INFO SparkUI: Stopped Spark web UI at http://192.168.200.57:4041
20/10/14 15:18:12 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
20/10/14 15:18:12 INFO MemoryStore: MemoryStore cleared
20/10/14 15:18:12 INFO BlockManager: BlockManager stopped
20/10/14 15:18:12 INFO BlockManagerMaster: BlockManagerMaster stopped
20/10/14 15:18:12 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
20/10/14 15:18:12 INFO SparkContext: Successfully stopped SparkContext
20/10/14 15:18:12 INFO ShutdownHookManager: Shutdown hook called
20/10/14 15:18:12 INFO ShutdownHookManager: Deleting directory /private/var/folders/vv/5d3clfpj22q_c12ghwdnpfl80000gn/T/spark-a0c47cb6-1512-4f8c-8d10-38e58796fed6
Disconnected from the target VM, address: '127.0.0.1:50019', transport: 'socket'

@zhedoubushishi (Contributor) commented Oct 19, 2020

@bschell @vinothchandar to make this clear, I'm just wondering what the exact goal of this PR is. Do we want to make Hudi both compile and run with Spark 3, or do we want to make Hudi compile with Spark 2 and then run with Spark 3?

Ideally we should make Hudi both compile and run with Spark 3, but the current code change cannot compile with Spark 3.

Run

mvn clean install -DskipTests -DskipITs -Dspark.version=3.0.0 -Pscala-2.12

returns

[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.0:testCompile (default-testCompile) on project hudi-client: Compilation failure
[ERROR] /Users/wenningd/workplace/hudi/hudi-client/src/test/java/org/apache/hudi/testutils/SparkDatasetTestUtils.java:[146,27] cannot find symbol
[ERROR]   symbol:   method toRow(org.apache.spark.sql.Row)
[ERROR]   location: variable encoder of type org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
[ERROR] 

@vinothchandar (Member)

we want to make Hudi compile with spark 2 and then run with spark3?

This was the intention. But as @bschell pointed out, some classes have changed, and we need to make parts of hudi-spark modular and plug in Spark-version-specific implementations.

@zhedoubushishi (Contributor) commented Oct 20, 2020

Got hive class error

Exception in thread "main" java.lang.NoSuchMethodError: org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(Lorg/apache/hadoop/hive/conf/HiveConf;Lorg/apache/hadoop/hive/metastore/HiveMetaHookLoader;Ljava/util/concurrent/ConcurrentHashMap;Ljava/lang/String;Z)Lorg/apache/hadoop/hive/metastore/IMetaStoreClient;
	at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:3600)
	...

I suspect this is because Spark 3.0.0 uses Hive 2.3.7 while Spark 2.x uses Hive 1.2.1.spark2, and this causes an API conflict.

Looks like this signature: org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(Lorg/apache/hadoop/hive/conf/HiveConf;Lorg/apache/hadoop/hive/metastore/HiveMetaHookLoader;Ljava/util/concurrent/ConcurrentHashMap;Ljava/lang/String;Z) only exists in Hive 1.2.1.spark2: https://github.com/JoshRosen/hive/blob/release-1.2.1-spark2/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java#L101 but no longer exists in Hive 2.3.7: https://github.com/apache/hive/blob/rel/release-2.3.7/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java.

If we compile with Spark 2 and then run with Spark 3, we might run into this kind of issue.

But I didn't see any Hudi class in the error stack; can you provide more information about this error?

@zhedoubushishi (Contributor) commented Oct 21, 2020

@bschell : For running integration tests with hudi packages built with scala 2.12, we just need to change scripts/run_travis_tests.sh. The docker container should automatically load those jars for running integration tests.
...
-  mvn verify -Pintegration-tests -B
+  mvn verify -Pintegration-tests -Dscala-2.12 -B

Is this a permanent change, or are we just trying to run the tests here? @bschell @vinothchandar @bvaradar

I ran into a Scala class-not-found error when running the Docker integration tests for Hudi:

20/10/21 00:04:17 WARN SparkContext: Using an existing SparkContext; some configuration may not take effect.
Exception in thread "main" java.lang.NoSuchMethodError: scala.collection.JavaConversions$.deprecated$u0020asScalaIterator(Ljava/util/Iterator;)Lscala/collection/Iterator;
	at org.apache.hudi.IncrementalRelation.<init>(IncrementalRelation.scala:78)
	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:95)
	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:51)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
	at org.apache.hudi.utilities.sources.HoodieIncrSource.fetchNextBatch(HoodieIncrSource.java:122)
	at org.apache.hudi.utilities.sources.RowSource.fetchNewData(RowSource.java:43)
	at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:75)
	at org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.fetchNewDataInAvroFormat(SourceFormatAdapter.java:68)
	at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:364)
	at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:253)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$2(HoodieDeltaStreamer.java:163)
	at org.apache.hudi.common.util.Option.ifPresent(Option.java:96)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:161)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:466)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

I suspect that although Hudi is able to pick up the hudi-spark_2.12-bundle.jar, the Docker environment still uses Spark built for Scala 2.11, so there is still a conflict between Scala 2.11 and 2.12.

@zhedoubushishi (Contributor)

Found another runtime error when updating a MOR table:

java.lang.NoSuchMethodError: org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(Lorg/apache/spark/sql/SparkSession;Lscala/collection/Seq;Lscala/collection/immutable/Map;Lscala/Option;Lorg/apache/spark/sql/execution/datasources/FileStatusCache;)V
  at org.apache.hudi.HoodieSparkUtils$.createInMemoryFileIndex(HoodieSparkUtils.scala:72)
  at org.apache.hudi.MergeOnReadSnapshotRelation.buildFileIndex(MergeOnReadSnapshotRelation.scala:127)
  at org.apache.hudi.MergeOnReadSnapshotRelation.<init>(MergeOnReadSnapshotRelation.scala:72)
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:87)
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:51)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:339)
  at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:279)
  at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:268)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:268)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:214)
  ... 57 elided

This is because the signature of org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init> differs between Spark 2.4.4 and Spark 3.0.0.
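
One hedged way to cope with this kind of constructor drift is to resolve the constructor reflectively at runtime. A minimal Scala sketch, assuming the Spark 3 constructor only appends optional parameters to the Spark 2 one (the helper name mirrors createInMemoryFileIndex from the stack trace, but this is not the PR's actual code):

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.{FileStatusCache, InMemoryFileIndex}
import org.apache.spark.sql.types.StructType

// Hypothetical workaround: pick whichever InMemoryFileIndex constructor exists at runtime
// (5 parameters on Spark 2.4, more on Spark 3.0) and pad the trailing optional parameters
// with None, instead of linking against one fixed <init> signature.
def createInMemoryFileIndex(spark: SparkSession, globbedPaths: Seq[Path]): InMemoryFileIndex = {
  val fileStatusCache = FileStatusCache.getOrCreate(spark)
  val ctor = classOf[InMemoryFileIndex].getConstructors.maxBy(_.getParameterCount)
  val baseArgs: Seq[AnyRef] =
    Seq(spark, globbedPaths, Map.empty[String, String], Option.empty[StructType], fileStatusCache)
  val paddedArgs = baseArgs ++ Seq.fill(ctor.getParameterCount - baseArgs.size)(None)
  ctor.newInstance(paddedArgs: _*).asInstanceOf[InMemoryFileIndex]
}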

@nsivabalan (Contributor)

@bschell or others: I was just trying this patch out locally. My local compilation is failing with unapproved licenses. Do you also hit the same, or am I missing something? I thought the patch in itself should compile successfully.

mvn package -DskipTests

And it's strange that I get unapproved licenses for .iml files, which I usually don't in my repo branches.

Files with unapproved licenses:

  /Users/sivabala/Documents/personal/projects/siva_hudi/hudi_aug2020/hudi/hudi-client/hudi-flink-client/hudi-flink-client.iml
  /Users/sivabala/Documents/personal/projects/siva_hudi/hudi_aug2020/hudi/hudi-client/hudi-client-common/hudi-client-common.iml
  /Users/sivabala/Documents/personal/projects/siva_hudi/hudi_aug2020/hudi/hudi-client/hudi-spark-client/hudi-spark-client.iml

@nsivabalan (Contributor)

My bad, I didn't notice the other patch. I will check out the other one.

@umehrot2 (Contributor) commented Nov 3, 2020

The work for Spark 3 has been moved over to #2208. Hence closing this.

@umehrot2 closed this on Nov 3, 2020