
LinkedBlockingQueue是一个基于链表实现的阻塞队列,先进先出顺序,是构建生产者-消费者模式的首选构件。
xxxxxxxxxx/*** 内部数据结构通过单向链表实现* @param <E>*/static class Node<E> {E item;Node<E> next;Node(E x) { item = x; }}/*** 内部链表长度,默认为int的最大值*/private final int capacity;/*** 当前链表元素个数*/private final AtomicInteger count = new AtomicInteger();/*** 链表头结点,其item永远为null*/transient Node<E> head;/*** 链表尾节点,其item永远为null*/private transient Node<E> last;/*** 元素出列持有的锁*/private final ReentrantLock takeLock = new ReentrantLock();/*** 当链表为空时,该condition使所有的取出元素线程等待*/private final Condition notEmpty = takeLock.newCondition();/*** 元素入列持有的锁*/private final ReentrantLock putLock = new ReentrantLock();/*** 当链表为满时,该condition是所有放入元素的线程等待*/private final Condition notFull = putLock.newCondition();
xxxxxxxxxx/*** 创建默认长度的链表*/public LinkedBlockingQueue() {this(Integer.MAX_VALUE);}/*** 创建指定长度的链表,链表初始化时,首尾节点为同一个节点,其item为null,这是不变的.* @param capacity*/public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;last = head = new Node<E>(null);}/*** 根据给定集合创建链表* @param c*/public LinkedBlockingQueue(Collection<? extends E> c) {this(Integer.MAX_VALUE); //创建默认大小的链表final ReentrantLock putLock = this.putLock;putLock.lock(); // Never contended, but necessary for visibilitytry {int n = 0;for (E e : c) {if (e == null)throw new NullPointerException();if (n == capacity)throw new IllegalStateException("Queue full");enqueue(new Node<E>(e));++n;}count.set(n);} finally {putLock.unlock();}}
xxxxxxxxxx/*** 元素入栈* @param node*/private void enqueue(Node<E> node) {//将新节点作为尾节点,同时将旧尾节点的next指针指向新的尾节点last = last.next = node;}/*** 元素出栈* @return*/private E dequeue() {//获取当前首节点Node<E> h = head;//新的首节点为h的next节点Node<E> first = h.next;//更新旧首节点的引用,防止出现过期引用h.next = h; // help GC//更新首节点head = first;//获取首节点中的元素,在LinkedBlockingQueue中,第一个元素是保存在第二个节点中,// 首节点的item永远为null,保持不变E x = first.item;first.item = null;return x;}
xxxxxxxxxxpublic void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();int c = -1;Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;//锁住queue,防止其他添加操作线程并发修改queueputLock.lockInterruptibly();try {//如果queue满了,那么阻塞所有的添加操作线程,使其等待while (count.get() == capacity) {notFull.await();}//queue未满,元素入栈enqueue(node);//更新计数器c = count.getAndIncrement();//如果添加元素后,队列未满,那么唤醒某个添加操作的线程,允许添加if (c + 1 < capacity)notFull.signal();} finally {//解锁putLock.unlock();}if (c == 0)signalNotEmpty();}
LinkedBlockingQueue中提供了一个特殊的offer方法,即添加元素时可以指定过期时间,如果在过期时间内,队列未满,添加成功,则返回true;否则,超过规定时间,未添加成功,则返回false。
xxxxxxxxxxpublic boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {if (e == null) throw new NullPointerException();long nanos = unit.toNanos(timeout);int c = -1;final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;putLock.lockInterruptibly();try {while (count.get() == capacity) {//如果队列满了,并且超出了指定时间,直接返回false,不在阻塞等待if (nanos <= 0)return false;//否则等待指定时间nanos = notFull.awaitNanos(nanos);}enqueue(new Node<E>(e));c = count.getAndIncrement();if (c + 1 < capacity)notFull.signal();} finally {putLock.unlock();}if (c == 0)signalNotEmpty();return true;}
xxxxxxxxxxpublic E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;//锁住queue,防止其他取出操作的线程并发修改queuetakeLock.lockInterruptibly();try {//如果当前队列为空,那么阻塞所有的取出操作线程while (count.get() == 0) {notEmpty.await();}//元素出栈x = dequeue();//更新引用c = count.getAndDecrement();//如果取出元素后,链表不为空,那么唤醒一个取出操作的线程,允许继续取出if (c > 1)notEmpty.signal();} finally {//解锁takeLock.unlock();}if (c == capacity)signalNotFull();return x;}