diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java b/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java index 67f64d7962035..3b5e1bcde2564 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java @@ -117,21 +117,35 @@ public static Pair parseStreamChunkId(String streamChunkId) { @Override public void connectionTerminated(Channel channel) { + RuntimeException failedToReleaseBufferException = null; + // Close all streams which have been associated with the channel. for (Map.Entry entry: streams.entrySet()) { StreamState state = entry.getValue(); if (state.associatedChannel == channel) { streams.remove(entry.getKey()); - // Release all remaining buffers. - while (state.buffers.hasNext()) { - ManagedBuffer buffer = state.buffers.next(); - if (buffer != null) { - buffer.release(); + try { + // Release all remaining buffers. + while (state.buffers.hasNext()) { + ManagedBuffer buffer = state.buffers.next(); + if (buffer != null) { + buffer.release(); + } + } + } catch (RuntimeException e) { + if (failedToReleaseBufferException == null) { + failedToReleaseBufferException = e; + } else { + logger.error("Exception trying to release remaining StreamState buffers", e); } } } } + + if (failedToReleaseBufferException != null) { + throw failedToReleaseBufferException; + } } @Override diff --git a/common/network-common/src/test/java/org/apache/spark/network/server/OneForOneStreamManagerSuite.java b/common/network-common/src/test/java/org/apache/spark/network/server/OneForOneStreamManagerSuite.java index fb3503b783e54..45e1836da641f 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/server/OneForOneStreamManagerSuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/server/OneForOneStreamManagerSuite.java @@ -18,6 +18,7 @@ package org.apache.spark.network.server; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import io.netty.channel.Channel; @@ -96,4 +97,42 @@ public void managedBuffersAreFreedWhenConnectionIsClosed() { Mockito.verify(buffer2, Mockito.times(1)).release(); Assert.assertEquals(0, manager.numStreamStates()); } + + @Test + public void streamStatesAreFreedWhenConnectionIsClosedEvenIfBufferIteratorThrowsException() { + OneForOneStreamManager manager = new OneForOneStreamManager(); + + Iterator buffers = Mockito.mock(Iterator.class); + Mockito.when(buffers.hasNext()).thenReturn(true); + Mockito.when(buffers.next()).thenThrow(RuntimeException.class); + + ManagedBuffer mockManagedBuffer = Mockito.mock(ManagedBuffer.class); + + Iterator buffers2 = Mockito.mock(Iterator.class); + Mockito.when(buffers2.hasNext()).thenReturn(true).thenReturn(true); + Mockito.when(buffers2.next()).thenReturn(mockManagedBuffer).thenThrow(RuntimeException.class); + + Channel dummyChannel = Mockito.mock(Channel.class, Mockito.RETURNS_SMART_NULLS); + manager.registerStream("appId", buffers, dummyChannel); + manager.registerStream("appId", buffers2, dummyChannel); + + Assert.assertEquals(2, manager.numStreamStates()); + + try { + manager.connectionTerminated(dummyChannel); + Assert.fail("connectionTerminated should throw exception when fails to release all buffers"); + + } catch (RuntimeException e) { + + Mockito.verify(buffers, Mockito.times(1)).hasNext(); + Mockito.verify(buffers, Mockito.times(1)).next(); + + Mockito.verify(buffers2, Mockito.times(2)).hasNext(); + Mockito.verify(buffers2, Mockito.times(2)).next(); + + Mockito.verify(mockManagedBuffer, Mockito.times(1)).release(); + + Assert.assertEquals(0, manager.numStreamStates()); + } + } }