@@ -25,6 +25,8 @@
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.util.LightWeightGSet.LinkedElement;
@@ -61,7 +63,7 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> {
   /**
    * The number of elements in the set.
    */
-  protected volatile int size;
+  private AtomicInteger size;

   /**
    * A single partition of the {@link PartitionedGSet}.
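
Reviewer note (not part of the PR): this hunk is the core of the change. size++ on a volatile int compiles to a separate read, add, and write, so two threads holding different partition (child) locks can read the same value and one increment is lost; volatile guarantees visibility, not atomicity. A minimal standalone sketch of the lost-update effect, with hypothetical class and field names:

import java.util.concurrent.atomic.AtomicInteger;

// Standalone sketch, not part of the PR: demonstrates why the volatile
// counter was racy. Each thread bumps both counters the same number of
// times; the volatile one typically loses updates, the atomic one never.
public class CounterRace {
  static volatile int volatileSize = 0;
  static final AtomicInteger atomicSize = new AtomicInteger(0);

  public static void main(String[] args) throws InterruptedException {
    final int THREADS = 8;
    final int INCREMENTS = 100_000;
    Thread[] workers = new Thread[THREADS];
    for (int i = 0; i < THREADS; i++) {
      workers[i] = new Thread(() -> {
        for (int j = 0; j < INCREMENTS; j++) {
          volatileSize++;               // racy read-modify-write
          atomicSize.incrementAndGet(); // single atomic update
        }
      });
      workers[i].start();
    }
    for (Thread t : workers) {
      t.join();
    }
    System.out.println("volatile size = " + volatileSize);     // usually < 800000
    System.out.println("atomic size   = " + atomicSize.get()); // always 800000
  }
}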
@@ -84,7 +86,7 @@ public PartitionedGSet(final int capacity,
     this.latchLock = latchLock;
     // addNewPartition(rootKey).put(rootKey);
     // this.size = 1;
-    this.size = 0;
+    this.size = new AtomicInteger(0);
     LOG.info("Partition capacity = {}", DEFAULT_PARTITION_CAPACITY);
     LOG.info("Partition overflow factor = {}", DEFAULT_PARTITION_OVERFLOW);
   }
@@ -108,7 +110,7 @@ public PartitionEntry addNewPartition(final K key) {
     assert oldPart == null :
         "RangeMap already has a partition associated with " + key;

-    LOG.debug("Total GSet size = {}", size);
+    LOG.debug("Total GSet size = {}", size.get());
     LOG.debug("Number of partitions = {}", partitions.size());
     LOG.debug("Previous partition size = {}",
         lastPart == null ? 0 : lastPart.size());
@@ -118,7 +120,7 @@ public PartitionEntry addNewPartition(final K key) {

   @Override
   public int size() {
-    return size;
+    return size.get();
   }

   public PartitionEntry getPartition(final K key) {
@@ -130,7 +132,7 @@ public PartitionEntry getPartition(final K key) {
     if(part == null) {
       throw new IllegalStateException("Null partition for key: " + key);
     }
-    assert size == 0 || part.partLock.isReadTopLocked() ||
+    assert size.get() == 0 || part.partLock.isReadTopLocked() ||
         part.partLock.hasReadChildLock() : "Must hold read Lock: key = " + key;
     return part;
   }
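
Reviewer note (not part of the PR): the assert keeps its original shape: access is legal while the set is empty (size.get() == 0, i.e. before any partitions or locks exist) or when the caller holds a top-level or child read lock. A minimal sketch of the same guard pattern, using ReentrantReadWriteLock as a hypothetical stand-in for LatchLock; run with -ea so the assert is enabled:

import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch, not part of the PR: the guard pattern used by getPartition().
public class GuardedRead {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private volatile int count = 0;

  int read() {
    // Legal when empty (nothing to protect yet) or when this thread
    // holds the read lock.
    assert count == 0 || lock.getReadHoldCount() > 0
        : "Must hold read lock";
    return count;
  }

  void increment() {
    lock.writeLock().lock();
    try {
      count++; // safe: exclusive writer
    } finally {
      lock.writeLock().unlock();
    }
  }
}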
@@ -162,7 +164,7 @@ public E put(final E element) {
     if(part == null) {
       throw new HadoopIllegalArgumentException("Illegal key: " + key);
     }
-    assert size == 0 || part.partLock.isWriteTopLocked() ||
+    assert size.get() == 0 || part.partLock.isWriteTopLocked() ||
         part.partLock.hasWriteChildLock() :
         "Must hold write Lock: key = " + key;
     LOG.debug("put key: {}", key);
@@ -173,11 +175,11 @@ public E put(final E element) {
     }
     E result = part.put(element);
     if(result == null) { // new element
-      size++;
-      LOG.debug("partitionPGSet.put: added key {}, size is now {} ", key, size);
+      size.incrementAndGet();
+      LOG.debug("partitionPGSet.put: added key {}, size is now {} ", key, size.get());
     } else {
       LOG.debug("partitionPGSet.put: replaced key {}, size is now {}",
-          key, size);
+          key, size.get());
     }
     return result;
   }
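
Reviewer note (not part of the PR): incrementAndGet() already returns the updated count, so the new debug line's separate size.get() re-reads the counter and can include increments from other threads that ran in between. A standalone sketch of the divergence:

import java.util.concurrent.atomic.AtomicInteger;

// Sketch, not part of the PR: under contention, the value returned by
// incrementAndGet() and a subsequent get() can disagree. Capturing the
// return value would log the exact post-increment count.
public class LogAfterIncrement {
  public static void main(String[] args) throws InterruptedException {
    final AtomicInteger size = new AtomicInteger(0);
    Runnable task = () -> {
      for (int i = 0; i < 100_000; i++) {
        int newSize = size.incrementAndGet(); // this thread's exact count
        int observed = size.get();            // may already be higher
        if (newSize != observed) {
          System.out.println("return=" + newSize + " get()=" + observed);
          break; // one divergence is enough to make the point
        }
      }
    };
    Thread t1 = new Thread(task);
    Thread t2 = new Thread(task);
    t1.start(); t2.start();
    t1.join(); t2.join();
    System.out.println("final count = " + size.get()); // reflects every increment performed
  }
}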
@@ -199,20 +201,20 @@ public E remove(final K key) {
     }
     E result = part.remove(key);
     if(result != null) {
-      size--;
+      size.decrementAndGet();
     }
     return result;
   }

   @Override
   public void clear() {
-    LOG.error("Total GSet size = {}", size);
+    LOG.error("Total GSet size = {}", size.get());
     LOG.error("Number of partitions = {}", partitions.size());
     printStats();
     // assert latchLock.hasWriteTopLock() : "Must hold write topLock";
     // SHV May need to clear all partitions?
     partitions.clear();
-    size = 0;
+    size.set(0);
   }

   private void printStats() {
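
Reviewer note (not part of the PR): even with AtomicInteger, clear() is two separate steps (partitions.clear() then size.set(0)), so a concurrent put could land between them and leave the counter stale; the commented-out "Must hold write topLock" assert points at the intended guard. A sketch of clear() under an exclusive lock, with hypothetical names standing in for the PartitionedGSet internals:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch, not part of the PR: clear() under an exclusive lock so no
// put can interleave between emptying the map and resetting the count.
public class ClearUnderWriteLock {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final ConcurrentHashMap<String, String> partitions = new ConcurrentHashMap<>();
  private final AtomicInteger size = new AtomicInteger(0);

  public void put(String key, String value) {
    lock.readLock().lock(); // shared mode: many puts may run together
    try {
      if (partitions.put(key, value) == null) {
        size.incrementAndGet(); // count only genuinely new entries
      }
    } finally {
      lock.readLock().unlock();
    }
  }

  public void clear() {
    lock.writeLock().lock(); // exclusive: no put can interleave
    try {
      partitions.clear();
      size.set(0);
    } finally {
      lock.writeLock().unlock();
    }
  }
}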