- 浏览: 6305987 次
- 性别:
- 来自: 一片神奇的土地
文章分类
- 全部博客 (745)
- JQuery (25)
- JS (33)
- 数据库 (59)
- Java基础 (56)
- JSON (8)
- XML (8)
- ireport (7)
- 设计模式 (10)
- 心情 (14)
- freemarker (1)
- 问题 (15)
- powerdesigner (2)
- CSS (15)
- DWR (4)
- tomcat (16)
- Hibernate (12)
- Oracle (7)
- Struts (7)
- Spring (34)
- JSP (23)
- 需学习 (64)
- 工具类库 (63)
- Maven (14)
- 笔试题 (34)
- 源码学习 (31)
- 多线程 (39)
- Android (32)
- 缓存 (20)
- SpringMVC (14)
- jQueryEasyUi (12)
- webservice-RPC (13)
- ant (1)
- ASP.NET (10)
- 正则表达式 (3)
- Linux (15)
- JBoss (1)
- EJB (3)
- UML (2)
- JMS (3)
- Flex (8)
- JSTL (2)
- 批处理 (5)
- JVM (16)
- 【工具】 (16)
- 数据结构 (29)
- HTTP/TCP/Socket (18)
- 微信 (1)
- tomcat源码学习 (15)
- Python (30)
- 主机 (2)
- 设计与架构 (19)
- thrift-RPC (2)
- nginx (6)
- 微信小程序 (0)
- 分布式+集群 (12)
- IO (1)
- 消息队列 (4)
- 存储过程 (8)
- redis (9)
- zookeeper (5)
- 海量数据 (5)
最新评论
-
360pluse:
技术更新,战术升级!Python爬虫案例实战从零开始一站通网盘 ...
Python爬虫实战:Scrapy豆瓣电影爬取 -
18335864773:
推荐用 pageoffice 组件生成 word 文件。
JAVA生成WORD工具类 -
jjhe369:
LISTD_ONE 写道起始地址为163.135.0.1 结束 ...
IP地址与CIDR -
baojunhu99:
private final int POOL_SIZE = 5 ...
使用CompletionService获取多线程返回值 -
LovingBaby:
胡说,javascript 运行时是单线程的,event lo ...
Ajax请求是否可以实现同步
LinkedBlockingQueue是一个单向链表结构的队列,也就是只有next,没有prev。如果不指定容量默认为Integer.MAX_VALUE。通过putLock和takeLock两个锁进行同步,两个锁分别实例化notFull和notEmpty两个Condtion,用来协调多线程的存取动作。其中某些方法(如remove,toArray,toString,clear等)的同步需要同时获得这两个锁,此时存或者取操作都会不可进行,需要注意的是所有需要同时lock的地方顺序都是先putLock.lock再takeLock.lock,这样就避免了可能出现的死锁问题。
takeLock实例化出一个notEmpty的Condition,putLock实例化一个notFull的Condition,两个Condition协调即时通知线程队列满与不满的状态信息。
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { /** * 节点数据结构 */ static class Node<E> { /** The item, volatile to ensure barrier separating write and read */ volatile E item; Node<E> next; Node(E x) { item = x; } } /** 队列的容量 */ private final int capacity; /** 持有节点计数器 */ private final AtomicInteger count = new AtomicInteger(0); /** 头指针 */ private transient Node<E> head; /** 尾指针 */ private transient Node<E> last; /** 用于读取的独占锁*/ private final ReentrantLock takeLock = new ReentrantLock(); /** 队列是否为空的条件 */ private final Condition notEmpty = takeLock.newCondition(); /** 用于写入的独占锁 */ private final ReentrantLock putLock = new ReentrantLock(); /** 队列是否已满的条件 */ private final Condition notFull = putLock.newCondition(); private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } } private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } } //插入数据到链尾 private void insert(E x) { last = last.next = new Node<E>(x); } //从链头取数据 private E extract() { Node<E> first = head.next; head = first; E x = first.item; first.item = null; return x; } private void fullyLock() { putLock.lock(); takeLock.lock(); } private void fullyUnlock() { takeLock.unlock(); putLock.unlock(); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); //注意初始化时head/last地址相同,数据都为null // 其实就相当于只有一个节点 } ... }
这里仅仅展示部分源码,主要的方法在后面的分析中列出。分析之前明确一个最基本的概念。天天念叨着编写线程安全的类,什么是线程安全的类?那就是类内共享的全局变量的访问必须保证是不受多线程形式影响的。如果由于多线程的访问(改变,遍历,查看)而使这些变量结构被破坏或者针对这些变量操作的原子性被破坏,则这个类的编写不是线程安全的。
明确了这个基本的概念就可以很好的理解这个Queue的实现为什么是线程安全的了。在LinkedBlockingQueue的所有共享的全局变量中,final声明的capacity在构造器生成实例时就成了不变量了。而final声明的count由于是AtomicInteger类型的,所以能够保证其操作的原子性。剩下的final的变量都是初始化成了不变量,并且不包含可变属性,所以都是访问安全的。那么剩下的就是Node类型的head和last两个可变量。所以要保证LinkedBlockingQueue是线程安全的就是要保证对head和last的访问是线程安全的。
首先从上面的源码可以看到insert(E x),extract()是真正的操作head,last来入队和出对的方法,但是由于是私有的,所以不能被直接访问,不用担心线程的问题。实际入队的公开的方法是put(E e),offer(E e)和offer(E e, long timeout, TimeUnit unit)。put(...)方法与offer(...)都是把新元素加入到队尾,所不同的是如果不满足条件put会把当前执行的线程扔到等待集中等待被唤醒继续执行,而offer则是直接退出,所以如果是需要使用它的阻塞特性的话,不能直接使用poll(...)。
put(...)方法中加入元素的操作使用this.putLock来限制多线程的访问,并且使用了可中断的方式:
- public void put(E e) throws InterruptedException {
- if (e == null) throw new NullPointerException();
- int c = -1;
- final ReentrantLock putLock = this.putLock;
- final AtomicInteger count = this.count; //----------------a
- putLock.lockInterruptibly();//随时保证响应中断 //--------b
- try {
- //*****************************(1)*********************************
- try {
- while (count.get() == capacity)
- notFull.await();
- } catch (InterruptedException ie) {
- notFull.signal(); // propagate to a non-interrupted thread
- throw ie;
- }
- //*****************************end*********************************
- insert(e);//真正的入队操作
- //********************(2)**********************
- c = count.getAndIncrement();
- if (c + 1 < capacity)
- notFull.signal();
- //******************end**********************
- } finally {
- putLock.unlock();
- } //-------------------------c
- if (c == 0) //---------------d
- signalNotEmpty();
- }
代码段(1)是阻塞操作,代码段(2)是count递增和唤醒等待的操作。两者之间的insert(e)才是入队操作,其实际是操作的队尾引用last,并且没有牵涉到head。所以设计两个锁的原因就在这里!因为出队操作take(),poll()实际是执行extract()仅仅操作队首引用head。增加了this.takeLock这个锁,就实现了多个不同任务的线程入队的同时可以进行出对的操作,并且由于两个操作所共同使用的count是AtomicInteger类型的,所以完全不用考虑计数器递增递减的问题。假设count换成int,则相应的putLock内的count++和takeLock内的count--有可能相互覆盖,最终造成count的值被腐蚀,故这种设计必须使用原子操作类。
我之前说过,保证类的线程安全只要保证head和last的操作的线程安全,也就是保证insert(E x)和extract()线程安全即可。那么上面的put方法中的代码段(1)放在a,b之间,代码段(2)放在c,d之间不是更好?毕竟锁的粒度越小越好。单纯的考虑count的话这样的改变是正确的,但是await()和singal()这两个方法执行时都会检查当前线程是否是独占锁的那个线程,如果不是则抛出java.lang.IllegalMonitorStateException异常。而这两段代码中包含notFull.await()和notFull.signal()这两句使得(1),(2)必须放在lock保护块内。这里说明主要是count本身并不需要putLock或者takeLock的保护,从
- public int size() {
- return count.get();
- }
可以看出count的访问是不需要任何锁的。而在put等方法中,其与锁机制的混用很容易造成迷惑。最后put中的代码d的作用主要是一个低位及时通知的作用,也就是队列刚有值试图获得takeLock去通知等待集中的出队线程。因为c==0意味着count.getAndIncrement()原子递增成功,所以count > 0成立。类似作用的代码:
- if (c == capacity)
- signalNotFull();
在take和poll中也有出现,实现了高位及时通知。
分析完了put,对应的offer,take,poll方法都是类似的实现。
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; if (count.get() == capacity) return false; int c = -1; final ReentrantLock putLock = this.putLock; putLock.lock(); try { if (count.get() < capacity) { insert(e); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return c >= 0; }
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { try { while (count.get() == 0) notEmpty.await(); } catch (InterruptedException ie) { notEmpty.signal(); // propagate to a non-interrupted thread throw ie; } x = extract(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
下面看看遍历队列的操作:
- public Object[] toArray() {
- fullyLock();
- try {
- int size = count.get();
- Object[] a = new Object[size];
- int k = 0;
- for (Node<E> p = head.next; p != null; p = p.next)
- a[k++] = p.item;
- return a;
- } finally {
- fullyUnlock();
- }
- }
这个方法很简单主要是要清楚一点:这个操作执行时不允许其他线程再修改队首和队尾,所以使用了fullyLock去获取putLock和takeLock,只要成功则可以保证不会再有修改队列的操作。然后就是安心的遍历到最后一个元素为止了。
另外在offer(E e, long timeout, TimeUnit unit)这个方法中提供了带有超时的入队操作,如果一直不成功的话,它会尝试在timeout的时间内入队:
- for (;;) {
- ...//入队操作
- if (nanos <= 0)
- return false;
- try {
- nanos = notFull.awaitNanos(nanos);
- } catch (InterruptedException ie) {
- notFull.signal(); // propagate to a non-interrupted thread
- throw ie;
- }
- }
其内部循环使用notFull.awaitNanos(nanos)方法反复的计算剩余时间的大概值用于实现延时功能。nanos<=0则放弃尝试,直接退出。
整体而言,LinkedBlockingQueue的实现还是很清晰的。这些看似复杂的数据结构的实现实质都是多线程的基础的综合应用。就好像数学中千变万化的难题其实都是基础公式的组合一样,如果有清晰的基础认知,还是能找到自己分析的思路的。
来源:http://yanxuxin.iteye.com/blog/582162
JDK:
方法区别:
boolean offer(E e)
将指定元素插入到此队列的尾部(如果立即可行且不会超出此队列的容量),在成功时返回 true,如果此队列已满,则返回 false。
boolean offer(E e, long timeout, TimeUnit unit)
将指定元素插入到此队列的尾部,如有必要,则等待指定的时间以使空间变得可用。
void put(E e)
将指定元素插入到此队列的尾部,如有必要,则等待空间变得可用。
E peek()
获取但不移除此队列的头;如果此队列为空,则返回 null。
E poll()
获取并移除此队列的头,如果此队列为空,则返回 null。
E poll(long timeout, TimeUnit unit)
获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要)。
E take()
获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。
boolean remove(Object o)
从此队列移除指定元素的单个实例(如果存在)。
int size()
返回队列中的元素个数。
Object[] toArray()
返回按适当顺序包含此队列中所有元素的数组。
T[] toArray(T[] a)
返回按适当顺序包含此队列中所有元素的数组;返回数组的运行时类型是指定数组的运行时类型。
String toString()
返回此 collection 的字符串表示形式。
问题思考:为什么需要用两个锁?
单向链表数据结构:
package tags; import java.util.Iterator; /* * 单向链表从头部插数据,从尾部取数据 * @author Robby */ public class SingleLinked<E> { class Node<E>{ volatile E item; Node<E> next; Node(E x){ item = x; } } private Node<E> head; private Node<E> last; private static int count; SingleLinked(){ last = head = new Node<E>(null); //初始化head/last,此时这两个地址相同,其实就是一个节点 } private void insert(E e){ Node<E> newNode = new Node<E>(e); last.next = newNode; last = last.next; count++; } private E extract(){ //Node<E> first = head.next; //head为null //E e = first.item; head = head.next; //只一句就可以 //first.item = null; count--; return null; } private void remove(E e){ if(e == null) return; Node<E> pre = head; //这两个变量用于记录e的节点和前节点 Node<E> current = head.next; /*for(Node<E> p = current; p != null; p = p.next){ if(p.item.equals(e)){ pre.next = current.next; count--; break; } pre = p; current = p.next; }*/ while(current != null){ if(current.item.equals(e)){ pre.next = current.next; count--;break; } pre = current; current = current.next; } } public E peek(){ //从头部弹出一个数据 Node<E> first = head.next; if(first == null) return null; return first.item; } public Object[] toArray(){ if(count == 0) return null; Object[] o = new Object[count]; int i = 0; for(Node<E> p = head.next; p!=null; p = p.next){ o[i++] = p.item; } return o; } public void clear(){ head.next = null; last = head; count = 0; } //遍历方法是通过新建一个Iterator实现的 public Iterator<E> iterator(){ return new Itr(); } private class Itr implements Iterator{ private Node<E> current; private E element; Itr(){ current = head.next; element = current.item; } public boolean hasNext() { return current != null; } public E next() { E e = element; current = current.next; if(current != null) element = current.item; return e; } public void remove() { } } public String toString(){ Object[] o = toArray(); String str = ""; if(o == null) return str; for(Object ob : o){ str += ob != null && !ob.toString().equals("") ? ob.toString()+"," : ""; } return str.substring(0,str.length() - 1); } @SuppressWarnings("unchecked") public static void main(String[] args){ SingleLinked slink = new SingleLinked(); slink.insert(1); System.out.println("count:"+count+" |"+slink.toString()); slink.insert(2); System.out.println("count:"+count+" |"+slink.toString()); slink.insert(3); System.out.println("count:"+count+" |"+slink.toString()); slink.extract(); System.out.println("count:"+count+" |"+slink.toString()); slink.insert(5); System.out.println("count:"+count+" |"+slink.toString()); //slink.extract(); //System.out.println("count:"+count+" |"+slink.toString()); //slink.clear(); /*slink.remove(3); System.out.println("count:"+count+" |"+slink.toString()); slink.remove(5); System.out.println("count:"+count+" |"+slink.toString()); slink.remove(2); System.out.println("count:"+count+" |"+slink.toString());*/ Iterator it = slink.iterator(); while(it.hasNext()){ System.out.print(it.next()+" "); } } }
基本结构其实并不复杂,LinkedBlockingQueue是在此基础上加入了多线程的实现。
- SingleLinked.zip (1.4 KB)
- 描述: 单向链表源码
- 下载次数: 2
发表评论
-
AQS
2019-03-21 15:08 2028大白话聊聊Java并发面试问题之谈谈你对AQS的理解? ... -
【tomcat系统架构(二)】
2018-08-16 16:22 777四图带你了解Tomcat系统架构——让面试官颤抖Tomca ... -
JAVA内存模型-(程锁V传+启断终结)
2018-04-25 11:29 975Java并发编程:volatile关键字解析-(重要,分析 ... -
ArrayBlockingQueue
2018-04-18 15:54 1257ArrayBlockingQueue p ... -
【多线程总结】
2017-12-12 15:53 639线程需要注意的地方: 1、 public final s ... -
CDO框架架构
2017-03-13 10:46 2619架构: 1、监听器WebApplicationListen ... -
由文件名读取文件内容Utility
2017-03-13 10:39 687//将servicebus.xml内容解析为字符串 // ... -
StandardService分析-tomcat6.x源码阅读
2016-12-02 10:53 1367来源:https://my.oschina.net/dou ... -
StandardServer分析-tomcat6.x源码阅读
2016-11-30 17:37 1453来源:https://my.oschina.net/do ... -
Lifecycle-Tomcat生命周期
2016-11-24 14:19 48461 概述 Catalina包括很多组件,当catali ... -
ClassLoaderTest
2016-11-23 17:35 1095JVM类加载机制1 JVM类加载机制2 JRE下的rt ... -
Digester解析XML文件
2016-11-23 16:50 5038TOMCAT底层解析server.xml ... -
StringTokenizer-大数据情况下截取字符串
2016-11-23 11:00 3068java.util.StringTokenizer ... -
tomcat中处理消息提示的公用类 StringManager.java
2016-11-18 10:24 1750代码很简单,主要学习思路。 tomcat中处理消息 ... -
JAVA IO流中的flush
2016-11-18 09:51 3153前言: 大家在使用Java IO流中Output ... -
服务器集群对Synchronized有没有什么影响
2015-11-11 15:39 5390有个功能大致如下,在一堆没用过的数据中取一条数据,并将其标 ... -
获取id 的一种策略
2015-07-07 17:48 1929从数据库中批量(step个)拿出Id,然后使用,待消耗完后 ... -
(一)Http请求、Http响应、 Socket
2015-03-04 16:49 22830第一章:一个简单的Web服务器 本章说明java ... -
tomcat总结
2015-03-02 18:18 1761百度文库:《how-tomcat-works中文版》 H ... -
模拟的线程池
2014-09-25 09:51 1743自定义数据库连接池 例子: public cla ...
相关推荐
linkedblockingqueue测试程序
LinkedBlockingQueue 首先 LinkedBlockingQueue 是一个 “可选且有界” 的阻塞队列实现,你可以根据需要指定队列的大小。 接下来,我将创建一个 LinkedBlockingQueue ,它最多可以包含100个元素: BlockingQueue...
并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue用法
LinkedBlockingQueuejava.pdf
JDK容器学习之Queue:LinkedBlockingQueue
NULL 博文链接:https://xiongjiajia.iteye.com/blog/2325943
并发容器之ArrayBlockingQueue和LinkedBlockingQueue实现原理详解
主要介绍了详细分析Java并发集合LinkedBlockingQueue的用法,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
主要介绍了java中LinkedBlockingQueue与ArrayBlockingQueue的异同,需要的朋友可以参考下
NULL 博文链接:https://kanpiaoxue.iteye.com/blog/2101309
container Collection 标记: 顶级接口 List 标记: interface ArrayList 标记: class ...底层为链表,增删快,查询快 ...LinkedBlockingQueue ...LinkedBlockingQueue 链表结构实现,无界队列(默认上限Integer.MAX_VALUE)
LinkedBlockingQueue :由容器/列表支持的有界阻塞队列 ConcurrentRingBuffer :由片支持的有界无锁队列 安装 go get - u github . com / theodesp / blockingQueues 用法 非阻塞API queue , _ := NewArrayBlock
hashmap如何解决hash冲突,为什么hashmap中的链表需要转成红黑树? hashmap什么时候会触发扩容? jdk1.8之前并发操作hashmap时为什么会有死循环的问题? hashmap扩容时每个entry需要再计算一次hash吗? hashmap的...
【2018最新最详细】并发多线程教程,课程结构如下 1.并发编程的优缺点 2.线程的状态转换以及基本操作 3.java内存模型以及happens-before规则 4.彻底理解synchronized 5.彻底理解volatile 6.你以为你真的了解final吗...
java.util.concurrent总体概览图。 收取资源分3分。需要的同学可以下载一下。 java.util.concurrent主要包括5个部分executor,colletions,locks,atomic...该图详细的列举了并发包下面的结构,包含所有接口和具体实现类。
ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。 threadFactory:线程工厂,主要用来创建线程:默认值 ...
LinkedBlockingQueue以及redis队列写入mysql实例