Conversation

@boneanxs
Contributor

@boneanxs commented Jan 13, 2022


What is the purpose of the pull request

When using Structured Streaming in Spark 3, the following error is thrown:

Exception in thread "stream execution thread for testHudi [id = 463b211d-50a1-46b2-b304-589a9e1d05bb, runId = 636aaf61-f360-4ada-80b4-353d77e52ccb]" java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.sql.hudi.streaming.HoodieSourceOffset$
	at org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.json(HoodieSourceOffset.scala:30)
	at org.apache.spark.sql.connector.read.streaming.Offset.toString(Offset.java:64)
	at java.lang.String.valueOf(String.java:2994)
	at java.lang.StringBuilder.append(StringBuilder.java:131)
	at org.apache.spark.sql.execution.streaming.StreamProgress.$anonfun$toString$1(StreamProgress.scala:39)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
	at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:234)
	at scala.collection.TraversableLike.map(TraversableLike.scala:238)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.execution.streaming.StreamProgress.toString(StreamProgress.scala:39)
	at java.lang.String.valueOf(String.java:2994)
	at java.lang.StringBuilder.append(StringBuilder.java:131)
	at org.apache.spark.sql.execution.streaming.StreamExecution.toDebugString(StreamExecution.scala:595)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:352)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
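
A minimal way to hit this code path is a Structured Streaming read from a Hudi table. The sketch below is an assumed reproduction, not taken from this PR: the table base path, query name, and checkpoint location are placeholders.

    // spark-shell on Spark 3 with the hudi-spark3 bundle on the classpath
    val df = spark.readStream
      .format("hudi")
      .load("/tmp/hudi/testHudi")   // placeholder: base path of an existing Hudi table

    df.writeStream
      .format("console")
      .queryName("testHudi")
      .option("checkpointLocation", "/tmp/checkpoints/testHudi")  // placeholder
      .start()

The failure surfaces as soon as the stream execution thread tries to render HoodieSourceOffset via Offset.toString.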

This is caused by an incompatibility of ScalaObjectMapper between the Scala 2.11 and Scala 2.12 builds of jackson-module-scala, so simply remove it.
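
In practice this means building the mapper on a plain ObjectMapper with DefaultScalaModule and passing an explicit Class token to readValue. A simplified sketch of the resulting shape (not the exact patch, names abbreviated):

    import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
    import com.fasterxml.jackson.module.scala.DefaultScalaModule

    // No ScalaObjectMapper mixin: avoids the trait whose bytecode differs
    // between the Scala 2.11 and 2.12 builds of jackson-module-scala.
    lazy val mapper: ObjectMapper = {
      val m = new ObjectMapper
      m.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
      m.registerModule(DefaultScalaModule)
      m
    }

    def fromJson(json: String): HoodieSourceOffset =
      // explicit Class token instead of the implicit Manifest that
      // ScalaObjectMapper.readValue[T] relied on
      mapper.readValue(json, classOf[HoodieSourceOffset])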

Before the patch:

scala> import org.apache.hudi.common.table.timeline.HoodieTimeline
import org.apache.hudi.common.table.timeline.HoodieTimeline

scala> import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset
import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset

scala>   val INIT_OFFSET: HoodieSourceOffset = HoodieSourceOffset(HoodieTimeline.INIT_INSTANT_TS)
java.lang.NoSuchMethodError: com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper.$init$(Lcom/fasterxml/jackson/module/scala/experimental/ScalaObjectMapper;)V
  at org.apache.spark.sql.hudi.streaming.HoodieSourceOffset$$anon$1.<init>(HoodieSourceOffset.scala:48)
  at org.apache.spark.sql.hudi.streaming.HoodieSourceOffset$.<init>(HoodieSourceOffset.scala:48)
  at org.apache.spark.sql.hudi.streaming.HoodieSourceOffset$.<clinit>(HoodieSourceOffset.scala)
  at org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.json(HoodieSourceOffset.scala:30)
  at org.apache.spark.sql.connector.read.streaming.Offset.toString(Offset.java:64)
  at scala.runtime.ScalaRunTime$.inner$1(ScalaRunTime.scala:254)
  at scala.runtime.ScalaRunTime$.stringOf(ScalaRunTime.scala:259)
  at scala.runtime.ScalaRunTime$.replStringOf(ScalaRunTime.scala:267)
  at .$print$lzycompute(<console>:9)
  at .$print(<console>:6)
  at $print(<console>)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:745)
  at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1021)
  at scala.tools.nsc.interpreter.IMain.$anonfun$interpret$1(IMain.scala:574)
  at scala.reflect.internal.util.ScalaClassLoader.asContext(ScalaClassLoader.scala:41)
  at scala.reflect.internal.util.ScalaClassLoader.asContext$(ScalaClassLoader.scala:37)
  at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:41)
  at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
  at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:600)
  at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:570)
  at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:894)
  at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:762)
  at scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:464)
  at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:485)
  at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:239)
  at org.apache.spark.repl.Main$.doMain(Main.scala:78)
  at org.apache.spark.repl.Main$.main(Main.scala:58)
  at org.apache.spark.repl.Main.main(Main.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
  at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
  at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
  at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
  at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
  at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

After this patch:

scala> import org.apache.hudi.common.table.timeline.HoodieTimeline
import org.apache.hudi.common.table.timeline.HoodieTimeline

scala> import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset
import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset

scala>   val INIT_OFFSET: HoodieSourceOffset = HoodieSourceOffset(HoodieTimeline.INIT_INSTANT_TS)
INIT_OFFSET: org.apache.spark.sql.hudi.streaming.HoodieSourceOffset = {"commitTime":"00000000000000"}

Brief change log

  • Remove the ScalaObjectMapper mixin from HoodieSourceOffset and use mapper.readValue(json, classOf[HoodieSourceOffset]) instead.
  • Change HoodieSourceOffset.json from a def to a val so it is computed only once.

Verify this pull request

This pull request is a trivial rework / code cleanup without any test coverage.

Manually verified the change by running the snippet above in spark-shell before and after the patch.

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

@boneanxs
Contributor Author

Hi, @xushiyan, can you review this fix?

@hudi-bot
Collaborator

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

@boneanxs
Contributor Author

@xushiyan @leesf @zhedoubushishi gentle ping...

Diff context in HoodieSourceOffset.scala (the comment is anchored on the changed mapper declaration):

      mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
      mapper.registerModule(DefaultScalaModule)

    lazy val mapper: ObjectMapper = {
Contributor


Here we just need to remove the with ScalaObjectMapper that was at line 48 before, right?

Contributor Author

@boneanxs commented Jan 16, 2022


Yes. Since ScalaObjectMapper can be replaced by passing the target class (or a ClassTag) to readValue, we can safely and simply avoid the conflict across Scala versions (or, more accurately, across jackson-module-scala versions).


    def fromJson(json: String): HoodieSourceOffset = {
-     mapper.readValue[HoodieSourceOffset](json)
+     mapper.readValue(json, classOf[HoodieSourceOffset])
Contributor

@leesf commented Jan 16, 2022


Is this change necessary? I was thinking the meaning after the change is the same as before.

Contributor Author


After removing ScalaObjectMapper, we need to pass the class (ClassTag) explicitly to readValue.
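
For context, a rough comparison of the two call shapes (paraphrased, not copied from jackson-module-scala):

    // With the ScalaObjectMapper mixin (removed by this PR), the target type is
    // carried by an implicit Manifest, so no Class argument is needed:
    //   mapper.readValue[HoodieSourceOffset](json)

    // With a plain ObjectMapper, the target type must be passed explicitly:
    mapper.readValue(json, classOf[HoodieSourceOffset])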

    case class HoodieSourceOffset(commitTime: String) extends Offset {

-     override def json(): String = {
+     override val json: String = {
Contributor


Is this change also necessary? Would you please clarify?

Contributor Author


This is just an improvement: json does not need to be recomputed on every call, since its value cannot change.
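
A tiny illustration of the difference (hypothetical class, not Hudi code): a def body runs on every call, while a val is evaluated once when the instance is constructed and then reused.

    class OffsetLike(commitTime: String) {
      // evaluated on every call
      def jsonDef(): String = { println("computing (def)"); s"""{"commitTime":"$commitTime"}""" }

      // evaluated once, at construction time, then cached
      val jsonVal: String = s"""{"commitTime":"$commitTime"}"""
    }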

@leesf merged commit f184474 into apache:master Jan 18, 2022
@vinishjail97 mentioned this pull request Jan 24, 2022
vingov pushed a commit to vingov/hudi that referenced this pull request Jan 26, 2022
liusenhua pushed a commit to liusenhua/hudi that referenced this pull request Mar 1, 2022
vingov pushed a commit to vingov/hudi that referenced this pull request Apr 3, 2022