@vicennial
Contributor

What changes were proposed in this pull request?

This PR fixes a bug where invalid JAR URIs were generated: the URI was stored as artifactURI + "/" + target.toString (where target is the absolute path of the file) instead of artifactURI + "/" + remoteRelativePath.toString (where remoteRelativePath has the form jars/...).
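
To make the difference concrete, here is a minimal sketch of the two concatenations. It is not Spark's actual code: the object name, host, session ID, and directory layout are hypothetical values modeled on the log excerpt in this description, and it assumes a Unix-style file system.

```scala
import java.nio.file.{Path, Paths}

object ArtifactUriSketch {
  // Hypothetical values for illustration only; not taken from Spark's source.
  val artifactURI: String = "spark://host:43743/artifacts/session-id"
  val artifactRoot: Path = Paths.get("/home/user/spark/artifacts/spark-tmp/session-id")

  // Path of the JAR relative to the session's artifact directory (form: jars/...).
  val remoteRelativePath: Path = Paths.get("jars/TestHelloV2.jar")

  // Absolute location of the file on the server's local disk.
  val target: Path = artifactRoot.resolve(remoteRelativePath)

  // Buggy form: appending the absolute path yields ".../session-id//home/...".
  val buggyUri: String = artifactURI + "/" + target.toString

  // Fixed form: appending the relative path yields ".../session-id/jars/TestHelloV2.jar".
  val fixedUri: String = artifactURI + "/" + remoteRelativePath.toString
}
```

The double slash followed by a local directory in `buggyUri` matches the shape of the stream name reported as "not found" in the stack trace in this description.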

Why are the changes needed?

Without this change, Spark Connect users attempting to use a custom JAR (for example, in UDFs) hit task failures because an exception is thrown during the JAR file fetch operation.
Example stack trace:

23/07/03 17:00:15 INFO Executor: Fetching spark://ip-10-110-22-170.us-west-2.compute.internal:43743/artifacts/d9548b02-ff3b-4278-ab52-aef5d1fc724e//home/venkata.gudesa/spark/artifacts/spark-d6141194-c487-40fd-ba40-444d922808ea/d9548b02-ff3b-4278-ab52-aef5d1fc724e/jars/TestHelloV2.jar with timestamp 0
23/07/03 17:00:15 ERROR Executor: Exception in task 6.0 in stage 4.0 (TID 55)
java.lang.RuntimeException: Stream '/artifacts/d9548b02-ff3b-4278-ab52-aef5d1fc724e//home/venkata.gudesa/spark/artifacts/spark-d6141194-c487-40fd-ba40-444d922808ea/d9548b02-ff3b-4278-ab52-aef5d1fc724e/jars/TestHelloV2.jar' was not found.
	at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:260)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:142)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)

Does this PR introduce any user-facing change?

No (the bug-fix is consistent with what users expect)

How was this patch tested?

New E2E test in ReplE2ESuite.

|}
|val classLoaderUdf = udf(classLoadingTest _)
|
|val jarPath = Paths.get("$sparkHome/connector/connect/client/jvm/src/test/resources/TestHelloV2.jar").toUri
Contributor Author

I feel like there must be a better way to pass in the JAR path here. Open to suggestions!

// scalastyle:off classforname line.size.limit
val sparkHome = IntegrationTestUtils.sparkHome
val testJar = Paths
.get(s"$sparkHome/connector/connect/client/jvm/src/test/resources/TestHelloV2.jar")
Member

Looks fine, but to be doubly sure: does it need a Scala 2.13 jar too, @LuciferYang?

Contributor

Yes, this should wait for #41852.

Contributor

#41852 merged

Contributor Author

Thanks! I've updated the code to account for the Scala version.
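
For readers following along, one way to account for the Scala version when locating the test JAR could look like the sketch below. This is an assumption-laden illustration, not the code from this PR: the object and helper names are hypothetical, and the `TestHelloV2_<binary version>.jar` naming is inferred from the `TestHelloV2_2.12.jar` resource mentioned in the later cleanup commit.

```scala
import java.nio.file.{Path, Paths}

object TestJarPathSketch {
  // Reduces a full Scala version such as "2.12.18" to its binary version "2.12".
  def scalaBinaryVersion(fullVersion: String): String =
    fullVersion.split('.').take(2).mkString(".")

  // Builds the path of the version-specific test JAR under the given Spark home.
  // The file-name pattern is an assumption made for this sketch.
  def testJarPath(sparkHome: String, fullScalaVersion: String): Path =
    Paths.get(
      sparkHome,
      "connector", "connect", "client", "jvm", "src", "test", "resources",
      s"TestHelloV2_${scalaBinaryVersion(fullScalaVersion)}.jar")
}
```

In a test, the full version could be taken from `scala.util.Properties.versionNumberString`, so the same code picks the matching JAR under either Scala build.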

@vicennial requested a review from HyukjinKwon on July 4, 2023 21:28
@HyukjinKwon
Member

Merged to master.

LuciferYang added a commit that referenced this pull request Sep 26, 2023
…ry files

### What changes were proposed in this pull request?
This PR cleans up the binary files used to assist with Scala 2.12 testing.

They include:
- `core/src/test/resources/TestHelloV3_2.12.jar` and `core/src/test/resources/TestHelloV2_2.12.jar` added by SPARK-44246(#41789).
- `connector/connect/client/jvm/src/test/resources/udf2.12` and `connector/connect/client/jvm/src/test/resources/udf2.12.jar` added by SPARK-43744(#42069)
- `connector/connect/client/jvm/src/test/resources/TestHelloV2_2.12.jar` added by SPARK-44293(#41844)
- `sql/hive/src/test/resources/regression-test-SPARK-8489/test-2.12.jar` added by SPARK-25304(#22308)

### Why are the changes needed?
Spark 4.0 no longer supports Scala 2.12.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43106 from LuciferYang/SPARK-45321.

Authored-by: yangjie01
Signed-off-by: yangjie01