
Conversation

@wangshuo128
Contributor

@wangshuo128 wangshuo128 commented Dec 20, 2018

What changes were proposed in this pull request?

In the current code path, OneForOneStreamManager holds each StreamState in a Map named streams.

A StreamState is initialized and put into streams when an OpenBlocks request is received.
A StreamState is removed from streams only in the two scenarios below:

  1. The last chunk of the stream is fetched
  2. The connection that carried the ChunkFetchRequests is closed

A StreamState is therefore never cleaned up if an OpenBlocks request is received without any following ChunkFetchRequest. This causes a memory leak on the server side, which is harmful for a long-running service such as ExternalShuffleService.

This PR associates the StreamState with the channel when handling the OpenBlocks request, because the OpenBlocks request and the following ChunkFetchRequests for a specific stream are sent from the same TransportClient in OneForOneBlockFetcher.
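
For illustration, a minimal sketch of the idea (the class and field names are hypothetical and simplified from the real OneForOneStreamManager; this is not the exact patch):

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import io.netty.channel.Channel;

import org.apache.spark.network.buffer.ManagedBuffer;

// Sketch of a stream manager that records the owning channel at registration
// time, so that closing the channel releases streams that never received a
// single ChunkFetchRequest.
public class StreamRegistrationSketch {

  private static class StreamState {
    final Iterator<ManagedBuffer> buffers;
    final Channel associatedChannel;  // recorded when OpenBlocks is handled

    StreamState(Iterator<ManagedBuffer> buffers, Channel channel) {
      this.buffers = buffers;
      this.associatedChannel = channel;
    }
  }

  private final AtomicLong nextStreamId = new AtomicLong(0);
  private final Map<Long, StreamState> streams = new ConcurrentHashMap<>();

  // Called while handling OpenBlocks: the channel is already known here.
  public long registerStream(Iterator<ManagedBuffer> buffers, Channel channel) {
    long streamId = nextStreamId.getAndIncrement();
    streams.put(streamId, new StreamState(buffers, channel));
    return streamId;
  }

  // Called when a channel is closed: drop every stream tied to that channel,
  // even if no ChunkFetchRequest was ever received for it.
  public void connectionTerminated(Channel channel) {
    streams.entrySet().removeIf(entry -> {
      StreamState state = entry.getValue();
      if (state.associatedChannel != channel) {
        return false;
      }
      // Release all remaining buffers of the abandoned stream.
      while (state.buffers.hasNext()) {
        state.buffers.next().release();
      }
      return true;
    });
  }
}

The key difference from the pre-patch behavior is that the channel is captured in registerStream rather than lazily, via a separate registerChannel call, while serving chunk fetches.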

How was this patch tested?

New test added.

@wangshuo128 wangshuo128 changed the title Only OpenBlocks without any ChunkFetch for one stream will cause memory leak in ExternalShuffleService [SPARK-26418][Shuffle] Only OpenBlocks without any ChunkFetch for one stream will cause memory leak in ExternalShuffleService Dec 20, 2018
@wangshuo128 wangshuo128 changed the title [SPARK-26418][Shuffle] Only OpenBlocks without any ChunkFetch for one stream will cause memory leak in ExternalShuffleService [SPARK-26418][SHUFFLE] Only OpenBlocks without any ChunkFetch for one stream will cause memory leak in ExternalShuffleService Dec 20, 2018
@beliefer
Contributor

If ExternalShuffleService only receives an OpenBlocks message and never receives any ChunkFetchRequest message, the associated StreamState is kept in streams.
You assume that building this relationship between the Channel and the StreamState is enough.
But when a Channel is terminated or fails, TransportClientFactory will create another one; see the method TransportClientFactory.createClient.
So I propose adding a parameter that controls a timeout for the Channel that sent OpenBlocks. When the timeout expires, call the method connectionTerminated.
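
For reference, a rough sketch of that timeout idea (the class name, scheduler, and idleTimeoutMs parameter are hypothetical, this is not code from the PR, and a real version would have to skip streams that are still being actively fetched):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import io.netty.channel.Channel;

import org.apache.spark.network.server.OneForOneStreamManager;

// Sketch only: after an idle timeout, release whatever streams are still tied
// to the channel that sent OpenBlocks, as if the connection had terminated.
public class StreamIdleReaper {

  private final ScheduledExecutorService scheduler =
    Executors.newSingleThreadScheduledExecutor();

  public void scheduleCleanup(
      OneForOneStreamManager streamManager,
      Channel channel,
      long idleTimeoutMs) {
    scheduler.schedule(
      () -> streamManager.connectionTerminated(channel),
      idleTimeoutMs,
      TimeUnit.MILLISECONDS);
  }
}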

@wangshuo128
Contributor Author

Let me explain my problem in detail.

We use YarnShuffleService as an auxiliary service of the NodeManager in our cluster. Full GC happened in some NodeManagers. We dumped the heap memory and found that the map holding StreamState in OneForOneStreamManager was about 3 GB, almost 80% of the heap size. Some applications had finished, but their StreamStates were still in OneForOneStreamManager.

In the current code, the server creates a StreamState when handling the OpenBlocks request, but only associates the StreamState with a channel when handling the following ChunkFetchRequests.

I think two scenarios can cause this:

  1. An OpenBlocks request is received and a StreamState is initialized on the server side. Then the transport-layer client is lost, or even the executor is lost, and no ChunkFetchRequest is ever sent to the server for the stream.
  2. An OpenBlocks request is received and a StreamState is initialized on the server side. ChunkFetchRequests for the stream are sent to the server, but the server is under heavy pressure and cannot handle them before the timeout, so the client closes its connection in TransportChannelHandler.userEventTriggered.

Currently the OpenBlocks request and the following ChunkFetchRequests for a specific stream are sent through the same TransportClient in OneForOneBlockFetcher (see the sketch below), so I think associating the StreamState with the channel when handling the OpenBlocks request will be fine.
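
A simplified sketch of that flow (the real OneForOneBlockFetcher decodes the stream handle from the RPC response and has error handling; the stream id of 0 below is just a placeholder):

import java.nio.ByteBuffer;

import org.apache.spark.network.client.ChunkReceivedCallback;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;

// Sketch only: both the OpenBlocks RPC and the later chunk fetches go through
// the single client passed in, so the server sees them on the same channel.
public class SameClientFetchSketch {

  public void fetch(
      TransportClient client,
      ByteBuffer openBlocksMessage,
      int numChunks,
      ChunkReceivedCallback chunkCallback) {
    client.sendRpc(openBlocksMessage, new RpcResponseCallback() {
      @Override
      public void onSuccess(ByteBuffer response) {
        long streamId = 0L;  // the real code decodes this from the response
        for (int i = 0; i < numChunks; i++) {
          client.fetchChunk(streamId, i, chunkCallback);
        }
      }

      @Override
      public void onFailure(Throwable e) {
        // The real fetcher fails all outstanding blocks here.
      }
    });
  }
}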

Contributor

@vanzin vanzin left a comment


This seems fine given the client code will always use the same client.

You could clean up application state in ExternalShuffleBlockHandler.applicationRemoved, but this is simpler.

// Connection closed before any FetchChunk request received
streamManager.connectionTerminated(reverseClient.getChannel());

assert streamManager.getStreamsSize() == 0;
Contributor


Use JUnit asserts.

import org.apache.spark.network.shuffle.protocol.OpenBlocks;
import org.junit.Test;

import java.nio.ByteBuffer;
Contributor


Java imports go first.

package org.apache.spark.network.shuffle;

import io.netty.channel.Channel;
import org.apache.spark.network.buffer.ManagedBuffer;
Contributor


Spark imports go into their own group. See existing code.

}

@VisibleForTesting
public long getStreamsSize() {
Contributor


getStreamCount

@vanzin
Contributor

vanzin commented Dec 20, 2018

Also, please explain the fix, not the problem, in the PR.

@vanzin
Contributor

vanzin commented Dec 20, 2018

ok to test

@SparkQA

SparkQA commented Dec 21, 2018

Test build #100344 has finished for PR 23355 at commit 7a94112.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@beliefer
Contributor

beliefer commented Dec 21, 2018

I checked OneForOneBlockFetcher again: the start method uses the same TransportClient to send the OpenBlocks message and the following ChunkFetchRequest messages, so there is no chance to call TransportClientFactory.createClient. So I think your code change is OK, but I still propose adding a parameter to control a timeout for the Channel. When the timeout expires, call the following code:

if (state.associatedChannel == channel) {
  streams.remove(entry.getKey());

  // Release all remaining buffers.
  while (state.buffers.hasNext()) {
    state.buffers.next().release();
  }
}

@wangshuo128
Contributor Author

wangshuo128 commented Dec 21, 2018

@vanzin
Thanks a lot for the review. I will fix the code.

Also, please explain the fix, not the problem, in the PR.

I will add some explanation about the fix in the PR description.

Thank you.

@SparkQA

SparkQA commented Dec 21, 2018

Test build #100355 has finished for PR 23355 at commit c1c8551.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangshuo128
Contributor Author

@vanzin
Hi, I fixed the code and updated the PR description. Could you take a look when you have time? Thank you!

Contributor

@beliefer beliefer left a comment


This change is very clear.

Member

@Ngone51 Ngone51 left a comment


Agree with @vanzin that the change is fine, since OpenBlocks and fetchChunk would always use the same client.

A minor concern: in the past we always registered the channel when we received the ChunkFetchRequest, so the channel associated with the stream was always correct. With this change, if a user calls sendRpc(OpenBlocks) and fetchChunk with different clients, we may record the wrong channel (the OpenBlocks channel rather than the fetchChunk channel) for the stream being fetched, which may fall into the same issue this PR fixes. So I think we should leave a comment above fetchChunk to notify users that this method must use the same client as the OpenBlocks request; otherwise there may be a potential memory leak.
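
Something along these lines could work as the note above TransportClient.fetchChunk (the wording is only a suggestion):

/**
 * ...
 * Note: the OpenBlocks RPC and the fetchChunk calls for the resulting stream must be issued
 * from the same TransportClient. If a different client (and therefore a different channel)
 * is used, the server may associate the stream with the wrong channel, and the stream state
 * may never be cleaned up, leaking memory on the server side.
 */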

ManagedBuffer buf;
try {
streamManager.checkAuthorization(client, msg.streamChunkId.streamId);
streamManager.registerChannel(channel, msg.streamChunkId.streamId);
Member


I think if you remove this, you should add it back in NettyBlockRpcServer for the non-external-shuffle-service case. Otherwise, you'll introduce the bug this PR fixes into the non-external shuffle service.

Contributor Author


Good catch! Added it back to NettyBlockRpcServer.

RpcResponseCallback callback = mock(RpcResponseCallback.class);

// Open blocks
handler.receive(reverseClient, openBlocks, callback);
Member


Assert stream count after receiving an OpenBlocks message.
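
For example, something along these lines (assuming the getStreamCount accessor suggested above, a single registered stream, and import static org.junit.Assert.assertEquals):

// Exactly one stream should be registered after the OpenBlocks message is handled.
assertEquals(1, streamManager.getStreamCount());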

@Ngone51
Member

Ngone51 commented Jan 15, 2019

Hi @wangshuo128, did you see the issue resolved with this PR?

@wangshuo128
Contributor Author

@Ngone51 Thanks a lot for the comment and review!

A minor concern: in the past we always registered the channel when we received the ChunkFetchRequest, so the channel associated with the stream was always correct. With this change, if a user calls sendRpc(OpenBlocks) and fetchChunk with different clients, we may record the wrong channel (the OpenBlocks channel rather than the fetchChunk channel) for the stream being fetched, which may fall into the same issue this PR fixes. So I think we should leave a comment above fetchChunk to notify users that this method must use the same client as the OpenBlocks request; otherwise there may be a potential memory leak.

I had the same concern, and I added a comment at TransportClient.fetchChunk.

@wangshuo128
Contributor Author

cc @viirya @cloud-fan @gatorsmile Could you give some advice about this PR? Thanks!

@cloud-fan
Contributor

Is it the same as #23521?

@SparkQA

SparkQA commented Jan 15, 2019

Test build #101239 has finished for PR 23355 at commit 3d37481.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@Ngone51
Member

Ngone51 commented Jan 15, 2019

@cloud-fan It's not the same. #23521 registers the channel when the server receives a ChunkFetchRequest or StreamRequest after receiving OpenBlocks, which follows the previous approach. This PR suggests registering the channel as soon as the server receives OpenBlocks.

And I think there's a conflict now, since #23521 tries to make registerChannel a private method.

@wangshuo128
Contributor Author

@Ngone51 @cloud-fan Yes, I found that this PR is the same as #23521 and #23521 is better. I will close this PR. Thank you. Also thanks @viirya!

@Ngone51
Member

Ngone51 commented Jan 16, 2019

@wangshuo128 ??? Why is it better? I don't understand. #23521 does not intend to fix the issue you described here, and the point at which the channel is registered is obviously different between these two PRs.

@wangshuo128
Contributor Author

@Ngone51 #23521 removes registerChannel and associates the stream with the channel in registerStream when handling the OpenBlocks request.

@Ngone51
Member

Ngone51 commented Jan 16, 2019

Oh, I missed the newest update; they're the same now. Sorry, @wangshuo128.
