From 04455669b76e7cc7e6676432223ebed563a0e257 Mon Sep 17 00:00:00 2001 From: mcheah Date: Mon, 25 Mar 2019 16:14:31 -0700 Subject: [PATCH 1/5] Various changes to the API and state assumptions in writers. Proposes the following changes to the API: - closeAndGetLength() is split into separate close() and getNumBytesWritten() operations. - openChannel and openStream renamed to toChannel and toStream Proposes the following changes to the implementation: - close() in the default implementation now persists the length in the partitionLengths array - getNumBytesWritten() doesn't necessitate the writer's resources to be closed ahead of it - Don't close the stream in BypassMergeSortShuffleWriter - only close it in DefaultShufflePartitionWriter#close (for consistency with how we treat channels) --- .../api/shuffle/ShufflePartitionWriter.java | 42 ++++++++++++++-- .../sort/BypassMergeSortShuffleWriter.java | 40 +++++++-------- .../io/DefaultShuffleMapOutputWriter.java | 50 ++++++++++--------- .../spark/rpc/netty/NettyStreamManager.scala | 2 +- .../DefaultShuffleMapOutputWriterSuite.scala | 16 +++--- .../catalyst/expressions/inputFileBlock.scala | 2 +- 6 files changed, 93 insertions(+), 59 deletions(-) diff --git a/core/src/main/java/org/apache/spark/api/shuffle/ShufflePartitionWriter.java b/core/src/main/java/org/apache/spark/api/shuffle/ShufflePartitionWriter.java index 4fb48b414e3ab..925102b40fbbf 100644 --- a/core/src/main/java/org/apache/spark/api/shuffle/ShufflePartitionWriter.java +++ b/core/src/main/java/org/apache/spark/api/shuffle/ShufflePartitionWriter.java @@ -17,6 +17,7 @@ package org.apache.spark.api.shuffle; +import java.io.Closeable; import java.io.IOException; import java.io.OutputStream; import java.nio.channels.Channels; @@ -31,12 +32,43 @@ * @since 3.0.0 */ @Experimental -public interface ShufflePartitionWriter { - OutputStream openStream() throws IOException; +public interface ShufflePartitionWriter extends Closeable { - long closeAndGetLength(); + /** + * 
Returns an underlying {@link OutputStream} that can write bytes to the underlying data store. + *

+ * Note that this stream itself is not closed by the caller; close the stream in + * the implementation of this class's {@link #close()}. + */ + OutputStream toStream() throws IOException; - default WritableByteChannel openChannel() throws IOException { - return Channels.newChannel(openStream()); + /** + * Returns an underlying {@link WritableByteChannel} that can write bytes to the underlying data + * store. + *

+ * Note that this channel itself is not closed by the caller; close the stream in + * the implementation of this class's {@link #close()}. + */ + default WritableByteChannel toChannel() throws IOException { + return Channels.newChannel(toStream()); } + + /** + * Get the number of bytes written by this writer's stream returned by {@link #toStream()} or + * the channel returned by {@link #toChannel()}. + */ + long getNumBytesWritten(); + + /** + * Close all resources created by this ShufflePartitionWriter, via calls to {@link #toStream()} + * or {@link #toChannel()}. + *

+ * This must always close any stream returned by {@link #toStream()}. + *

+ * Note that the default version of {@link #toChannel()} returns a {@link WritableByteChannel} + * that does not itself need to be closed up front; only the underlying output stream given by + * {@link #toStream()} must be closed. + */ + @Override + void close() throws IOException; } diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index ca0445155e88c..efffd161d2bad 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -130,7 +130,7 @@ final class BypassMergeSortShuffleWriter extends ShuffleWriter { public void write(Iterator> records) throws IOException { assert (partitionWriters == null); ShuffleMapOutputWriter mapOutputWriter = shuffleWriteSupport - .createMapOutputWriter(shuffleId, mapId, numPartitions); + .createMapOutputWriter(shuffleId, mapId, numPartitions); try { if (!records.hasNext()) { partitionLengths = new long[numPartitions]; @@ -144,11 +144,11 @@ public void write(Iterator> records) throws IOException { partitionWriterSegments = new FileSegment[numPartitions]; for (int i = 0; i < numPartitions; i++) { final Tuple2 tempShuffleBlockIdPlusFile = - blockManager.diskBlockManager().createTempShuffleBlock(); + blockManager.diskBlockManager().createTempShuffleBlock(); final File file = tempShuffleBlockIdPlusFile._2(); final BlockId blockId = tempShuffleBlockIdPlusFile._1(); partitionWriters[i] = - blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics); + blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics); } // Creating the file to write to and creating a disk writer both involve interacting with // the disk, and can take a long time in aggregate when we open many files, so should be @@ -202,20 +202,20 @@ private long[] 
writePartitionedData(ShuffleMapOutputWriter mapOutputWriter) thro for (int i = 0; i < numPartitions; i++) { final File file = partitionWriterSegments[i].file(); boolean copyThrewException = true; - ShufflePartitionWriter writer = mapOutputWriter.getNextPartitionWriter(); - if (transferToEnabled) { - WritableByteChannel outputChannel = writer.openChannel(); - if (file.exists()) { - FileInputStream in = new FileInputStream(file); - try (FileChannel inputChannel = in.getChannel()){ - Utils.copyFileStreamNIO(inputChannel, outputChannel, 0, inputChannel.size()); - copyThrewException = false; - } finally { - Closeables.close(in, copyThrewException); + try (ShufflePartitionWriter writer = mapOutputWriter.getNextPartitionWriter()) { + if (transferToEnabled) { + WritableByteChannel outputChannel = writer.toChannel(); + if (file.exists()) { + FileInputStream in = new FileInputStream(file); + try (FileChannel inputChannel = in.getChannel()) { + Utils.copyFileStreamNIO(inputChannel, outputChannel, 0, inputChannel.size()); + copyThrewException = false; + } finally { + Closeables.close(in, copyThrewException); + } } - } - } else { - try (OutputStream tempOutputStream = writer.openStream()) { + } else { + OutputStream tempOutputStream = writer.toStream(); if (file.exists()) { FileInputStream in = new FileInputStream(file); try { @@ -226,10 +226,10 @@ private long[] writePartitionedData(ShuffleMapOutputWriter mapOutputWriter) thro } } } - } - lengths[i] = writer.closeAndGetLength(); - if (file.exists() && !file.delete()) { - logger.error("Unable to delete file for partition {}", i); + lengths[i] = writer.getNumBytesWritten(); + if (file.exists() && !file.delete()) { + logger.error("Unable to delete file for partition {}", i); + } } } } finally { diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriter.java index 73b058bdd5236..b47d088e9b443 100644 --- 
a/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriter.java @@ -73,7 +73,7 @@ public DefaultShuffleMapOutputWriter( } @Override - public ShufflePartitionWriter getNextPartitionWriter() throws IOException { + public ShufflePartitionWriter getNextPartitionWriter() { return new DefaultShufflePartitionWriter(currPartitionId++); } @@ -97,7 +97,7 @@ public void commitAllPartitions() throws IOException { } @Override - public void abort(Throwable error) throws IOException { + public void abort(Throwable error) { try { cleanUp(); } catch (Exception e) { @@ -107,7 +107,7 @@ public void abort(Throwable error) throws IOException { log.warn("Failed to delete temporary shuffle file at {}", outputTempFile.getAbsolutePath()); } if (!outputFile.delete() && outputFile.exists()) { - log.warn("Failed to delete outputshuffle file at {}", outputFile.getAbsolutePath()); + log.warn("Failed to delete output shuffle file at {}", outputFile.getAbsolutePath()); } } @@ -154,42 +154,42 @@ private DefaultShufflePartitionWriter(int partitionId) { } @Override - public OutputStream openStream() throws IOException { + public OutputStream toStream() throws IOException { initStream(); stream = new PartitionWriterStream(); return stream; } @Override - public long closeAndGetLength() { + public FileChannel toChannel() throws IOException { + initChannel(); + currChannelPosition = outputFileChannel.position(); + return outputFileChannel; + } + + @Override + public long getNumBytesWritten() { if (outputFileChannel != null && stream == null) { try { long newPosition = outputFileChannel.position(); - long length = newPosition - currChannelPosition; - partitionLengths[partitionId] = length; - currChannelPosition = newPosition; - return length; + return newPosition - currChannelPosition; } catch (Exception e) { - log.error("The currPartition is: " + partitionId, e); - throw new 
IllegalStateException("Attempting to calculate position of file channel", e); + log.error("The currPartition is: {}", partitionId, e); + throw new IllegalStateException("Failed to calculate position of file channel", e); } + } else if (stream != null) { + return stream.getCount(); } else { - try { - stream.close(); - } catch (Exception e) { - throw new IllegalStateException("Attempting to close output stream", e); - } - int length = stream.getCount(); - partitionLengths[partitionId] = length; - return length; + return 0; } } @Override - public FileChannel openChannel() throws IOException { - initChannel(); - currChannelPosition = outputFileChannel.position(); - return outputFileChannel; + public void close() throws IOException { + if (stream != null) { + stream.close(); + } + partitionLengths[partitionId] = getNumBytesWritten(); } } @@ -218,7 +218,9 @@ public void close() throws IOException { @Override public void flush() throws IOException { - outputBufferedFileStream.flush(); + if (!isClosed) { + outputBufferedFileStream.flush(); + } } } } diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala index 780fadd5bda8e..648aeaffc26a8 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala @@ -34,7 +34,7 @@ import org.apache.spark.util.Utils * - arbitrary directories; all files under the directory become available through the manager, * respecting the directory's hierarchy. * - * Only streaming (openStream) is supported. + * Only streaming (toStream) is supported. 
*/ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv) extends StreamManager with RpcEnvFileServer { diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriterSuite.scala index 63f61697ab0b9..c26803a788032 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriterSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriterSuite.scala @@ -134,13 +134,13 @@ class DefaultShuffleMapOutputWriterSuite extends SparkFunSuite with BeforeAndAft test("writing to an outputstream") { (0 until NUM_PARTITIONS).foreach{ p => val writer = mapOutputWriter.getNextPartitionWriter - val stream = writer.openStream() + val stream = writer.toStream() data(p).foreach { i => stream.write(i)} stream.close() intercept[IllegalStateException] { stream.write(p) } - assert(writer.closeAndGetLength() == D_LEN) + assert(writer.getNumBytesWritten() == D_LEN) } mapOutputWriter.commitAllPartitions() val partitionLengths = (0 until NUM_PARTITIONS).map { _ => D_LEN.toDouble}.toArray @@ -152,14 +152,14 @@ class DefaultShuffleMapOutputWriterSuite extends SparkFunSuite with BeforeAndAft test("writing to a channel") { (0 until NUM_PARTITIONS).foreach{ p => val writer = mapOutputWriter.getNextPartitionWriter - val channel = writer.openChannel() + val channel = writer.toChannel() val byteBuffer = ByteBuffer.allocate(D_LEN * 4) val intBuffer = byteBuffer.asIntBuffer() intBuffer.put(data(p)) assert(channel.isOpen) channel.write(byteBuffer) // Bytes require * 4 - assert(writer.closeAndGetLength == D_LEN * 4) + assert(writer.getNumBytesWritten == D_LEN * 4) } mapOutputWriter.commitAllPartitions() val partitionLengths = (0 until NUM_PARTITIONS).map { _ => (D_LEN * 4).toDouble}.toArray @@ -171,7 +171,7 @@ class DefaultShuffleMapOutputWriterSuite extends SparkFunSuite with BeforeAndAft 
test("copyStreams with an outputstream") { (0 until NUM_PARTITIONS).foreach{ p => val writer = mapOutputWriter.getNextPartitionWriter - val stream = writer.openStream() + val stream = writer.toStream() val byteBuffer = ByteBuffer.allocate(D_LEN * 4) val intBuffer = byteBuffer.asIntBuffer() intBuffer.put(data(p)) @@ -179,7 +179,7 @@ class DefaultShuffleMapOutputWriterSuite extends SparkFunSuite with BeforeAndAft Utils.copyStream(in, stream, false, false) in.close() stream.close() - assert(writer.closeAndGetLength == D_LEN * 4) + assert(writer.getNumBytesWritten == D_LEN * 4) } mapOutputWriter.commitAllPartitions() val partitionLengths = (0 until NUM_PARTITIONS).map { _ => (D_LEN * 4).toDouble}.toArray @@ -191,7 +191,7 @@ class DefaultShuffleMapOutputWriterSuite extends SparkFunSuite with BeforeAndAft test("copyStreamsWithNIO with a channel") { (0 until NUM_PARTITIONS).foreach{ p => val writer = mapOutputWriter.getNextPartitionWriter - val channel = writer.openChannel() + val channel = writer.toChannel() val byteBuffer = ByteBuffer.allocate(D_LEN * 4) val intBuffer = byteBuffer.asIntBuffer() intBuffer.put(data(p)) @@ -201,7 +201,7 @@ class DefaultShuffleMapOutputWriterSuite extends SparkFunSuite with BeforeAndAft val in = new FileInputStream(tempFile) Utils.copyFileStreamNIO(in.getChannel, channel, 0, D_LEN * 4) in.close() - assert(writer.closeAndGetLength == D_LEN * 4) + assert(writer.getNumBytesWritten == D_LEN * 4) } mapOutputWriter.commitAllPartitions() val partitionLengths = (0 until NUM_PARTITIONS).map { _ => (D_LEN * 4).toDouble}.toArray diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/inputFileBlock.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/inputFileBlock.scala index 3b0141ad52cc7..a4e59bc4eb110 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/inputFileBlock.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/inputFileBlock.scala @@ -91,6 +91,6 @@ case class InputFileBlockLength() extends LeafExpression with Nondeterministic { override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val className = InputFileBlockHolder.getClass.getName.stripSuffix("$") val typeDef = s"final ${CodeGenerator.javaType(dataType)}" - ev.copy(code = code"$typeDef ${ev.value} = $className.getLength();", isNull = FalseLiteral) + ev.copy(code = code"$typeDef ${ev.value} = $className.getLength();", isNull = FalseLiteral) } } From dd4656cc0c2320d23a2d618f86b3c69ae56ca8be Mon Sep 17 00:00:00 2001 From: mcheah Date: Mon, 25 Mar 2019 16:34:09 -0700 Subject: [PATCH 2/5] Hack around kernel bug --- .../scala/org/apache/spark/util/Utils.scala | 38 +++++++++++-------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 2ad63d86237e5..a9f3c73e90ab8 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -340,7 +340,11 @@ private[spark] object Utils extends Logging { output: WritableByteChannel, startPosition: Long, bytesToCopy: Long): Unit = { -// val initialPos = output.position() + val outputInitialState = output match { + case outputFileChannel: FileChannel => + Some((outputFileChannel.position(), outputFileChannel)) + case _ => None + } var count = 0L // In case transferTo method transferred less data than we have required. while (count < bytesToCopy) { @@ -349,21 +353,23 @@ private[spark] object Utils extends Logging { assert(count == bytesToCopy, s"request to copy $bytesToCopy bytes, but actually copied $count bytes.") -// // Check the position after transferTo loop to see if it is in the right position and -// // give user information if not.
-// // Position will not be increased to the expected length after calling transferTo in -// // kernel version 2.6.32, this issue can be seen in -// // https://bugs.openjdk.java.net/browse/JDK-7052359 -// // This will lead to stream corruption issue when using sort-based shuffle (SPARK-3948). -// val finalPos = output.position() -// val expectedPos = initialPos + bytesToCopy -// assert(finalPos == expectedPos, -// s""" -// |Current position $finalPos do not equal to expected position $expectedPos -// |after transferTo, please check your kernel version to see if it is 2.6.32, -// |this is a kernel bug which will lead to unexpected behavior when using transferTo. -// |You can set spark.file.transferTo = false to disable this NIO feature. -// """.stripMargin) + // Check the position after transferTo loop to see if it is in the right position and + // give user information if not. + // Position will not be increased to the expected length after calling transferTo in + // kernel version 2.6.32, this issue can be seen in + // https://bugs.openjdk.java.net/browse/JDK-7052359 + // This will lead to stream corruption issue when using sort-based shuffle (SPARK-3948). + outputInitialState.foreach { case (initialPos, outputFileChannel) => + val finalPos = outputFileChannel.position() + val expectedPos = initialPos + bytesToCopy + assert(finalPos == expectedPos, + s""" + |Current position $finalPos do not equal to expected position $expectedPos + |after transferTo, please check your kernel version to see if it is 2.6.32, + |this is a kernel bug which will lead to unexpected behavior when using transferTo. + |You can set spark.file.transferTo = false to disable this NIO feature. 
+ """.stripMargin) + } } /** From d032041ad688c74943bde077d09c2ba6b83c39a5 Mon Sep 17 00:00:00 2001 From: mcheah Date: Mon, 25 Mar 2019 16:47:12 -0700 Subject: [PATCH 3/5] Set length only after closing --- .../spark/shuffle/sort/BypassMergeSortShuffleWriter.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index efffd161d2bad..9381cb2650132 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -202,7 +202,9 @@ private long[] writePartitionedData(ShuffleMapOutputWriter mapOutputWriter) thro for (int i = 0; i < numPartitions; i++) { final File file = partitionWriterSegments[i].file(); boolean copyThrewException = true; - try (ShufflePartitionWriter writer = mapOutputWriter.getNextPartitionWriter()) { + ShufflePartitionWriter writer = null; + try { + writer = mapOutputWriter.getNextPartitionWriter(); if (transferToEnabled) { WritableByteChannel outputChannel = writer.toChannel(); if (file.exists()) { @@ -226,11 +228,14 @@ private long[] writePartitionedData(ShuffleMapOutputWriter mapOutputWriter) thro } } } - lengths[i] = writer.getNumBytesWritten(); if (file.exists() && !file.delete()) { logger.error("Unable to delete file for partition {}", i); } + } finally { + Closeables.close(writer, copyThrewException); } + + lengths[i] = writer.getNumBytesWritten(); } } finally { writeMetrics.incWriteTime(System.nanoTime() - writeStartTime); From 5a62c67b11fb6558e96abf50b4cf452d2ca21206 Mon Sep 17 00:00:00 2001 From: mcheah Date: Mon, 25 Mar 2019 16:59:46 -0700 Subject: [PATCH 4/5] Flush partition writers in test --- .../shuffle/sort/io/DefaultShuffleMapOutputWriterSuite.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git 
a/core/src/test/scala/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriterSuite.scala index c26803a788032..d6f06efa1fa56 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriterSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriterSuite.scala @@ -141,6 +141,7 @@ class DefaultShuffleMapOutputWriterSuite extends SparkFunSuite with BeforeAndAft stream.write(p) } assert(writer.getNumBytesWritten() == D_LEN) + writer.close } mapOutputWriter.commitAllPartitions() val partitionLengths = (0 until NUM_PARTITIONS).map { _ => D_LEN.toDouble}.toArray @@ -160,6 +161,7 @@ class DefaultShuffleMapOutputWriterSuite extends SparkFunSuite with BeforeAndAft channel.write(byteBuffer) // Bytes require * 4 assert(writer.getNumBytesWritten == D_LEN * 4) + writer.close } mapOutputWriter.commitAllPartitions() val partitionLengths = (0 until NUM_PARTITIONS).map { _ => (D_LEN * 4).toDouble}.toArray @@ -180,6 +182,7 @@ class DefaultShuffleMapOutputWriterSuite extends SparkFunSuite with BeforeAndAft in.close() stream.close() assert(writer.getNumBytesWritten == D_LEN * 4) + writer.close } mapOutputWriter.commitAllPartitions() val partitionLengths = (0 until NUM_PARTITIONS).map { _ => (D_LEN * 4).toDouble}.toArray @@ -202,6 +205,7 @@ class DefaultShuffleMapOutputWriterSuite extends SparkFunSuite with BeforeAndAft Utils.copyFileStreamNIO(in.getChannel, channel, 0, D_LEN * 4) in.close() assert(writer.getNumBytesWritten == D_LEN * 4) + writer.close } mapOutputWriter.commitAllPartitions() val partitionLengths = (0 until NUM_PARTITIONS).map { _ => (D_LEN * 4).toDouble}.toArray From d21feb2948012974d2b4e2776bc4f03dd3bffaae Mon Sep 17 00:00:00 2001 From: mcheah Date: Mon, 25 Mar 2019 17:01:43 -0700 Subject: [PATCH 5/5] Revert rewording --- .../scala/org/apache/spark/rpc/netty/NettyStreamManager.scala | 2 +- 1 file 
changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala index 648aeaffc26a8..780fadd5bda8e 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala @@ -34,7 +34,7 @@ import org.apache.spark.util.Utils * - arbitrary directories; all files under the directory become available through the manager, * respecting the directory's hierarchy. * - * Only streaming (toStream) is supported. + * Only streaming (openStream) is supported. */ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv) extends StreamManager with RpcEnvFileServer {