diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java
index f493402959031..d874f428d633d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java
@@ -25,6 +25,8 @@
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.util.LightWeightGSet.LinkedElement;
@@ -61,7 +63,7 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> {
   /**
    * The number of elements in the set.
    */
-  protected volatile int size;
+  private AtomicInteger size;
 
   /**
    * A single partition of the {@link PartitionedGSet}.
@@ -84,7 +86,7 @@ public PartitionedGSet(final int capacity,
     this.latchLock = latchLock;
     // addNewPartition(rootKey).put(rootKey);
     // this.size = 1;
-    this.size = 0;
+    this.size = new AtomicInteger(0);
     LOG.info("Partition capacity = {}", DEFAULT_PARTITION_CAPACITY);
     LOG.info("Partition overflow factor = {}", DEFAULT_PARTITION_OVERFLOW);
   }
@@ -108,7 +110,7 @@ public PartitionEntry addNewPartition(final K key) {
     assert oldPart == null :
         "RangeMap already has a partition associated with " + key;
 
-    LOG.debug("Total GSet size = {}", size);
+    LOG.debug("Total GSet size = {}", size.get());
     LOG.debug("Number of partitions = {}", partitions.size());
     LOG.debug("Previous partition size = {}",
         lastPart == null ? 0 : lastPart.size());
@@ -118,7 +120,7 @@ public PartitionEntry addNewPartition(final K key) {
 
   @Override
   public int size() {
-    return size;
+    return size.get();
   }
 
   public PartitionEntry getPartition(final K key) {
@@ -130,7 +132,7 @@ public PartitionEntry getPartition(final K key) {
     if(part == null) {
       throw new IllegalStateException("Null partition for key: " + key);
     }
-    assert size == 0 || part.partLock.isReadTopLocked() ||
+    assert size.get() == 0 || part.partLock.isReadTopLocked() ||
         part.partLock.hasReadChildLock() : "Must hold read Lock: key = " + key;
     return part;
   }
@@ -162,7 +164,7 @@ public E put(final E element) {
     if(part == null) {
       throw new HadoopIllegalArgumentException("Illegal key: " + key);
     }
-    assert size == 0 || part.partLock.isWriteTopLocked() ||
+    assert size.get() == 0 || part.partLock.isWriteTopLocked() ||
         part.partLock.hasWriteChildLock() :
         "Must hold write Lock: key = " + key;
     LOG.debug("put key: {}", key);
@@ -173,11 +175,11 @@ public E put(final E element) {
     }
     E result = part.put(element);
     if(result == null) { // new element
-      size++;
-      LOG.debug("partitionPGSet.put: added key {}, size is now {} ", key, size);
+      size.incrementAndGet();
+      LOG.debug("partitionPGSet.put: added key {}, size is now {} ", key, size.get());
     } else {
       LOG.debug("partitionPGSet.put: replaced key {}, size is now {}",
-          key, size);
+          key, size.get());
     }
     return result;
   }
@@ -199,20 +201,20 @@ public E remove(final K key) {
     }
     E result = part.remove(key);
     if(result != null) {
-      size--;
+      size.decrementAndGet();
     }
     return result;
   }
 
   @Override
   public void clear() {
-    LOG.error("Total GSet size = {}", size);
+    LOG.error("Total GSet size = {}", size.get());
     LOG.error("Number of partitions = {}", partitions.size());
     printStats();
     // assert latchLock.hasWriteTopLock() : "Must hold write topLock";
     // SHV May need to clear all partitions?
     partitions.clear();
-    size = 0;
+    size.set(0);
   }
 
   private void printStats() {
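
Note on the motivation (an illustrative sketch, not part of the patch): size++ on a volatile int is three separate steps (read, add, write), so two threads that each hold a different child partition lock can interleave and lose increments; volatile guarantees visibility but not atomicity. AtomicInteger.incrementAndGet() performs the whole read-modify-write as a single atomic operation. A minimal standalone demonstration of the difference, using only the JDK (the class name AtomicSizeDemo is made up for this demo):

import java.util.concurrent.atomic.AtomicInteger;

/**
 * Shows why a concurrent counter needs AtomicInteger rather than
 * a volatile int. Two threads increment each counter 100_000 times;
 * the volatile int loses updates because ++ is a non-atomic
 * read-modify-write, while the AtomicInteger ends at exactly 200_000.
 */
public class AtomicSizeDemo {
  private static volatile int volatileSize = 0;
  private static final AtomicInteger atomicSize = new AtomicInteger(0);

  public static void main(String[] args) throws InterruptedException {
    Runnable task = () -> {
      for (int i = 0; i < 100_000; i++) {
        volatileSize++;               // read, add, write: updates can be lost
        atomicSize.incrementAndGet(); // single atomic increment
      }
    };
    Thread t1 = new Thread(task);
    Thread t2 = new Thread(task);
    t1.start();
    t2.start();
    t1.join();
    t2.join();
    System.out.println("volatile int:  " + volatileSize);    // typically < 200000
    System.out.println("AtomicInteger: " + atomicSize.get()); // always 200000
  }
}

Running this, the volatile counter usually prints less than 200000 while the AtomicInteger always prints exactly 200000, which is why size(), put(), remove(), and clear() above all move to the AtomicInteger API.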