Skip to content

Commit 9321379

Browse files
committed
Merge branch 'master' into issues/SPARK-3063
2 parents d8a900a + cc36487 commit 9321379

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+1259
-2984
lines changed

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,9 @@ private[spark] class Executor(
9999
private val urlClassLoader = createClassLoader()
100100
private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
101101

102+
// Set the classloader for serializer
103+
env.serializer.setDefaultClassLoader(urlClassLoader)
104+
102105
// Akka's message frame size. If task result is bigger than this, we use the block manager
103106
// to send the result back.
104107
private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.network.netty
19+
20+
import java.util.concurrent.TimeUnit
21+
22+
import io.netty.bootstrap.Bootstrap
23+
import io.netty.channel.{Channel, ChannelOption, EventLoopGroup}
24+
import io.netty.channel.oio.OioEventLoopGroup
25+
import io.netty.channel.socket.oio.OioSocketChannel
26+
27+
import org.apache.spark.Logging
28+
29+
/**
 * Netty client built on the blocking (OIO) transport. It sends a request line (the given string
 * followed by "\r\n") over a single channel and hands inbound data to the supplied handler.
 *
 * Lifecycle: `init()` must be called before `connect()`, because `connect()` uses the bootstrap
 * that `init()` creates. `close()` shuts down the event loop group.
 *
 * @param handler inbound handler installed in the channel pipeline by
 *                FileClientChannelInitializer
 * @param connectTimeout value passed to `ChannelOption.CONNECT_TIMEOUT_MILLIS`
 */
class FileClient(handler: FileClientHandler, connectTimeout: Int) extends Logging {

  private var channel: Channel = _
  private var bootstrap: Bootstrap = _
  private var group: EventLoopGroup = _
  // Maximum time, in seconds, to wait for a request write to complete.
  private val sendTimeout = 60

  /** Creates the event loop group and configures the bootstrap; does not open a connection. */
  def init(): Unit = {
    group = new OioEventLoopGroup
    bootstrap = new Bootstrap
    bootstrap.group(group)
      .channel(classOf[OioSocketChannel])
      .option(ChannelOption.SO_KEEPALIVE, java.lang.Boolean.TRUE)
      .option(ChannelOption.TCP_NODELAY, java.lang.Boolean.TRUE)
      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Integer.valueOf(connectTimeout))
      .handler(new FileClientChannelInitializer(handler))
  }

  /**
   * Connects to `host:port` and blocks until the connection attempt finishes.
   * If the calling thread is interrupted, a warning is logged and the client is closed;
   * `channel` is then left unset.
   */
  def connect(host: String, port: Int): Unit = {
    try {
      channel = bootstrap.connect(host, port).sync().channel()
    } catch {
      case e: InterruptedException =>
        logWarning("FileClient interrupted while trying to connect", e)
        close()
    }
  }

  /** Blocks until the channel is closed. Returns immediately if no channel was ever opened. */
  def waitForClose(): Unit = {
    // channel is still null when connect() was never called or was interrupted.
    if (channel != null) {
      try {
        channel.closeFuture.sync()
      } catch {
        case e: InterruptedException =>
          logWarning("FileClient interrupted", e)
      }
    }
  }

  /**
   * Writes `file` followed by "\r\n" to the channel and waits up to `sendTimeout` seconds.
   *
   * @throws RuntimeException if the write did not complete in time or completed with a failure
   */
  def sendRequest(file: String): Unit = {
    try {
      val writeFuture = channel.writeAndFlush(file + "\r\n")
      // await() only reports whether the future completed in time, not whether it succeeded,
      // so the outcome has to be checked separately.
      val completed = writeFuture.await(sendTimeout, TimeUnit.SECONDS)
      if (!completed || !writeFuture.isSuccess) {
        throw new RuntimeException("Failed to send", writeFuture.cause())
      }
    } catch {
      case e: InterruptedException =>
        logError("Error", e)
    }
  }

  /** Shuts down the event loop group and drops the bootstrap. Safe to call more than once. */
  def close(): Unit = {
    if (group != null) {
      group.shutdownGracefully()
      group = null
      bootstrap = null
    }
  }
}

core/src/main/scala/org/apache/spark/storage/BlockDataProvider.scala renamed to core/src/main/scala/org/apache/spark/network/netty/FileClientChannelInitializer.scala

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,17 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.spark.storage
18+
package org.apache.spark.network.netty
1919

20-
import java.nio.ByteBuffer
20+
import io.netty.channel.ChannelInitializer
21+
import io.netty.channel.socket.SocketChannel
22+
import io.netty.handler.codec.string.StringEncoder
2123

2224

23-
/**
24-
* An interface for providing data for blocks.
25-
*
26-
* getBlockData returns either a FileSegment (for zero-copy send), or a ByteBuffer.
27-
*
28-
* Aside from unit tests, [[BlockManager]] is the main class that implements this.
29-
*/
30-
private[spark] trait BlockDataProvider {
31-
def getBlockData(blockId: String): Either[FileSegment, ByteBuffer]
25+
/**
 * Channel initializer for client channels: installs a string encoder for outbound requests,
 * followed by the supplied inbound handler.
 *
 * @param handler handler added to the pipeline under the name "handler"
 */
class FileClientChannelInitializer(handler: FileClientHandler)
  extends ChannelInitializer[SocketChannel] {

  override def initChannel(channel: SocketChannel): Unit = {
    val pipeline = channel.pipeline
    pipeline.addLast("encoder", new StringEncoder)
    pipeline.addLast("handler", handler)
  }
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.network.netty
19+
20+
import io.netty.buffer.ByteBuf
21+
import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler}
22+
23+
import org.apache.spark.storage.BlockId
24+
25+
26+
/**
 * Inbound handler that parses a FileHeader from the incoming bytes and, once at least
 * `fileLen` bytes are readable in the current buffer, passes the buffer to `handle` and closes
 * the channel.
 *
 * NOTE(review): each `channelRead0` call only looks at the buffer it was given; nothing here
 * accumulates data across calls. Confirm the pipeline delivers a whole file in one buffer.
 */
abstract class FileClientHandler extends SimpleChannelInboundHandler[ByteBuf] {

  // Header of the transfer currently in progress; null until a full header has been read.
  private var currentHeader: FileHeader = null

  @volatile
  private var handlerCalled: Boolean = false

  /** True once `handle` has been invoked for a completed transfer. */
  def isComplete: Boolean = handlerCalled

  /** Called with the buffer positioned just after the header once the body is readable. */
  def handle(ctx: ChannelHandlerContext, in: ByteBuf, header: FileHeader): Unit

  /** Error callback for the given block; not invoked from within this class. */
  def handleError(blockId: BlockId): Unit

  override def channelRead0(ctx: ChannelHandlerContext, in: ByteBuf): Unit = {
    if (currentHeader == null && in.readableBytes >= FileHeader.HEADER_SIZE) {
      // readSlice shares the underlying buffer instead of allocating a copy that would then
      // need an explicit release; it still advances the reader index by HEADER_SIZE.
      currentHeader = FileHeader.create(in.readSlice(FileHeader.HEADER_SIZE))
    }
    // The header may still be missing if fewer than HEADER_SIZE bytes have arrived so far.
    if (currentHeader != null && in.readableBytes >= currentHeader.fileLen) {
      handle(ctx, in, currentHeader)
      handlerCalled = true
      currentHeader = null
      ctx.close()
    }
  }
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.network.netty
19+
20+
import io.netty.buffer._
21+
22+
import org.apache.spark.Logging
23+
import org.apache.spark.storage.{BlockId, TestBlockId}
24+
25+
/**
 * Fixed-size header that precedes the data of a transferred file.
 *
 * Layout written by `buffer` (FileHeader.HEADER_SIZE bytes in total):
 *   - 4 bytes: `fileLen`
 *   - 4 bytes: length of the block id name
 *   - one byte per character of the block id name
 *   - zero padding up to HEADER_SIZE
 *
 * @param fileLen length of the file data that follows the header
 * @param blockId id of the block being transferred
 */
private[spark] class FileHeader (
    val fileLen: Int,
    val blockId: BlockId) extends Logging {

  /**
   * The encoded header, built on first access.
   *
   * Throws an Exception if the two length fields plus the block id name exceed HEADER_SIZE.
   */
  lazy val buffer: ByteBuf = {
    val buf = Unpooled.buffer()
    buf.capacity(FileHeader.HEADER_SIZE)
    buf.writeInt(fileLen)
    buf.writeInt(blockId.name.length)
    // One byte per character: presumably block id names are ASCII -- TODO confirm.
    blockId.name.foreach((x: Char) => buf.writeByte(x))
    // Pad the rest of the header. A header that fills HEADER_SIZE exactly is valid and needs
    // no padding; only a header that overflows the fixed size is rejected.
    val padding = FileHeader.HEADER_SIZE - buf.readableBytes
    if (padding < 0) {
      throw new Exception("too long header " + buf.readableBytes)
    }
    buf.writeZero(padding)
    buf
  }

}
46+
47+
/** Constants and decoding helpers for [[FileHeader]]. */
private[spark] object FileHeader {

  /** Total size of an encoded header, in bytes. */
  val HEADER_SIZE = 40

  /** Offset of the file-length field within the header. */
  def getFileLenOffset: Int = 0

  /** Size of the file-length field, in bytes. */
  def getFileLenSize: Int = Integer.SIZE / 8

  /**
   * Decodes a header from `buf`: reads the file length, then the block id name length,
   * then that many bytes as the characters of the block id name.
   */
  def create(buf: ByteBuf): FileHeader = {
    val fileLen = buf.readInt
    val nameLen = buf.readInt
    val name = new StringBuilder(nameLen)
    var remaining = nameLen
    while (remaining > 0) {
      name.append(buf.readByte().toChar)
      remaining -= 1
    }
    new FileHeader(fileLen, BlockId(name.toString()))
  }

  /** Manual round-trip check: encodes a header, decodes it again and prints the result. */
  def main(args: Array[String]): Unit = {
    val original = new FileHeader(25, TestBlockId("my_block"))
    val decoded = FileHeader.create(original.buffer)
    System.out.println("id=" + decoded.blockId + ",size=" + decoded.fileLen)
  }
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.network.netty
19+
20+
import java.net.InetSocketAddress
21+
22+
import io.netty.bootstrap.ServerBootstrap
23+
import io.netty.channel.{ChannelFuture, ChannelOption, EventLoopGroup}
24+
import io.netty.channel.oio.OioEventLoopGroup
25+
import io.netty.channel.socket.oio.OioServerSocketChannel
26+
27+
import org.apache.spark.Logging
28+
29+
/**
30+
* Server that accepts the path of a file and echoes back its content.
31+
*/
32+
class FileServer(pResolver: PathResolver, private var port: Int) extends Logging {

  private val addr: InetSocketAddress = new InetSocketAddress(port)
  private var bossGroup: EventLoopGroup = new OioEventLoopGroup
  private var workerGroup: EventLoopGroup = new OioEventLoopGroup

  // Binding starts as part of construction; the resulting future is awaited just below.
  private var channelFuture: ChannelFuture = {
    val bootstrap = new ServerBootstrap
    bootstrap.group(bossGroup, workerGroup)
      .channel(classOf[OioServerSocketChannel])
      .option(ChannelOption.SO_BACKLOG, java.lang.Integer.valueOf(100))
      .option(ChannelOption.SO_RCVBUF, java.lang.Integer.valueOf(1500))
      .childHandler(new FileServerChannelInitializer(pResolver))
    bootstrap.bind(addr)
  }

  // Wait for the bind to finish and record the port that was actually bound.
  try {
    val boundAddress = channelFuture.sync.channel.localAddress.asInstanceOf[InetSocketAddress]
    port = boundAddress.getPort
  } catch {
    case ie: InterruptedException =>
      // Keep the original fallback of reporting port 0, but no longer silently.
      logWarning("FileServer interrupted while binding", ie)
      port = 0
    case scala.util.control.NonFatal(e) =>
      // The constructor is about to fail, so nobody will hold a reference on which to call
      // stop(); release the event loop groups here before propagating the error.
      stop()
      throw e
  }

  /** Start the file server asynchronously in a new thread. */
  def start(): Unit = {
    val blockingThread: Thread = new Thread {
      override def run(): Unit = {
        try {
          // Blocks this daemon thread until the server channel is closed.
          channelFuture.channel.closeFuture.sync
          logInfo("FileServer exiting")
        } catch {
          case e: InterruptedException =>
            logError("File server start got interrupted", e)
        }
        // NOTE: bootstrap is shutdown in stop()
      }
    }
    blockingThread.setDaemon(true)
    blockingThread.start()
  }

  /** The bound port, or 0 if binding was interrupted. */
  def getPort: Int = port

  /** Closes the server channel and shuts down both event loop groups. Safe to call repeatedly. */
  def stop(): Unit = {
    if (channelFuture != null) {
      channelFuture.channel().close().awaitUninterruptibly()
      channelFuture = null
    }
    if (bossGroup != null) {
      bossGroup.shutdownGracefully()
      bossGroup = null
    }
    if (workerGroup != null) {
      workerGroup.shutdownGracefully()
      workerGroup = null
    }
  }
}
91+
Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,26 +15,20 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.spark.network.netty.server
18+
package org.apache.spark.network.netty
1919

2020
import io.netty.channel.ChannelInitializer
2121
import io.netty.channel.socket.SocketChannel
22-
import io.netty.handler.codec.LineBasedFrameDecoder
22+
import io.netty.handler.codec.{DelimiterBasedFrameDecoder, Delimiters}
2323
import io.netty.handler.codec.string.StringDecoder
24-
import io.netty.util.CharsetUtil
25-
import org.apache.spark.storage.BlockDataProvider
2624

27-
28-
/** Channel initializer that sets up the pipeline for the BlockServer. */
29-
private[netty]
30-
class BlockServerChannelInitializer(dataProvider: BlockDataProvider)
25+
/**
 * Channel initializer for server channels: splits inbound bytes into line-delimited frames
 * (at most 8192 bytes each), decodes each frame to a string and passes it to a new
 * FileServerHandler.
 *
 * @param pResolver resolver handed to each FileServerHandler
 */
class FileServerChannelInitializer(pResolver: PathResolver)
  extends ChannelInitializer[SocketChannel] {

  override def initChannel(channel: SocketChannel): Unit = {
    val pipeline = channel.pipeline
    val framer = new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter: _*)
    pipeline.addLast("framer", framer)
    pipeline.addLast("stringDecoder", new StringDecoder)
    pipeline.addLast("handler", new FileServerHandler(pResolver))
  }
}

0 commit comments

Comments
 (0)