public class BlockingDemo {
ArrayBlockingQueue<String> ab=new ArrayBlockingQueue(10);//FIFO的队列
{
init(); //构造块初始化
}
public void init(){
Iterator<String> it=ab.iterator();
new Thread(()->{
while(true) {
try {
String data = ab.take(); //阻塞方式获得元素
System.out.println("receive:" + data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
public void addData(String data) throws InterruptedException {
ab.add(data);
System.out.println("sendData:"+data);
Thread.sleep(1000);
}
public static void main(String[] args) throws InterruptedException {
BlockingDemo blockingDemo=new BlockingDemo();
for(int i=0;i<1000;i++){
blockingDemo.addData("data:"+i);
}
}
}
插入元素
add/offer/put
删除/获取元素
remove/poll/take
在阻塞队列中,提供了四种处理方式
1.插入操作
add(e) :添加元素到队列中,如果队列满了,继续插入 元素会报错,IllegalStateException。
offer(e) : 添加元素到队列,同时会返回元素是否插入 成功的状态,如果成功则返回 true
put(e) :当阻塞队列满了以后,生产者继续通过 put 添加元素,队列会一直阻塞生产者线程,知道队列可用
offer(e,time,unit) :当阻塞队列满了以后继续添加元素, 生产者线程会被阻塞指定时间,如果超时,则线程直接 退出
2.移除操作
remove():当队列为空时,调用 remove 会返回 false, 如果元素移除成功,则返回 true
poll(): 当队列中存在元素,则从队列中取出一个元素, 如果队列为空,则直接返回 null
take():基于阻塞的方式获取队列中的元素,如果队列为 空,则 take 方法会一直阻塞,直到队列中有新的数据可 以消费
poll(time,unit):带超时机制的获取数据,如果队列为空, 则会等待指定的时间再去获取元素返回
capacity: 表示数组的长度,也就是队列的长度
fair:表示是否为公平的阻塞队列,默认情况下构造的是非 公平的阻塞队列。
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair){
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
以 add 方法作为入口,在 add 方法中会调用父类的 add 方 法,也就是 AbstractQueue.如果看源码看得比较多的话, 一般这种写法都是调用父类的模版方法来解决通用性问题
public boolean add(E e) {
return super.add(e);
}
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
1.判断添加的数据是否为空
2.添加重入锁
3.判断队列长度,如果队列长度等于数组长度,表示满了 直接返回 false
4.否则,直接调用 enqueue 将元素添加到队列中
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
这个是最核心的逻辑,方法内部通过 putIndex 索引直接将 元素添加到数组 items
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;//通过 putIndex 对数据赋值
items[putIndex] = x;
if (++putIndex == items.length) // 当putIndex 等于数组长度时,将 putIndex 重置为 0
putIndex = 0;
count++; //记录队列元素的个数
notEmpty.signal();//唤醒处于等待状态下的线程,表示当前队列中的元素不为空,如果存在消费者线程阻塞,就可以开始取出元素
}
1.当元素满了以后是无法继续添加的,因为会报错
2.其次,队列中的元素肯定会有一个消费者线程通过 take 或者其他方法来获取数据,而获取数据的同时元素也会 从队列中移除
take 方法是一种阻塞获取队列中元素的方法
它的实现原理很简单,有就删除没有就阻塞,注意这个阻 塞是可以中断的,如果队列没有数据那么就加入 notEmpty 条件队列等待(有数据就直接取走,方法结束),如果有新的 put 线程添加了数据,那么 put 操作将会唤醒 take 线程, 执行 take 操作。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
//如果队列为空的情况下,直接通过 await 方法阻塞
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
如果队列中添加了元素,那么这个时候,会在 enqueue 中 调用 notempty.signal 唤醒 take 线程来获得元素
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex]; //默认获取 0 位置的元素
items[takeIndex] = null; //将该位置的元素设置为空
if (++takeIndex == items.length) //这里的作用也是一样,如果拿到数组的最大值,那么重置为 0,继续从头部位置开始获取数据
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();//同时更新迭代器中的元素数据
notFull.signal(); //触发 因为队列满了以后导致的被阻塞的线程
return x;
}
void elementDequeued() {
// assert lock.getHoldCount() == 1;
if (count == 0)
queueIsEmpty();
else if (takeIndex == 0)
takeIndexWrapped();
}
所以 itrs.elementDequeued() 是用来更新迭代器中的元 素数据的 takeIndex 的索引变化图如下,同时随着数据的移除,会唤 醒处于 put 阻塞状态下的线程来继续添加数据
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
do {
if (o.equals(items[i])) {
removeAt(i);
return true;
}
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
1.原子更新基本类型
AtomicBoolean、AtomicInteger、AtomicLong
2.原子更新数组
AtomicIntegerArray 、 AtomicLongArray 、 AtomicReferenceArray
3.原子更新引用
AtomicReference 、 AtomicReferenceFieldUpdater 、 AtomicMarkableReference(更新带有标记位的引用类 型)
4.原子更新字段
AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、 AtomicStampedReference