diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java b/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java index 67f64d7962035..cf3547a68bffc 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java @@ -57,7 +57,7 @@ private static class StreamState { int curChunk = 0; // Used to keep track of the number of chunks being transferred and not finished yet. - volatile long chunksBeingTransferred = 0L; + final AtomicLong chunksBeingTransferred = new AtomicLong(0L); StreamState(String appId, Iterator buffers, Channel channel) { this.appId = appId; @@ -153,7 +153,7 @@ public void checkAuthorization(TransportClient client, long streamId) { public void chunkBeingSent(long streamId) { StreamState streamState = streams.get(streamId); if (streamState != null) { - streamState.chunksBeingTransferred++; + streamState.chunksBeingTransferred.incrementAndGet(); } } @@ -167,7 +167,7 @@ public void streamBeingSent(String streamId) { public void chunkSent(long streamId) { StreamState streamState = streams.get(streamId); if (streamState != null) { - streamState.chunksBeingTransferred--; + streamState.chunksBeingTransferred.decrementAndGet(); } } @@ -180,7 +180,7 @@ public void streamSent(String streamId) { public long chunksBeingTransferred() { long sum = 0L; for (StreamState streamState: streams.values()) { - sum += streamState.chunksBeingTransferred; + sum += streamState.chunksBeingTransferred.get(); } return sum; }