From 22def779b12eb34f8da4c1c40ab41d32bca29292 Mon Sep 17 00:00:00 2001 From: Henrique Goulart Date: Thu, 2 Jan 2020 01:36:01 +0100 Subject: [PATCH 1/6] OneForOneStreamManager might leak memory in connectionTerminated OneForOneStreamManager may not remove all StreamStates from memory map when a connection is terminated. A RuntimeException might be thrown in StreamState$buffers.next() by one of ExternalShuffleBlockResolver$getBlockData/getRddBlock... breaking the loop through streams.entrySet() This commit removes all StreamStates from memory map first and, after that, releases all buffers from each removed StreamState --- .../server/OneForOneStreamManager.java | 20 ++++++++------ .../server/OneForOneStreamManagerSuite.java | 27 +++++++++++++++++++ 2 files changed, 39 insertions(+), 8 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java b/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java index 67f64d7962035..37f4e2b76743f 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java @@ -18,6 +18,7 @@ package org.apache.spark.network.server; import java.util.Iterator; +import java.util.LinkedList; import java.util.Map; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; @@ -117,18 +118,21 @@ public static Pair parseStreamChunkId(String streamChunkId) { @Override public void connectionTerminated(Channel channel) { + LinkedList removedStates = new LinkedList<>(); // Close all streams which have been associated with the channel. for (Map.Entry entry: streams.entrySet()) { StreamState state = entry.getValue(); if (state.associatedChannel == channel) { - streams.remove(entry.getKey()); - - // Release all remaining buffers. - while (state.buffers.hasNext()) { - ManagedBuffer buffer = state.buffers.next(); - if (buffer != null) { - buffer.release(); - } + removedStates.add(streams.remove(entry.getKey())); + } + } + + for (StreamState state: removedStates) { + // Release all remaining buffers. + while (state.buffers.hasNext()) { + ManagedBuffer buffer = state.buffers.next(); + if (buffer != null) { + buffer.release(); } } } diff --git a/common/network-common/src/test/java/org/apache/spark/network/server/OneForOneStreamManagerSuite.java b/common/network-common/src/test/java/org/apache/spark/network/server/OneForOneStreamManagerSuite.java index fb3503b783e54..0072038c21b4f 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/server/OneForOneStreamManagerSuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/server/OneForOneStreamManagerSuite.java @@ -18,6 +18,7 @@ package org.apache.spark.network.server; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import io.netty.channel.Channel; @@ -96,4 +97,30 @@ public void managedBuffersAreFreedWhenConnectionIsClosed() { Mockito.verify(buffer2, Mockito.times(1)).release(); Assert.assertEquals(0, manager.numStreamStates()); } + + @Test + public void streamStatesAreFreedWhenConnectionIsClosedEvenIfBufferIteratorThrowsException() { + OneForOneStreamManager manager = new OneForOneStreamManager(); + + Iterator buffers = Mockito.mock(Iterator.class); + Mockito.when(buffers.hasNext()).thenReturn(true); + Mockito.when(buffers.next()).thenThrow(RuntimeException.class); + + Channel dummyChannel = Mockito.mock(Channel.class, Mockito.RETURNS_SMART_NULLS); + manager.registerStream("appId", buffers, dummyChannel); + manager.registerStream("appId", buffers, dummyChannel); + + Assert.assertEquals(2, manager.numStreamStates()); + + try { + manager.connectionTerminated(dummyChannel); + Assert.fail("connectionTerminated should throws RuntimeException in buffers.next()"); + } catch (RuntimeException e) { + Mockito.verify(buffers, Mockito.times(1)).hasNext(); + Mockito.verify(buffers, Mockito.times(1)).next(); + + Assert.assertEquals(0, manager.numStreamStates()); + } + } + } From a9ebe4ffe15bf1e438e8f27a2b7344c4f7b6eaf7 Mon Sep 17 00:00:00 2001 From: Henrique Goulart Date: Thu, 2 Jan 2020 14:53:21 +0100 Subject: [PATCH 2/6] Add try-catch block to release buffers for all removedStates even there's any exception on releasing any state. --- .../server/OneForOneStreamManager.java | 12 ++++++--- .../server/OneForOneStreamManagerSuite.java | 26 ++++++++++++------- 2 files changed, 25 insertions(+), 13 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java b/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java index 37f4e2b76743f..f6431efe95efa 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java @@ -129,11 +129,15 @@ public void connectionTerminated(Channel channel) { for (StreamState state: removedStates) { // Release all remaining buffers. - while (state.buffers.hasNext()) { - ManagedBuffer buffer = state.buffers.next(); - if (buffer != null) { - buffer.release(); + try { + while (state.buffers.hasNext()) { + ManagedBuffer buffer = state.buffers.next(); + if (buffer != null) { + buffer.release(); + } } + } catch (RuntimeException e) { + logger.error("Exception trying to release remaining StreamState buffers", e); } } } diff --git a/common/network-common/src/test/java/org/apache/spark/network/server/OneForOneStreamManagerSuite.java b/common/network-common/src/test/java/org/apache/spark/network/server/OneForOneStreamManagerSuite.java index 0072038c21b4f..68bc5e7470cdd 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/server/OneForOneStreamManagerSuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/server/OneForOneStreamManagerSuite.java @@ -106,21 +106,29 @@ public void streamStatesAreFreedWhenConnectionIsClosedEvenIfBufferIteratorThrows Mockito.when(buffers.hasNext()).thenReturn(true); Mockito.when(buffers.next()).thenThrow(RuntimeException.class); + ManagedBuffer mockManagedBuffer = Mockito.mock(ManagedBuffer.class); + + Iterator buffers2 = Mockito.mock(Iterator.class); + Mockito.when(buffers2.hasNext()).thenReturn(true).thenReturn(true); + Mockito.when(buffers2.next()).thenReturn(mockManagedBuffer).thenThrow(RuntimeException.class); + Channel dummyChannel = Mockito.mock(Channel.class, Mockito.RETURNS_SMART_NULLS); manager.registerStream("appId", buffers, dummyChannel); - manager.registerStream("appId", buffers, dummyChannel); + manager.registerStream("appId", buffers2, dummyChannel); Assert.assertEquals(2, manager.numStreamStates()); - try { - manager.connectionTerminated(dummyChannel); - Assert.fail("connectionTerminated should throws RuntimeException in buffers.next()"); - } catch (RuntimeException e) { - Mockito.verify(buffers, Mockito.times(1)).hasNext(); - Mockito.verify(buffers, Mockito.times(1)).next(); + manager.connectionTerminated(dummyChannel); - Assert.assertEquals(0, manager.numStreamStates()); - } + Mockito.verify(buffers, Mockito.times(1)).hasNext(); + Mockito.verify(buffers, Mockito.times(1)).next(); + + Mockito.verify(buffers2, Mockito.times(2)).hasNext(); + Mockito.verify(buffers2, Mockito.times(2)).next(); + + Mockito.verify(mockManagedBuffer, Mockito.times(1)).release(); + + Assert.assertEquals(0, manager.numStreamStates()); } } From 410651d87c123f7a22f2950384d972bb7e71f30a Mon Sep 17 00:00:00 2001 From: Henrique Goulart Date: Thu, 9 Jan 2020 10:35:39 +0100 Subject: [PATCH 3/6] Rethrow exception when fails to release all StreamState buffers --- .../server/OneForOneStreamManager.java | 33 ++++++++++--------- .../server/OneForOneStreamManagerSuite.java | 19 +++++++---- 2 files changed, 30 insertions(+), 22 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java b/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java index f6431efe95efa..30d44ef229683 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java @@ -18,7 +18,6 @@ package org.apache.spark.network.server; import java.util.Iterator; -import java.util.LinkedList; import java.util.Map; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; @@ -118,28 +117,32 @@ public static Pair parseStreamChunkId(String streamChunkId) { @Override public void connectionTerminated(Channel channel) { - LinkedList removedStates = new LinkedList<>(); + boolean failedToReleaseBuffers = false; + // Close all streams which have been associated with the channel. for (Map.Entry entry: streams.entrySet()) { StreamState state = entry.getValue(); if (state.associatedChannel == channel) { - removedStates.add(streams.remove(entry.getKey())); - } - } - - for (StreamState state: removedStates) { - // Release all remaining buffers. - try { - while (state.buffers.hasNext()) { - ManagedBuffer buffer = state.buffers.next(); - if (buffer != null) { - buffer.release(); + streams.remove(entry.getKey()); + + try { + // Release all remaining buffers. + while (state.buffers.hasNext()) { + ManagedBuffer buffer = state.buffers.next(); + if (buffer != null) { + buffer.release(); + } } + } catch (RuntimeException e) { + failedToReleaseBuffers = true; + logger.error("Exception trying to release remaining StreamState buffers", e); } - } catch (RuntimeException e) { - logger.error("Exception trying to release remaining StreamState buffers", e); } } + + if (failedToReleaseBuffers) { + throw new RuntimeException("Failed to release one or more StreamState buffers"); + } } @Override diff --git a/common/network-common/src/test/java/org/apache/spark/network/server/OneForOneStreamManagerSuite.java b/common/network-common/src/test/java/org/apache/spark/network/server/OneForOneStreamManagerSuite.java index 68bc5e7470cdd..e624b233f5367 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/server/OneForOneStreamManagerSuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/server/OneForOneStreamManagerSuite.java @@ -118,17 +118,22 @@ public void streamStatesAreFreedWhenConnectionIsClosedEvenIfBufferIteratorThrows Assert.assertEquals(2, manager.numStreamStates()); - manager.connectionTerminated(dummyChannel); + try { + manager.connectionTerminated(dummyChannel); + Assert.fail("connectionTerminated should throw RuntimeException when fails to release all buffers"); - Mockito.verify(buffers, Mockito.times(1)).hasNext(); - Mockito.verify(buffers, Mockito.times(1)).next(); + } catch (RuntimeException e) { - Mockito.verify(buffers2, Mockito.times(2)).hasNext(); - Mockito.verify(buffers2, Mockito.times(2)).next(); + Mockito.verify(buffers, Mockito.times(1)).hasNext(); + Mockito.verify(buffers, Mockito.times(1)).next(); - Mockito.verify(mockManagedBuffer, Mockito.times(1)).release(); + Mockito.verify(buffers2, Mockito.times(2)).hasNext(); + Mockito.verify(buffers2, Mockito.times(2)).next(); - Assert.assertEquals(0, manager.numStreamStates()); + Mockito.verify(mockManagedBuffer, Mockito.times(1)).release(); + + Assert.assertEquals(0, manager.numStreamStates()); + } } } From ac587989396f88bb2a84064f5906b94d086bf12d Mon Sep 17 00:00:00 2001 From: Henrique Goulart Date: Thu, 9 Jan 2020 11:34:42 +0100 Subject: [PATCH 4/6] Fix linter linelength error [ERROR] src/test/java/org/apache/spark/network/server/OneForOneStreamManagerSuite.java:[123] (sizes) LineLength: Line is longer than 100 characters (found 106). --- .../spark/network/server/OneForOneStreamManagerSuite.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/network-common/src/test/java/org/apache/spark/network/server/OneForOneStreamManagerSuite.java b/common/network-common/src/test/java/org/apache/spark/network/server/OneForOneStreamManagerSuite.java index e624b233f5367..a8f9f22ddd258 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/server/OneForOneStreamManagerSuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/server/OneForOneStreamManagerSuite.java @@ -120,7 +120,7 @@ public void streamStatesAreFreedWhenConnectionIsClosedEvenIfBufferIteratorThrows try { manager.connectionTerminated(dummyChannel); - Assert.fail("connectionTerminated should throw RuntimeException when fails to release all buffers"); + Assert.fail("connectionTerminated should throw exception when fails to release all buffers"); } catch (RuntimeException e) { From b02e48a1b1d1f1b5409fbd86ed8ffd3e968ddf5d Mon Sep 17 00:00:00 2001 From: Henrique Goulart Date: Sun, 12 Jan 2020 14:13:25 +0100 Subject: [PATCH 5/6] Remove extra blank line --- .../apache/spark/network/server/OneForOneStreamManagerSuite.java | 1 - 1 file changed, 1 deletion(-) diff --git a/common/network-common/src/test/java/org/apache/spark/network/server/OneForOneStreamManagerSuite.java b/common/network-common/src/test/java/org/apache/spark/network/server/OneForOneStreamManagerSuite.java index a8f9f22ddd258..45e1836da641f 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/server/OneForOneStreamManagerSuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/server/OneForOneStreamManagerSuite.java @@ -135,5 +135,4 @@ public void streamStatesAreFreedWhenConnectionIsClosedEvenIfBufferIteratorThrows Assert.assertEquals(0, manager.numStreamStates()); } } - } From 0fde4c94fd67f6be55ee6eff511e9d046fd3cd5c Mon Sep 17 00:00:00 2001 From: Henrique Goulart Date: Tue, 14 Jan 2020 22:13:20 +0100 Subject: [PATCH 6/6] Store first exception and re-throw it --- .../network/server/OneForOneStreamManager.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java b/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java index 30d44ef229683..3b5e1bcde2564 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java @@ -117,7 +117,7 @@ public static Pair parseStreamChunkId(String streamChunkId) { @Override public void connectionTerminated(Channel channel) { - boolean failedToReleaseBuffers = false; + RuntimeException failedToReleaseBufferException = null; // Close all streams which have been associated with the channel. for (Map.Entry entry: streams.entrySet()) { @@ -134,14 +134,17 @@ public void connectionTerminated(Channel channel) { } } } catch (RuntimeException e) { - failedToReleaseBuffers = true; - logger.error("Exception trying to release remaining StreamState buffers", e); + if (failedToReleaseBufferException == null) { + failedToReleaseBufferException = e; + } else { + logger.error("Exception trying to release remaining StreamState buffers", e); + } } } } - if (failedToReleaseBuffers) { - throw new RuntimeException("Failed to release one or more StreamState buffers"); + if (failedToReleaseBufferException != null) { + throw failedToReleaseBufferException; } }